Iceberg avec Apache Spark
Apache Spark 3.5.6 est le moteur principal de traitement batch et streaming pour Iceberg dans ODP. Cette page couvre la configuration, les patterns de lecture/écriture, le streaming, l'évolution de schéma, le voyage dans le temps et les stratégies de partitionnement.
Configuration Spark-Iceberg dans ODP
ODP préconfigure le catalogue Iceberg Spark sur les nœuds edge et gateway. Le catalogue adossé au Hive Metastore est le catalogue standard dans ODP.
spark-defaults.conf (préconfiguré dans ODP)
Les paramètres suivants sont appliqués à l'ensemble du cluster via Ambari (Spark2 → Configs → Custom spark-defaults) :
# Extension Iceberg
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
# Catalogue adossé à Hive (nom de catalogue par défaut : spark_catalog)
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type=hive
# Catalogue nommé supplémentaire pointant vers le Hive Metastore
spark.sql.catalog.hive_catalog=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_catalog.type=hive
spark.sql.catalog.hive_catalog.uri=thrift://master02.dev01.hadoop.clemlab.com:9083
spark.sql.catalog.hive_catalog.warehouse=hdfs:///warehouse/iceberg
Lors de l'utilisation d'un catalogue Polaris REST (disponible en aperçu technique dans ODP 1.3.2.0) :
spark.sql.catalog.polaris=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.polaris.catalog-impl=org.apache.iceberg.rest.RESTCatalog
spark.sql.catalog.polaris.uri=http://master03.dev01.hadoop.clemlab.com:8181/api/catalog
spark.sql.catalog.polaris.warehouse=odp_catalog
Lecture et écriture de tables Iceberg depuis Spark
Créer une table Iceberg
# PySpark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("iceberg-demo").getOrCreate()
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_catalog.iceberg_demo.events (
event_id BIGINT,
event_ts TIMESTAMP,
user_id STRING,
event_type STRING,
payload STRING
) USING iceberg
PARTITIONED BY (days(event_ts))
TBLPROPERTIES (
'write.format.default'='parquet',
'write.parquet.compression-codec'='zstd',
'write.metadata.compression-codec'='gzip'
)
""")
Écrire des données
from pyspark.sql.functions import current_timestamp
df = spark.createDataFrame([
(1, "user_a", "click", '{"page": "/home"}'),
(2, "user_b", "view", '{"page": "/about"}'),
], ["event_id", "user_id", "event_type", "payload"])
df = df.withColumn("event_ts", current_timestamp())
df.writeTo("hive_catalog.iceberg_demo.events").append()
Lire des données
df = spark.read.table("hive_catalog.iceberg_demo.events")
df.show()
# Ou avec Spark SQL
spark.sql("SELECT * FROM hive_catalog.iceberg_demo.events WHERE event_type = 'click'").show()
Upsert (MERGE INTO)
spark.sql("""
MERGE INTO hive_catalog.iceberg_demo.events AS target
USING (SELECT * FROM staging_events) AS source
ON target.event_id = source.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Spark Structured Streaming vers Iceberg
Iceberg prend en charge les sinks Spark Structured Streaming. Il s'agit du pattern recommandé pour déposer des données Kafka dans des tables Iceberg dans ODP.
# Lecture depuis Kafka
kafka_stream = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "worker01.dev01.hadoop.clemlab.com:9092,"
"worker02.dev01.hadoop.clemlab.com:9092,"
"worker03.dev01.hadoop.clemlab.com:9092")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("subscribe", "events-topic")
.option("startingOffsets", "latest")
.load()
)
# Parsing et transformation
from pyspark.sql.functions import col, from_json, schema_of_json
from pyspark.sql.types import StructType, StringType, LongType, TimestampType
schema = StructType() \
.add("event_id", LongType()) \
.add("user_id", StringType()) \
.add("event_type", StringType()) \
.add("event_ts", TimestampType()) \
.add("payload", StringType())
parsed = kafka_stream.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Écriture vers Iceberg
query = (
parsed.writeStream
.format("iceberg")
.outputMode("append")
.trigger(processingTime="60 seconds")
.option("path", "hive_catalog.iceberg_demo.events")
.option("checkpointLocation", "hdfs:///checkpoints/events-iceberg")
.start()
)
query.awaitTermination()
Le sink streaming d'Iceberg garantit exactement-une-fois lorsqu'il est associé à la livraison Kafka au-moins-une-fois et au modèle de commit atomique d'Iceberg.
Évolution de schéma avec Spark
Iceberg prend en charge l'évolution complète du schéma — ajouts, renommages, suppressions, réordonnements de colonnes et promotions de types — sans réécriture des fichiers de données existants.
# Ajouter une colonne
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
ADD COLUMN session_id STRING AFTER user_id
""")
# Renommer une colonne
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
RENAME COLUMN payload TO raw_payload
""")
# Supprimer une colonne
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
DROP COLUMN raw_payload
""")
# Promotion de type (élargissement uniquement — ex. INT → BIGINT, FLOAT → DOUBLE)
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events
ALTER COLUMN event_id TYPE BIGINT
""")
L'évolution du schéma est tracée dans les métadonnées Iceberg. Les fichiers de données existants ne sont pas réécrits — le lecteur applique la projection et la coercition de type au moment de la lecture.
Requêtes de voyage dans le temps
Iceberg conserve un historique complet des snapshots. Vous pouvez interroger n'importe quel état historique en utilisant VERSION AS OF (ID de snapshot) ou TIMESTAMP AS OF.
Lister les snapshots
spark.sql("""
SELECT snapshot_id, committed_at, operation, summary
FROM hive_catalog.iceberg_demo.events.snapshots
ORDER BY committed_at DESC
""").show(truncate=False)
Requête par ID de snapshot
# VERSION AS OF <snapshot_id>
spark.sql("""
SELECT COUNT(*) FROM hive_catalog.iceberg_demo.events
VERSION AS OF 3874123456789012345
""").show()
Requête par timestamp
# TIMESTAMP AS OF '<timestamp>'
spark.sql("""
SELECT * FROM hive_catalog.iceberg_demo.events
TIMESTAMP AS OF '2026-04-01 00:00:00'
WHERE event_type = 'click'
""").show()
Utilisation de l'API DataFrameReader
# Voyage dans le temps via DataFrameReader
df = (
spark.read
.option("snapshot-id", "3874123456789012345")
.table("hive_catalog.iceberg_demo.events")
)
# Ou par timestamp
df = (
spark.read
.option("as-of-timestamp", "1743465600000") # millisecondes depuis l'époque Unix
.table("hive_catalog.iceberg_demo.events")
)
Stratégies de partitionnement
Iceberg prend en charge le partitionnement caché — les transformations de partition sont définies dans la spécification de table et appliquées de façon transparente. Aucune colonne de partition n'a besoin d'apparaître dans les requêtes.
Transformations courantes
spark.sql("""
CREATE TABLE hive_catalog.iceberg_demo.events_partitioned (
event_id BIGINT,
event_ts TIMESTAMP,
user_id STRING,
region STRING,
event_type STRING
) USING iceberg
PARTITIONED BY (
days(event_ts), -- partition basée sur la date à partir du timestamp
bucket(16, user_id), -- buckets de hachage pour les colonnes à haute cardinalité
region -- valeur de colonne directe
)
""")
Transformations disponibles :
| Transformation | Exemple | Cas d'usage |
|---|---|---|
identity(col) | PARTITIONED BY (region) | Colonnes chaîne à faible cardinalité |
bucket(N, col) | PARTITIONED BY (bucket(16, user_id)) | Colonnes à haute cardinalité, distribution uniforme |
days(ts) | PARTITIONED BY (days(event_ts)) | Données de série temporelle quotidienne |
hours(ts) | PARTITIONED BY (hours(event_ts)) | Partitions horaires (flux à grand volume) |
months(ts) | PARTITIONED BY (months(event_ts)) | Tables d'agrégation mensuelle |
years(ts) | PARTITIONED BY (years(event_ts)) | Tables historiques à faible volume |
truncate(N, col) | PARTITIONED BY (truncate(4, sku)) | Troncature de préfixe de chaînes |
Évolution de partition
Contrairement à Hive, Iceberg permet de modifier le schéma de partition d'une table existante sans réécrire les données :
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events_partitioned
ADD PARTITION FIELD hours(event_ts)
""")
spark.sql("""
ALTER TABLE hive_catalog.iceberg_demo.events_partitioned
DROP PARTITION FIELD days(event_ts)
""")
Les anciens fichiers de données conservent leur partitionnement d'origine ; les nouvelles écritures utilisent le schéma mis à jour. Les lecteurs Iceberg gèrent les deux de manière transparente.
Notes d'interaction Iceberg + HWC
Lors de l'utilisation du Hive Warehouse Connector (HWC) d'ODP conjointement avec Iceberg :
- HWC n'est pas requis pour les tables Iceberg. HWC est conçu pour les tables Hive ACID ORC. Les tables Iceberg avec le storage handler Hive sont accédées directement via le catalogue Iceberg Spark.
- Si vous mélangez des tables Hive ACID (lues via HWC) et des tables Iceberg dans le même job Spark, les tables Iceberg doivent utiliser la dénomination
hive_catalog.*tandis que les tables ACID utilisent l'API HWC (spark.read.format("com.hortonworks.spark.sql.rules.extensions.hive")). - Les jobs Spark s'exécutant sur YARN dans un cluster Kerberos doivent disposer d'un ticket Kerberos valide. La commande
spark-submitsur le nœud edge gère cela automatiquement via les drapeaux--principalet--keytab:
spark-submit \
--master yarn \
--deploy-mode cluster \
--principal spark@DEV01.HADOOP.CLEMLAB.COM \
--keytab /etc/security/keytabs/spark.service.keytab \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
my_iceberg_job.py