Machine Learning with Apache Spark MLlib
Spark MLlib in ODP
Apache Spark 3.5.6 is included in ODP and deployed on YARN. Spark MLlib is Spark's built-in machine learning library — it runs distributed ML algorithms directly on your cluster, reading data from HDFS, Iceberg, and Hive tables, without requiring any additional software installation.
MLlib is well-suited for classical machine learning at scale: working with structured tabular data that would be impractical to process on a single machine. For deep learning workloads (neural networks, LLMs), teams typically use frameworks like PyTorch or TensorFlow on separate GPU infrastructure, reading data prepared by Spark from ODP storage.
This guide covers MLlib as it is available today in ODP with Spark 3.5.6.
Available Algorithms
MLlib provides a broad set of algorithms through the spark.ml Pipeline API (the newer, DataFrame-based API):
Classification
| Algorithm | Class |
|---|---|
| Logistic Regression | LogisticRegression |
| Decision Tree | DecisionTreeClassifier |
| Random Forest | RandomForestClassifier |
| Gradient-Boosted Trees | GBTClassifier |
| Linear SVM | LinearSVC |
| Naive Bayes | NaiveBayes |
| Multilayer Perceptron | MultilayerPerceptronClassifier |
| One-vs-Rest | OneVsRest |
Regression
| Algorithm | Class |
|---|---|
| Linear Regression | LinearRegression |
| Generalized Linear Regression | GeneralizedLinearRegression |
| Decision Tree Regression | DecisionTreeRegressor |
| Random Forest Regression | RandomForestRegressor |
| Gradient-Boosted Tree Regression | GBTRegressor |
| Isotonic Regression | IsotonicRegression |
| Survival Regression (AFT) | AFTSurvivalRegression |
Clustering
| Algorithm | Class |
|---|---|
| K-Means | KMeans |
| Bisecting K-Means | BisectingKMeans |
| Gaussian Mixture Model | GaussianMixture |
| Latent Dirichlet Allocation | LDA |
Collaborative Filtering
| Algorithm | Class |
|---|---|
| Alternating Least Squares | ALS |
Dimensionality Reduction
| Algorithm | API |
|---|---|
| Principal Component Analysis | PCA |
| Singular Value Decomposition | SVD (via RDD API) |
Feature Engineering Transformers
MLlib includes a rich set of transformers for feature engineering, usable in Pipelines:
- StringIndexer, IndexToString: encode categorical variables
- OneHotEncoder: one-hot encoding
- VectorAssembler: combine feature columns into a single vector column
- StandardScaler, MinMaxScaler, MaxAbsScaler, RobustScaler: feature scaling
- Tokenizer, HashingTF, IDF, Word2Vec: text feature extraction
- ChiSqSelector, UnivariateFeatureSelector: feature selection
- Imputer: fill missing values
- Bucketizer, QuantileDiscretizer: binning continuous features
- SQLTransformer: apply SQL expressions as a pipeline step
The ML Pipelines API
MLlib's Pipeline API chains transformers and estimators into a reproducible, serializable workflow. This is the recommended approach for all MLlib work.
# pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = SparkSession.builder.appName("mlpipeline-example").getOrCreate()
# --- 1. Load data from Iceberg table ---
df = spark.table("hive_catalog.ml_datasets.customer_churn")
# --- 2. Feature engineering ---
indexer = StringIndexer(inputCol="contract_type", outputCol="contract_type_idx")
assembler = VectorAssembler(
    inputCols=["tenure", "monthly_charges", "total_charges", "contract_type_idx"],
    outputCol="features_raw"
)
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
# --- 3. Model ---
rf = RandomForestClassifier(
    labelCol="churn",
    featuresCol="features",
    numTrees=100,
    maxDepth=10,
    seed=42
)
# --- 4. Pipeline ---
pipeline = Pipeline(stages=[indexer, assembler, scaler, rf])
# --- 5. Train / test split ---
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# --- 6. Fit ---
model = pipeline.fit(train_df)
# --- 7. Evaluate ---
predictions = model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(
    labelCol="churn", predictionCol="prediction", metricName="accuracy"
)
print(f"Accuracy: {evaluator.evaluate(predictions):.4f}")
# --- 8. Save the model ---
model.save("hdfs:///models/customer-churn-rf-v1")
Reading Training Data from Iceberg Tables
Iceberg is the recommended table format for ML training datasets in ODP. Benefits:
- Schema enforcement: prevents bad data from landing in training sets
- Time travel: retrain on an exact historical snapshot for reproducibility
- ACID transactions: safe concurrent writes from ingestion pipelines alongside reads from training jobs
Standard Iceberg Read (Latest Snapshot)
df = spark.table("hive_catalog.ml_datasets.sensor_readings")
Time Travel Read (Reproducible Training)
To train on data as it existed on a specific date (crucial for experiment reproducibility):
# By timestamp
df = spark.read \
    .option("as-of-timestamp", "2025-09-01T00:00:00.000Z") \
    .format("iceberg") \
    .load("hive_catalog.ml_datasets.sensor_readings")
# By snapshot ID (from Iceberg metadata)
df = spark.read \
    .option("snapshot-id", "5931985158436469021") \
    .format("iceberg") \
    .load("hive_catalog.ml_datasets.sensor_readings")
Store the snapshot ID alongside your model metadata so you can reproduce the exact training dataset at any point.
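One lightweight way to do this is to write a small JSON record next to the model artifact. A plain-Python sketch (the field names are illustrative, and the snapshot ID is the one from the read above):

```python
import json
import time

# Hypothetical metadata record linking a model to its training snapshot
metadata = {
    "model_path": "hdfs:///models/churn-rf-pipeline-v2",
    "training_table": "hive_catalog.ml_datasets.sensor_readings",
    "snapshot_id": "5931985158436469021",
    "trained_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}

# Write locally here; in practice you would put this on HDFS next to the model
with open("model_metadata.json", "w") as f:
    json.dump(metadata, f, indent=2)
```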
Writing Feature Tables Back to Iceberg
Feature tables (derived features, embeddings, predictions) can be written back to Iceberg:
predictions \
    .select("customer_id", "prediction", "probability") \
    .writeTo("hive_catalog.ml_predictions.churn_scores") \
    .createOrReplace()
Feature Engineering with Spark SQL on Hive/Iceberg
For complex feature engineering, Spark SQL is often more readable than the DataFrame API. Spark SQL queries can reference both Hive tables and Iceberg tables:
spark.sql("""
    CREATE OR REPLACE TABLE hive_catalog.ml_datasets.customer_features
    USING iceberg AS
    SELECT
        c.customer_id,
        c.tenure_months,
        c.contract_type,
        COUNT(t.transaction_id) AS total_transactions_6m,
        SUM(t.amount) AS total_spend_6m,
        AVG(t.amount) AS avg_transaction_amount,
        MAX(t.amount) AS max_transaction_amount,
        COUNT(CASE WHEN t.status = 'failed' THEN 1 END) AS failed_transactions,
        c.churn_label
    FROM hive_catalog.raw.customers c
    LEFT JOIN hive_catalog.raw.transactions t
        ON c.customer_id = t.customer_id
        AND t.transaction_date >= date_sub(current_date(), 180)
    GROUP BY
        c.customer_id, c.tenure_months, c.contract_type, c.churn_label
""")
This creates a versioned, Ranger-governed feature table in Iceberg that downstream training jobs can read.
Saving and Loading MLlib Models
MLlib models are serializable and can be saved to and loaded from HDFS.
Saving a Model
# Save a fitted Pipeline model
model.save("hdfs:///models/churn-rf-pipeline-v2")
# Or to a specific HDFS path with versioning
import datetime
version = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
model.save(f"hdfs:///models/churn-rf-pipeline-{version}")
Loading a Model
from pyspark.ml import PipelineModel
# Load the saved model
loaded_model = PipelineModel.load("hdfs:///models/churn-rf-pipeline-v2")
# Apply to new data
new_predictions = loaded_model.transform(new_data_df)
Model Metadata in Atlas
While MLlib does not natively publish model metadata to Atlas, teams can create Atlas entities for model artifacts programmatically using the Atlas REST API. This links the model to its training dataset (the Iceberg snapshot ID) and creates a lineage chain: raw data → feature table → model.
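A sketch of such an entity payload for the Atlas v2 REST API is shown below. The type name `ml_model` is a custom type that would need to be registered in Atlas first, and the attribute names and endpoint URL are illustrative assumptions, not part of any ODP default:

```python
import json

# Hypothetical Atlas endpoint (21443 is the TLS port)
ATLAS_URL = "https://atlas.example.com:21443/api/atlas/v2/entity"

def build_model_entity(model_path, table_name, snapshot_id):
    """Build an Atlas v2 entity payload describing a trained model artifact.

    Assumes a custom "ml_model" type has been registered in Atlas;
    attribute names here are illustrative.
    """
    return {
        "entity": {
            "typeName": "ml_model",
            "attributes": {
                "qualifiedName": f"{model_path}@odp",
                "name": model_path.rsplit("/", 1)[-1],
                "trainingTable": table_name,
                "trainingSnapshotId": snapshot_id,
            },
        }
    }

payload = build_model_entity(
    "hdfs:///models/churn-rf-pipeline-v2",
    "hive_catalog.ml_datasets.customer_churn",
    "5931985158436469021",
)
# POST the payload with your cluster's auth, e.g.:
#   requests.post(ATLAS_URL, json=payload, auth=..., verify=...)
print(json.dumps(payload, indent=2))
```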
Running ML Jobs via YARN
MLlib jobs run on YARN like any other Spark application. The key configuration parameters for ML workloads:
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --name "customer-churn-training" \
  --num-executors 10 \
  --executor-cores 4 \
  --executor-memory 16g \
  --driver-memory 8g \
  --conf spark.yarn.queue=ml-training \
  --conf spark.sql.catalog.hive_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hive_catalog.type=hive \
  --conf spark.sql.catalog.hive_catalog.uri=thrift://master02.example.com:9083 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=20 \
  train_churn_model.py
YARN queue configuration: for production ML workloads, dedicate a YARN capacity queue for training jobs to prevent them from consuming all cluster resources. Configure queue capacity in the YARN Capacity Scheduler via Ambari.
Dynamic allocation: enable dynamic allocation for training jobs with variable data sizes. Spark scales executors up during computation-intensive phases and releases them afterward. On YARN, dynamic allocation requires the external shuffle service (spark.shuffle.service.enabled=true), which is deployed on the NodeManagers.
Kerberos: on Kerberos-secured clusters, ensure that the spark-submit process has a valid Kerberos ticket or that the application uses a keytab:
spark-submit \
  --principal ml-service@REALM.EXAMPLE.COM \
  --keytab /etc/security/keytabs/ml-service.keytab \
  ... other options ...
Zeppelin Notebooks for ML Experimentation
Apache Zeppelin 0.10.2 is included in ODP and is the recommended tool for interactive ML experimentation before productionizing jobs.
Spark Interpreter for MLlib
Zeppelin's Spark interpreter gives you an interactive PySpark or Scala Spark session with full access to MLlib:
%pyspark
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
df = spark.table("hive_catalog.ml_datasets.customer_features")
df.printSchema()
df.show(5)
Useful Zeppelin Patterns for ML
Inline visualization: Zeppelin's %sql paragraph can plot query results directly:
%sql
SELECT contract_type, AVG(churn_label) AS churn_rate
FROM hive_catalog.ml_datasets.customer_features
GROUP BY contract_type
ORDER BY churn_rate DESC
Parameter forms: Zeppelin supports dynamic form inputs, useful for iterating over hyperparameters:
%pyspark
# z.input renders a text-input form at the top of the paragraph
num_trees = int(z.input("num_trees", "100"))
rf = RandomForestClassifier(numTrees=num_trees, ...)
Notebook as training log: document your experiments in Zeppelin notebooks, saving each notebook version with the corresponding model HDFS path and Iceberg snapshot ID. This creates a lightweight experiment tracking record.
For more structured experiment tracking, teams may integrate external tools (MLflow, DVC) that store their metadata externally while reading data from ODP storage.
Hyperparameter Tuning
MLlib includes CrossValidator and TrainValidationSplit for hyperparameter tuning:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()
evaluator = BinaryClassificationEvaluator(labelCol="churn", metricName="areaUnderROC")
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=4  # number of models to train in parallel
)
cv_model = cv.fit(train_df)
print(f"Best AUC: {evaluator.evaluate(cv_model.transform(test_df)):.4f}")
# Save the best model
cv_model.bestModel.save("hdfs:///models/churn-rf-best")
parallelism=4 tells Spark to evaluate 4 hyperparameter combinations simultaneously. Set this to a value that fits within your YARN queue capacity.