Iceberg with Apache Spark
Apache Spark 3.5.6 is the primary batch and streaming engine for Iceberg in ODP. This page covers configuration, read/write patterns, streaming, schema evolution, time travel, and partitioning strategies.
Spark-Iceberg Configuration in ODP
ODP preconfigures the Spark Iceberg catalog on edge and gateway nodes. The Hive Metastore-backed catalog is the standard catalog in ODP.
spark-defaults.conf (pre-configured in ODP)
The following settings are applied cluster-wide through Ambari (Spark2 → Configs → Custom spark-defaults):
# Iceberg extension
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
# Hive-backed catalog (default catalog name: spark_catalog)
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type=hive
# Additional named catalog pointing to Hive Metastore
spark.sql.catalog.hive_catalog=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_catalog.type=hive
spark.sql.catalog.hive_catalog.uri=thrift://master02.dev01.hadoop.clemlab.com:9083
spark.sql.catalog.hive_catalog.warehouse=hdfs:///warehouse/iceberg
When using a Polaris REST catalog (available in ODP 1.3.2.0 as tech preview):
spark.sql.catalog.polaris=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.polaris.catalog-impl=org.apache.iceberg.rest.RESTCatalog
spark.sql.catalog.polaris.uri=http://master03.dev01.hadoop.clemlab.com:8181/api/catalog
spark.sql.catalog.polaris.warehouse=odp_catalog
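The same catalog settings can also be supplied per job rather than cluster-wide. A minimal PySpark sketch, mirroring the Hive-backed catalog values above (useful on hosts where Ambari has not pushed the Iceberg configs):

```python
from pyspark.sql import SparkSession

# Per-job Iceberg catalog configuration; the values mirror the
# cluster-wide spark-defaults.conf settings shown above.
spark = (
    SparkSession.builder
    .appName("iceberg-catalog-demo")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.hive_catalog",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.hive_catalog.type", "hive")
    .config("spark.sql.catalog.hive_catalog.uri",
            "thrift://master02.dev01.hadoop.clemlab.com:9083")
    .config("spark.sql.catalog.hive_catalog.warehouse",
            "hdfs:///warehouse/iceberg")
    .getOrCreate()
)
```

Session-level `.config()` calls take effect only for that application; cluster-wide defaults remain the recommended setup in ODP.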
Reading and Writing Iceberg Tables from Spark
Create an Iceberg table
# PySpark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("iceberg-demo").getOrCreate()
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_catalog.iceberg_demo.events (
event_id BIGINT,
event_ts TIMESTAMP,
user_id STRING,
event_type STRING,
payload STRING
) USING iceberg
PARTITIONED BY (days(event_ts))
TBLPROPERTIES (
'write.format.default'='parquet',
'write.parquet.compression-codec'='zstd',
'write.metadata.compression-codec'='gzip'
)
""")
Write data
from pyspark.sql.functions import current_timestamp
df = spark.createDataFrame([
(1, "user_a", "click", '{"page": "/home"}'),
(2, "user_b", "view", '{"page": "/about"}'),
], ["event_id", "user_id", "event_type", "payload"])
df = df.withColumn("event_ts", current_timestamp())
df.writeTo("hive_catalog.iceberg_demo.events").append()
Read data
df = spark.read.table("hive_catalog.iceberg_demo.events")
df.show()
# Or with Spark SQL
spark.sql("SELECT * FROM hive_catalog.iceberg_demo.events WHERE event_type = 'click'").show()
Upsert (MERGE INTO)
spark.sql("""
MERGE INTO hive_catalog.iceberg_demo.events AS target
USING (SELECT * FROM staging_events) AS source
ON target.event_id = source.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Spark Structured Streaming to Iceberg
Iceberg supports Spark Structured Streaming sinks. This is the recommended pattern for landing Kafka data into Iceberg tables in ODP.
# Read from Kafka
kafka_stream = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "worker01.dev01.hadoop.clemlab.com:9092,"
"worker02.dev01.hadoop.clemlab.com:9092,"
"worker03.dev01.hadoop.clemlab.com:9092")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("subscribe", "events-topic")
.option("startingOffsets", "latest")
.load()
)
# Parse and transform
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, LongType, TimestampType
schema = StructType() \
.add("event_id", LongType()) \
.add("user_id", StringType()) \
.add("event_type", StringType()) \
.add("event_ts", TimestampType()) \
.add("payload", StringType())
parsed = kafka_stream.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Write to Iceberg
query = (
parsed.writeStream
.format("iceberg")
.outputMode("append")
.trigger(processingTime="60 seconds")
.option("path", "hive_catalog.iceberg_demo.events")
.option("checkpointLocation", "hdfs:///checkpoints/events-iceberg")
.start()
)
query.awaitTermination()
Iceberg's streaming sink provides effectively exactly-once appends: Spark's checkpointing tracks which micro-batches have been committed, and Iceberg commits each batch atomically, so replays caused by Kafka's at-least-once delivery do not produce duplicate rows in the table.
Schema Evolution with Spark
Iceberg supports full schema evolution — column adds, renames, drops, reorders, and type promotions — without rewriting existing data files.
# Add a column
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
ADD COLUMN session_id STRING AFTER user_id
""")
# Rename a column
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
RENAME COLUMN payload TO raw_payload
""")
# Drop a column
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
DROP COLUMN raw_payload
""")
# Type promotion (widening only, e.g. INT → BIGINT, FLOAT → DOUBLE).
# event_id is already BIGINT, so this particular statement is a no-op
# shown only to illustrate the syntax.
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
ALTER COLUMN event_id TYPE BIGINT
""")
Schema evolution is tracked in Iceberg metadata. Existing data files are not rewritten — the reader applies projection and type coercion at read time.
Time Travel Queries
Iceberg maintains a full history of snapshots. You can query any historical state using VERSION AS OF (snapshot ID) or TIMESTAMP AS OF.
List snapshots
spark.sql("""
SELECT snapshot_id, committed_at, operation, summary
FROM hive_catalog.iceberg_demo.events.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)
Query by snapshot ID
# VERSION AS OF <snapshot_id>
spark.sql("""
SELECT COUNT(*) FROM hive_catalog.iceberg_demo.events
VERSION AS OF 3874123456789012345
""").show()
Query by timestamp
# TIMESTAMP AS OF '<timestamp>'
spark.sql("""
SELECT * FROM hive_catalog.iceberg_demo.events
TIMESTAMP AS OF '2026-04-01 00:00:00'
WHERE event_type = 'click'
""").show()
Using DataFrameReader API
# Time travel via DataFrameReader
df = (
spark.read
.option("snapshot-id", "3874123456789012345")
.table("hive_catalog.iceberg_demo.events")
)
# Or by timestamp
df = (
spark.read
.option("as-of-timestamp", "1743465600000") # milliseconds since epoch
.table("hive_catalog.iceberg_demo.events")
)
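The `as-of-timestamp` option expects epoch milliseconds. A small stdlib helper (a sketch; the function name is ours, and the input is interpreted as UTC) converts a human-readable timestamp to that form:

```python
from datetime import datetime, timezone

def to_epoch_millis(ts_string: str) -> int:
    """Convert 'YYYY-MM-DD HH:MM:SS' (interpreted as UTC) into the
    epoch-millisecond value expected by Iceberg's as-of-timestamp option."""
    dt = datetime.strptime(ts_string, "%Y-%m-%d %H:%M:%S").replace(
        tzinfo=timezone.utc
    )
    return int(dt.timestamp() * 1000)

print(to_epoch_millis("2025-04-01 00:00:00"))  # 1743465600000
```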
Partitioning Strategies
Iceberg supports hidden partitioning — partition transforms are defined in the table spec and applied transparently. No partition column needs to appear in queries.
Common transforms
spark.sql("""
CREATE TABLE hive_catalog.iceberg_demo.events_partitioned (
event_id BIGINT,
event_ts TIMESTAMP,
user_id STRING,
region STRING,
event_type STRING
) USING iceberg
PARTITIONED BY (
days(event_ts), -- date-based partition from timestamp
bucket(16, user_id), -- hash buckets for high-cardinality columns
region -- direct column value
)
""")
Available transforms:
| Transform | Example | Use case |
|---|---|---|
| `identity(col)` | `PARTITIONED BY (region)` | Low-cardinality string columns |
| `bucket(N, col)` | `PARTITIONED BY (bucket(16, user_id))` | High-cardinality columns, even distribution |
| `days(ts)` | `PARTITIONED BY (days(event_ts))` | Daily time-series data |
| `hours(ts)` | `PARTITIONED BY (hours(event_ts))` | Hourly partitions (high-volume streams) |
| `months(ts)` | `PARTITIONED BY (months(event_ts))` | Monthly aggregation tables |
| `years(ts)` | `PARTITIONED BY (years(event_ts))` | Low-volume historical tables |
| `truncate(N, col)` | `PARTITIONED BY (truncate(4, sku))` | Prefix truncation of strings |
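Because partitioning is hidden, a query only needs a predicate on the source column and Iceberg maps it onto the transform for pruning. A sketch against the example table above (run via `spark.sql`):

```sql
-- The filter on event_ts is enough for partition pruning: Iceberg maps it
-- onto the days(event_ts) transform, so no partition column appears here.
SELECT region, COUNT(*) AS clicks
FROM hive_catalog.iceberg_demo.events_partitioned
WHERE event_ts >= TIMESTAMP '2026-04-01 00:00:00'
  AND event_ts <  TIMESTAMP '2026-04-02 00:00:00'
  AND event_type = 'click'
GROUP BY region
```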
Partition evolution
Unlike Hive, Iceberg allows changing the partition scheme of an existing table without rewriting data:
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events_partitioned
ADD PARTITION FIELD hours(event_ts)
""")
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events_partitioned
DROP PARTITION FIELD days(event_ts)
""")
Old data files retain their original partitioning; new writes use the updated scheme. Iceberg readers handle both transparently.
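To see the coexisting layouts, the table's `partitions` metadata table reports each partition together with the spec it was written under (run via `spark.sql`; column availability depends on the Iceberg version):

```sql
-- Each row shows a partition tuple plus the spec_id of the partition spec
-- that produced it, so pre- and post-evolution layouts are both visible.
SELECT partition, spec_id, record_count, file_count
FROM hive_catalog.iceberg_demo.events_partitioned.partitions
```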
Iceberg + HWC Interaction Notes
When using ODP's Hive Warehouse Connector (HWC) alongside Iceberg:
- HWC is not required for Iceberg tables. HWC is designed for ACID Hive ORC tables. Iceberg tables with the Hive storage handler are accessed directly via the Spark Iceberg catalog.
- If you mix ACID Hive tables (read via HWC) and Iceberg tables in the same Spark job, the Iceberg tables should use `hive_catalog.*` naming while ACID tables use the HWC API (`spark.read.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")`).
- Spark jobs running on YARN in a Kerberos cluster must have a valid Kerberos ticket. The edge-node `spark-submit` command handles this automatically via the `--principal` and `--keytab` flags:
spark-submit \
--master yarn \
--deploy-mode cluster \
--principal spark@DEV01.HADOOP.CLEMLAB.COM \
--keytab /etc/security/keytabs/spark.service.keytab \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
my_iceberg_job.py