Version: 1.3.1.0

Iceberg with Apache Spark

Apache Spark 3.5.6 is the primary batch and streaming engine for Iceberg in ODP. This page covers configuration, read/write patterns, streaming, schema evolution, time travel, and partitioning strategies.

Spark-Iceberg Configuration in ODP

ODP preconfigures the Spark Iceberg catalog on edge and gateway nodes. The Hive Metastore-backed catalog is the standard catalog in ODP.

spark-defaults.conf (pre-configured in ODP)

The following settings are applied cluster-wide through Ambari (Spark2 → Configs → Custom spark-defaults):

# Iceberg extension
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

# Hive-backed catalog (default catalog name: spark_catalog)
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type=hive

# Additional named catalog pointing to Hive Metastore
spark.sql.catalog.hive_catalog=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_catalog.type=hive
spark.sql.catalog.hive_catalog.uri=thrift://master02.dev01.hadoop.clemlab.com:9083
spark.sql.catalog.hive_catalog.warehouse=hdfs:///warehouse/iceberg

When using a Polaris REST catalog (available in ODP 1.3.2.0 as tech preview):

spark.sql.catalog.polaris=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.polaris.catalog-impl=org.apache.iceberg.rest.RESTCatalog
spark.sql.catalog.polaris.uri=http://master03.dev01.hadoop.clemlab.com:8181/api/catalog
spark.sql.catalog.polaris.warehouse=odp_catalog
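The same settings can also be supplied per session when a notebook or ad hoc job cannot rely on spark-defaults.conf. A minimal PySpark sketch, reusing the catalog name, Metastore URI, and warehouse path from the cluster-wide configuration above:

```python
from pyspark.sql import SparkSession

# Per-session equivalent of the Ambari-managed spark-defaults.conf
# entries above; values mirror the cluster-wide configuration.
spark = (
    SparkSession.builder
    .appName("iceberg-session-config")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.hive_catalog",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.hive_catalog.type", "hive")
    .config("spark.sql.catalog.hive_catalog.uri",
            "thrift://master02.dev01.hadoop.clemlab.com:9083")
    .config("spark.sql.catalog.hive_catalog.warehouse", "hdfs:///warehouse/iceberg")
    .getOrCreate()
)
```

Note that spark.sql.extensions must be set before the session is created; it cannot be changed on an existing session.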

Reading and Writing Iceberg Tables from Spark

Create an Iceberg table

# PySpark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("iceberg-demo").getOrCreate()

spark.sql("""
CREATE TABLE IF NOT EXISTS hive_catalog.iceberg_demo.events (
event_id BIGINT,
event_ts TIMESTAMP,
user_id STRING,
event_type STRING,
payload STRING
) USING iceberg
PARTITIONED BY (days(event_ts))
TBLPROPERTIES (
'write.format.default'='parquet',
'write.parquet.compression-codec'='zstd',
'write.metadata.compression-codec'='gzip'
)
""")

Write data

from pyspark.sql.functions import current_timestamp

df = spark.createDataFrame([
(1, "user_a", "click", '{"page": "/home"}'),
(2, "user_b", "view", '{"page": "/about"}'),
], ["event_id", "user_id", "event_type", "payload"])

df = df.withColumn("event_ts", current_timestamp())

df.writeTo("hive_catalog.iceberg_demo.events").append()

Read data

df = spark.read.table("hive_catalog.iceberg_demo.events")
df.show()

# Or with Spark SQL
spark.sql("SELECT * FROM hive_catalog.iceberg_demo.events WHERE event_type = 'click'").show()

Upsert (MERGE INTO)

spark.sql("""
MERGE INTO hive_catalog.iceberg_demo.events AS target
USING (SELECT * FROM staging_events) AS source
ON target.event_id = source.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Spark Structured Streaming to Iceberg

Iceberg supports Spark Structured Streaming sinks. This is the recommended pattern for landing Kafka data into Iceberg tables in ODP.

# Read from Kafka
kafka_stream = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "worker01.dev01.hadoop.clemlab.com:9092,"
"worker02.dev01.hadoop.clemlab.com:9092,"
"worker03.dev01.hadoop.clemlab.com:9092")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("subscribe", "events-topic")
.option("startingOffsets", "latest")
.load()
)

# Parse and transform
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, LongType, TimestampType

schema = StructType() \
.add("event_id", LongType()) \
.add("user_id", StringType()) \
.add("event_type", StringType()) \
.add("event_ts", TimestampType()) \
.add("payload", StringType())

parsed = kafka_stream.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Write to Iceberg
query = (
parsed.writeStream
.format("iceberg")
.outputMode("append")
.trigger(processingTime="60 seconds")
.option("path", "hive_catalog.iceberg_demo.events")
.option("checkpointLocation", "hdfs:///checkpoints/events-iceberg")
.start()
)

query.awaitTermination()

The Iceberg sink provides exactly-once semantics: Spark's checkpoint tracks the consumed Kafka offsets, and Iceberg's atomic snapshot commits ensure each micro-batch is committed at most once, so batches replayed after a failure do not produce duplicates.
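Each trigger interval commits at least one new data file per written partition, so long-running streams accumulate small files. A maintenance sketch using Iceberg's rewrite_data_files and expire_snapshots procedures, run from a separate batch job or scheduler (catalog and table names match the examples above; the older_than timestamp is illustrative):

```python
# Compact small files produced by streaming commits into larger ones;
# safe to run while the stream is writing, since Iceberg commits are atomic.
spark.sql("""
CALL hive_catalog.system.rewrite_data_files(
  table => 'iceberg_demo.events'
)
""")

# Optionally expire old snapshots afterwards to reclaim storage
# (this limits how far back time travel can reach).
spark.sql("""
CALL hive_catalog.system.expire_snapshots(
  table => 'iceberg_demo.events',
  older_than => TIMESTAMP '2025-03-01 00:00:00'
)
""")
```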


Schema Evolution with Spark

Iceberg supports full schema evolution — column adds, renames, drops, reorders, and type promotions — without rewriting existing data files.

# Add a column
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
ADD COLUMN session_id STRING AFTER user_id
""")

# Rename a column
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
RENAME COLUMN payload TO raw_payload
""")

# Drop a column
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
DROP COLUMN raw_payload
""")

# Type promotion (widening only: e.g., INT to BIGINT, FLOAT to DOUBLE).
# Note: event_id was created as BIGINT, so this statement is a no-op
# shown for syntax; promoting a narrower column works the same way.
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
ALTER COLUMN event_id TYPE BIGINT
""")

Schema evolution is tracked in Iceberg metadata. Existing data files are not rewritten — the reader applies projection and type coercion at read time.
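One way to see this in action: after the ADD COLUMN above, rows written before the change simply read back as NULL for the new column.

```python
# Rows that predate the ADD COLUMN session_id change read as NULL for it;
# the reader fills the projection at scan time, no files were rewritten.
spark.sql("""
SELECT event_id, user_id, session_id
FROM hive_catalog.iceberg_demo.events
""").show()
```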


Time Travel Queries

Iceberg maintains a full history of snapshots. You can query any historical state using VERSION AS OF (snapshot ID) or TIMESTAMP AS OF.

List snapshots

spark.sql("""
SELECT snapshot_id, committed_at, operation, summary
FROM hive_catalog.iceberg_demo.events.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)

Query by snapshot ID

# VERSION AS OF <snapshot_id>
spark.sql("""
SELECT COUNT(*) FROM hive_catalog.iceberg_demo.events
VERSION AS OF 3874123456789012345
""").show()

Query by timestamp

# TIMESTAMP AS OF '<timestamp>'
spark.sql("""
SELECT * FROM hive_catalog.iceberg_demo.events
TIMESTAMP AS OF '2026-04-01 00:00:00'
WHERE event_type = 'click'
""").show()

Using DataFrameReader API

# Time travel via DataFrameReader
df = (
spark.read
.option("snapshot-id", "3874123456789012345")
.table("hive_catalog.iceberg_demo.events")
)

# Or by timestamp
df = (
spark.read
.option("as-of-timestamp", "1743465600000") # milliseconds since epoch
.table("hive_catalog.iceberg_demo.events")
)
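The as-of-timestamp option takes milliseconds since the epoch as a string. A small illustrative helper (the name to_epoch_millis is ours, not an Iceberg API) for converting a human-readable UTC timestamp:

```python
from datetime import datetime, timezone

def to_epoch_millis(ts: str) -> int:
    """Convert a 'YYYY-MM-DD HH:MM:SS' UTC timestamp to the epoch-millis
    value expected by Iceberg's as-of-timestamp read option."""
    dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

millis = to_epoch_millis("2025-04-01 00:00:00")  # 1743465600000

# Usage with the reader shown above:
# spark.read.option("as-of-timestamp", str(millis)).table("hive_catalog.iceberg_demo.events")
```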

Partitioning Strategies

Iceberg supports hidden partitioning — partition transforms are defined in the table spec and applied transparently. No partition column needs to appear in queries.

Common transforms

spark.sql("""
CREATE TABLE hive_catalog.iceberg_demo.events_partitioned (
event_id BIGINT,
event_ts TIMESTAMP,
user_id STRING,
region STRING,
event_type STRING
) USING iceberg
PARTITIONED BY (
days(event_ts), -- date-based partition from timestamp
bucket(16, user_id), -- hash buckets for high-cardinality columns
region -- direct column value
)
""")

Available transforms:

| Transform | Example | Use case |
|---|---|---|
| identity(col) | PARTITIONED BY (region) | Low-cardinality string columns |
| bucket(N, col) | PARTITIONED BY (bucket(16, user_id)) | High-cardinality columns, even distribution |
| days(ts) | PARTITIONED BY (days(event_ts)) | Daily time-series data |
| hours(ts) | PARTITIONED BY (hours(event_ts)) | Hourly partitions (high-volume streams) |
| months(ts) | PARTITIONED BY (months(event_ts)) | Monthly aggregation tables |
| years(ts) | PARTITIONED BY (years(event_ts)) | Low-volume historical tables |
| truncate(N, col) | PARTITIONED BY (truncate(4, sku)) | Prefix truncation of strings |
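Because partitioning is hidden, queries filter on the source column directly; Iceberg maps the predicate through the transform and prunes files. For example, a plain event_ts range filter on the table above prunes the days(event_ts) partitions:

```python
# No partition column (e.g. an explicit event_date) appears in the query;
# Iceberg derives the day partitions to scan from the event_ts predicate.
spark.sql("""
SELECT event_type, COUNT(*) AS events
FROM hive_catalog.iceberg_demo.events_partitioned
WHERE event_ts >= TIMESTAMP '2025-04-01 00:00:00'
  AND event_ts <  TIMESTAMP '2025-04-02 00:00:00'
GROUP BY event_type
""").show()
```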

Partition evolution

Unlike Hive, Iceberg allows changing the partition scheme of an existing table without rewriting data:

spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events_partitioned
ADD PARTITION FIELD hours(event_ts)
""")

spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events_partitioned
DROP PARTITION FIELD days(event_ts)
""")

Old data files retain their original partitioning; new writes use the updated scheme. Iceberg readers handle both transparently.
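The coexisting partition specs can be inspected through the table's partitions metadata table; each row reports the spec its files were written under:

```python
# spec_id distinguishes files written under the original days() layout
# from files written under the new hours() layout after evolution.
spark.sql("""
SELECT partition, spec_id, record_count, file_count
FROM hive_catalog.iceberg_demo.events_partitioned.partitions
""").show(truncate=False)
```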


Iceberg + HWC Interaction Notes

When using ODP's Hive Warehouse Connector (HWC) alongside Iceberg:

  • HWC is not required for Iceberg tables. HWC is designed for ACID Hive ORC tables. Iceberg tables with the Hive storage handler are accessed directly via the Spark Iceberg catalog.
  • If you mix ACID Hive tables (read via HWC) and Iceberg tables in the same Spark job, reference the Iceberg tables with hive_catalog.* naming while reading the ACID tables through the HWC API (spark.read.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")).
  • Spark jobs running on YARN in a Kerberized cluster need valid Kerberos credentials. Passing --principal and --keytab to spark-submit lets YARN obtain and renew the required tokens for long-running jobs:
spark-submit \
--master yarn \
--deploy-mode cluster \
--principal spark@DEV01.HADOOP.CLEMLAB.COM \
--keytab /etc/security/keytabs/spark.service.keytab \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
my_iceberg_job.py
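A quick sanity check at the top of my_iceberg_job.py (a sketch; it assumes the hive_catalog configuration shown earlier is in place) confirms the session picked up the Iceberg extension and catalog:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("iceberg-smoke-test").getOrCreate()

# Should print the Iceberg extensions class set in spark-defaults.conf.
print(spark.conf.get("spark.sql.extensions"))

# Lists the namespaces (databases) visible through the Iceberg catalog.
spark.sql("SHOW NAMESPACES IN hive_catalog").show()
```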