Voyage dans le temps et gestion des snapshots
Le modèle de snapshot d'Iceberg est l'un de ses différenciateurs fondamentaux par rapport aux tables Hive partitionnées traditionnelles. Chaque opération d'écriture produit un snapshot immuable, permettant les requêtes de voyage dans le temps, les rollbacks sécurisés et un historique des données auditable.
Le modèle de snapshot Iceberg
Chaque INSERT, INSERT OVERWRITE, MERGE, UPDATE ou DELETE sur une table Iceberg produit un nouveau snapshot. Un snapshot est une vue complète et cohérente de la table à un instant donné.
Chronologie :
snapshot 1001 (chargement initial) → snapshot 1002 (ajout quotidien) → snapshot 1003 (correction MERGE)
│ │ │
metadata.json metadata.json metadata.json
manifest-list-1.avro manifest-list-2.avro manifest-list-3.avro
│ │ │
manifest-1.avro manifest-2.avro manifest-3a.avro (nouveaux fichiers)
data-file-001.parquet data-file-002.parquet delete-file-001.parquet (eq. deletes)
Fichiers de métadonnées clés :
metadata.json: état de la table — snapshot courant, schéma, spécification de partition, historique des snapshots.- Liste de manifestes (
.avro) : pointeur vers tous les fichiers de manifestes d'un snapshot. - Fichier de manifeste (
.avro) : liste les fichiers de données et de suppression, avec des statistiques (valeurs min/max, comptages de nulls) pour l'élagage de partitions. - Fichiers de données (
.parquet) : données réelles de la table. - Fichiers de suppression (
.parquet) : suppressions au niveau des lignes (tables Iceberg v2).
Les snapshots sont peu coûteux — la création d'un nouveau snapshot n'écrit que de nouveaux fichiers de métadonnées pointant vers les mêmes fichiers de données sous-jacents dans la mesure du possible.
Syntaxe du voyage dans le temps
Hive (Beeline)
-- Par ID de snapshot
SELECT * FROM iceberg_demo.events
FOR SYSTEM_VERSION AS OF 3874123456789012345;
-- Par timestamp
SELECT * FROM iceberg_demo.events
FOR SYSTEM_TIME AS OF '2026-03-01 00:00:00';
Spark SQL
-- Par ID de snapshot (VERSION AS OF)
SELECT COUNT(*) FROM hive_catalog.iceberg_demo.events
VERSION AS OF 3874123456789012345;
-- Par timestamp (TIMESTAMP AS OF)
SELECT * FROM hive_catalog.iceberg_demo.events
TIMESTAMP AS OF '2026-03-01T00:00:00.000Z';
API PySpark DataFrameReader :
# Par ID de snapshot
df = spark.read.option("snapshot-id", 3874123456789012345) \
.table("hive_catalog.iceberg_demo.events")
# Par timestamp (millisecondes depuis l'époque Unix)
import datetime
ts = datetime.datetime(2026, 3, 1).timestamp() * 1000
df = spark.read.option("as-of-timestamp", int(ts)) \
.table("hive_catalog.iceberg_demo.events")
Trino
-- Par ID de snapshot
SELECT * FROM iceberg.iceberg_demo.events
FOR VERSION AS OF 3874123456789012345;
-- Par timestamp
SELECT * FROM iceberg.iceberg_demo.events
FOR TIMESTAMP AS OF TIMESTAMP '2026-03-01 00:00:00 UTC';
Rollback vers un snapshot
Le rollback définit le snapshot courant de la table sur un snapshot plus ancien. Les nouvelles écritures après un rollback créent une nouvelle branche à partir de l'état du rollback ; le snapshot vers lequel on effectue le rollback n'est pas supprimé.
Spark (via les procédures Iceberg)
# Rollback vers un ID de snapshot spécifique
spark.sql("""
CALL hive_catalog.system.rollback_to_snapshot(
'iceberg_demo.events',
3874123456789012345
)
""")
# Rollback vers l'état à un timestamp spécifique
spark.sql("""
CALL hive_catalog.system.rollback_to_timestamp(
'iceberg_demo.events',
TIMESTAMP '2026-03-01 00:00:00'
)
""")
Hive
-- Hive : utiliser la syntaxe de procédure Iceberg via Hive
ALTER TABLE iceberg_demo.events EXECUTE ROLLBACK(3874123456789012345);
Le rollback modifie le current-snapshot-id de la table. Les lectures simultanées pendant un rollback peuvent voir une transition de snapshot incohérente. Planifiez les rollbacks pendant des fenêtres de maintenance pour les tables en production.
Expiration des snapshots (maintenance)
Par défaut, Iceberg conserve tous les snapshots indéfiniment. Les anciens snapshots doivent être explicitement expirés pour libérer du stockage. L'expiration d'un snapshot ne supprime pas les fichiers de données encore référencés par le snapshot courant.
Spark (recommandé)
# Expirer les snapshots plus anciens qu'une période de rétention
spark.sql("""
CALL hive_catalog.system.expire_snapshots(
table => 'iceberg_demo.events',
older_than => TIMESTAMP '2026-01-01 00:00:00',
retain_last => 5
)
""")
Paramètres :
older_than: expire les snapshots commités avant ce timestamp.retain_last: toujours conserver au minimum ce nombre de snapshots récents, même s'ils sont plus anciens queolder_than.
Hive
ALTER TABLE iceberg_demo.events
EXECUTE expire_snapshots('2026-01-01 00:00:00');
Planification de l'expiration
Mettez en place un workflow Oozie récurrent ou un job Spark déclenché par cron sur le nœud edge :
# Job d'expiration hebdomadaire des snapshots (exécuté sous le compte de service spark)
spark-submit \
--master yarn \
--deploy-mode cluster \
--principal spark@DEV01.HADOOP.CLEMLAB.COM \
--keytab /etc/security/keytabs/spark.service.keytab \
--class org.apache.iceberg.spark.actions.ExpireSnapshotsAction \
iceberg-maintenance.jar \
--table hive_catalog.iceberg_demo.events \
--older-than 30d \
--retain-last 10
Tables de métadonnées
Iceberg expose des tables virtuelles pour inspecter les internals des tables. Celles-ci sont disponibles dans Spark, Hive et Trino.
Snapshots
SELECT snapshot_id, committed_at, operation, summary
FROM hive_catalog.iceberg_demo.events.snapshots
ORDER BY committed_at DESC;
-- Exemple de sortie :
-- snapshot_id committed_at operation summary
-- 3874123456789012345 2026-04-08 09:00:00.000 append {"added-data-files":"12","added-records":"500000"}
-- 3874123456789012344 2026-04-07 09:00:00.000 append {"added-data-files":"10","added-records":"420000"}
Historique
SELECT *
FROM hive_catalog.iceberg_demo.events.history;
-- Affiche : made_current_at, snapshot_id, parent_id, is_current_ancestor
Manifestes
SELECT path, length, partition_spec_id, added_snapshot_id,
added_data_files_count, existing_data_files_count, deleted_data_files_count
FROM hive_catalog.iceberg_demo.events.manifests;
Fichiers de données
SELECT content, file_path, file_format, record_count, file_size_in_bytes,
column_sizes, value_counts, null_value_counts
FROM hive_catalog.iceberg_demo.events.data_files
LIMIT 20;
Partitions
SELECT partition, record_count, file_count, total_data_file_size_in_bytes,
spec_id
FROM hive_catalog.iceberg_demo.events.partitions
ORDER BY record_count DESC;
Compaction (rewrite_data_files)
Les petits fichiers s'accumulent au fil du temps, notamment lors de l'ingestion en streaming. La compaction réécrit les petits fichiers de données en fichiers plus grands, améliorant les performances de lecture.
Compaction Spark
# Compaction de base — réécrit les petits fichiers dans toutes les partitions
spark.sql("""
CALL hive_catalog.system.rewrite_data_files('iceberg_demo.events')
""")
# Compaction ciblée avec options
spark.sql("""
CALL hive_catalog.system.rewrite_data_files(
table => 'iceberg_demo.events',
strategy => 'sort',
sort_order => 'zorder(user_id, event_type)',
options => map(
'target-file-size-bytes', '268435456',
'min-file-size-bytes', '67108864',
'max-concurrent-file-group-rewrites', '10'
)
)
""")
Stratégies de compaction :
binpack(par défaut) : regroupe les fichiers par partition, réécrit les fichiers en dessous du seuil de taille minimale.sort: trie les données dans chaque groupe de fichiers selon l'ordre de tri spécifié (réduit les E/S de scan pour les requêtes triées). Le tri Z-order trie simultanément sur plusieurs colonnes.
Réécriture des manifestes
Après une compaction intensive ou de nombreuses petites écritures, les fichiers de manifestes peuvent également se fragmenter :
spark.sql("""
CALL hive_catalog.system.rewrite_manifests('iceberg_demo.events')
""")
Cas d'usage
Débogage de problèmes dans les pipelines de données
Un job de pipeline a écrit des données incorrectes ? Effectuez un rollback vers le snapshot précédant le job et investiguez :
# Trouver le snapshot juste avant l'exécution du mauvais job
spark.sql("""
SELECT snapshot_id, committed_at, summary
FROM hive_catalog.iceberg_demo.events.snapshots
WHERE committed_at < '2026-04-08 08:00:00'
ORDER BY committed_at DESC
LIMIT 1
""").show(truncate=False)
# Comparer le nombre d'enregistrements
spark.sql("""
SELECT COUNT(*) FROM hive_catalog.iceberg_demo.events
VERSION AS OF <good-snapshot-id>
""").show()
spark.sql("SELECT COUNT(*) FROM hive_catalog.iceberg_demo.events").show()
Audit et conformité
Produire une copie exacte d'une table telle qu'elle existait à une date de reporting réglementaire :
# Extraire les données telles qu'elles étaient à la date de reporting
reporting_date_df = (
spark.read
.option("as-of-timestamp", "1743465600000") # 2026-04-01 00:00:00 UTC
.table("hive_catalog.finance.transactions")
)
# Écrire dans une table d'archive d'audit
reporting_date_df.writeTo("hive_catalog.audit.transactions_2026_q1").create()
Reproductibilité ML — ré-entraînement sur des données historiques
Les pipelines d'apprentissage automatique nécessitent des jeux d'entraînement reproductibles. Utilisez les IDs de snapshot pour épingler la version exacte du jeu de données utilisée lors d'un entraînement de modèle :
# Au moment de l'entraînement : enregistrer l'ID du snapshot
training_snapshot = spark.sql("""
SELECT snapshot_id FROM hive_catalog.iceberg_demo.features.snapshots
ORDER BY committed_at DESC LIMIT 1
""").first()["snapshot_id"]
# Journaliser snapshot_id avec le modèle (MLflow, métadonnées personnalisées, etc.)
mlflow.log_param("training_data_snapshot_id", training_snapshot)
# Au moment du ré-entraînement ou de l'audit : reproduire le jeu de données exact
training_df = (
spark.read
.option("snapshot-id", training_snapshot)
.table("hive_catalog.iceberg_demo.features")
)
Cela rend le jeu d'entraînement reproductible indépendamment des modifications de données en aval, garantissant que la validation du modèle et le débogage utilisent les mêmes données que celles sur lesquelles le modèle a été entraîné.