Version: 1.3.1.0

Time Travel and Snapshot Management

Iceberg's snapshot model is one of its core differentiators from traditional Hive partitioned tables. Every write operation produces an immutable snapshot, enabling time travel queries, safe rollbacks, and auditable data history.

The Iceberg Snapshot Model

Every INSERT, INSERT OVERWRITE, MERGE, UPDATE, or DELETE against an Iceberg table produces a new snapshot. A snapshot is a complete, consistent view of the table at a point in time.

Timeline:

snapshot 1001 (initial load) → snapshot 1002 (daily append) → snapshot 1003 (correction MERGE)
        │                              │                              │
metadata.json                  metadata.json                  metadata.json
manifest-list-1.avro           manifest-list-2.avro           manifest-list-3.avro
        │                              │                              │
manifest-1.avro                manifest-2.avro                manifest-3a.avro (new files)
data-file-001.parquet          data-file-002.parquet          delete-file-001.parquet (eq. deletes)

Key metadata files:

  • metadata.json: Table state — current snapshot, schema, partition spec, snapshot history.
  • Manifest list (.avro): Pointer to all manifest files for a snapshot.
  • Manifest file (.avro): Lists data files and delete files, with statistics (min/max values, null counts) for partition pruning.
  • Data files (.parquet): Actual table data.
  • Delete files (.parquet): Row-level deletes (Iceberg v2 tables).

Snapshots are cheap — creating a new snapshot only writes new metadata files pointing to the same underlying data files where possible.
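Because metadata.json is plain JSON, the chain above can be inspected with nothing more than a JSON parser. A minimal sketch using an inlined, abridged metadata document (field names follow the Iceberg table spec; real files carry many more fields, and in practice you would `json.load` the file from the table's metadata directory):

```python
def describe_metadata(meta: dict) -> dict:
    """Summarize an Iceberg metadata.json: current snapshot and history size."""
    current_id = meta["current-snapshot-id"]
    snapshots = {s["snapshot-id"]: s for s in meta.get("snapshots", [])}
    current = snapshots[current_id]
    return {
        "current_snapshot_id": current_id,
        "operation": current["summary"]["operation"],
        "manifest_list": current["manifest-list"],
        "snapshot_count": len(snapshots),
    }

# Abridged example document mirroring the timeline above:
meta = {
    "current-snapshot-id": 1003,
    "snapshots": [
        {"snapshot-id": 1001, "summary": {"operation": "append"},
         "manifest-list": "manifest-list-1.avro"},
        {"snapshot-id": 1002, "summary": {"operation": "append"},
         "manifest-list": "manifest-list-2.avro"},
        {"snapshot-id": 1003, "summary": {"operation": "overwrite"},
         "manifest-list": "manifest-list-3.avro"},
    ],
}
print(describe_metadata(meta))
```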


Time Travel Syntax

Hive (Beeline)

-- By snapshot ID
SELECT * FROM iceberg_demo.events
FOR SYSTEM_VERSION AS OF 3874123456789012345;

-- By timestamp
SELECT * FROM iceberg_demo.events
FOR SYSTEM_TIME AS OF '2026-03-01 00:00:00';

Spark SQL

-- By snapshot ID (VERSION AS OF)
SELECT COUNT(*) FROM hive_catalog.iceberg_demo.events
VERSION AS OF 3874123456789012345;

-- By timestamp (TIMESTAMP AS OF)
SELECT * FROM hive_catalog.iceberg_demo.events
TIMESTAMP AS OF '2026-03-01T00:00:00.000Z';

PySpark DataFrameReader API:

# By snapshot ID
df = spark.read.option("snapshot-id", 3874123456789012345) \
    .table("hive_catalog.iceberg_demo.events")

# By timestamp (milliseconds since Unix epoch, UTC)
import datetime
ts = datetime.datetime(2026, 3, 1, tzinfo=datetime.timezone.utc).timestamp() * 1000
df = spark.read.option("as-of-timestamp", int(ts)) \
    .table("hive_catalog.iceberg_demo.events")

Trino

-- By snapshot ID
SELECT * FROM iceberg.iceberg_demo.events
FOR VERSION AS OF 3874123456789012345;

-- By timestamp
SELECT * FROM iceberg.iceberg_demo.events
FOR TIMESTAMP AS OF TIMESTAMP '2026-03-01 00:00:00 UTC';

Rollback to Snapshot

Rolling back sets the table's current snapshot pointer to an older snapshot. Writes after the rollback branch off from the restored state; the abandoned snapshots are not deleted and remain queryable until they are expired.
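The branching behavior can be illustrated with a toy model of parent pointers; this is not Iceberg API code, just a sketch of how the current lineage changes after a rollback:

```python
def current_ancestors(parents: dict, current_id: int) -> list:
    """Walk parent pointers from the current snapshot back to the root."""
    lineage = []
    sid = current_id
    while sid is not None:
        lineage.append(sid)
        sid = parents[sid]
    return lineage

# Lineage 1001 -> 1002 -> 1003; the root snapshot 1001 has no parent.
parents = {1001: None, 1002: 1001, 1003: 1002}

# Before rollback, the current snapshot is 1003:
assert current_ancestors(parents, 1003) == [1003, 1002, 1001]

# Roll back to 1001, then write snapshot 1004 on top of it:
parents[1004] = 1001
# 1002 and 1003 are no longer ancestors of the new current snapshot 1004,
# but they still exist in table history until expired.
print(current_ancestors(parents, 1004))  # [1004, 1001]
```

This is exactly what the history metadata table's is_current_ancestor column reports for each snapshot.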

Spark (via Iceberg procedures)

# Roll back to a specific snapshot ID
spark.sql("""
CALL hive_catalog.system.rollback_to_snapshot(
'iceberg_demo.events',
3874123456789012345
)
""")

# Roll back to the state at a specific timestamp
spark.sql("""
CALL hive_catalog.system.rollback_to_timestamp(
'iceberg_demo.events',
TIMESTAMP '2026-03-01 00:00:00'
)
""")

Hive

-- Hive: rollback is exposed through ALTER TABLE ... EXECUTE
ALTER TABLE iceberg_demo.events EXECUTE ROLLBACK(3874123456789012345);
Caution

Rollback modifies the table's current-snapshot-id. Concurrent reads during a rollback may see an inconsistent snapshot transition. Schedule rollbacks during maintenance windows for production tables.


Expire Snapshots (Maintenance)

By default, Iceberg retains all snapshots indefinitely. Old snapshots must be explicitly expired to reclaim storage. Expiring a snapshot does not delete data files that are still referenced by the current snapshot.

# Expire snapshots older than a retention period
spark.sql("""
CALL hive_catalog.system.expire_snapshots(
table => 'iceberg_demo.events',
older_than => TIMESTAMP '2026-01-01 00:00:00',
retain_last => 5
)
""")

Parameters:

  • older_than: Expire snapshots committed before this timestamp.
  • retain_last: Always keep at least this many recent snapshots, even if they are older than older_than.

Hive

ALTER TABLE iceberg_demo.events
EXECUTE expire_snapshots('2026-01-01 00:00:00');

Scheduling expiration

Set up a recurring Oozie workflow or a cron-triggered Spark job on the edge node:

# Weekly snapshot expiration job (run as the spark service account).
# iceberg-maintenance.jar and its main class are a site-specific wrapper around
# Iceberg's expire-snapshots action; they are not part of the Iceberg distribution.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --principal spark@DEV01.HADOOP.CLEMLAB.COM \
  --keytab /etc/security/keytabs/spark.service.keytab \
  --class com.example.IcebergMaintenance \
  iceberg-maintenance.jar \
  --table hive_catalog.iceberg_demo.events \
  --older-than 30d \
  --retain-last 10
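The driver bundled in iceberg-maintenance.jar is site-specific. As a sketch of what such a driver might do, the helpers below turn the command-line flags into the expire_snapshots CALL; the duration format ('30d', '12h') and function names are assumptions, not an Iceberg API:

```python
import datetime
import re

def parse_age(spec: str) -> datetime.timedelta:
    """Parse a duration flag like '30d' or '12h' (format is an assumption)."""
    m = re.fullmatch(r"(\d+)([dh])", spec)
    if not m:
        raise ValueError(f"unsupported duration: {spec!r}")
    n = int(m.group(1))
    return datetime.timedelta(days=n) if m.group(2) == "d" else datetime.timedelta(hours=n)

def build_expire_call(table, older_than, retain_last, now=None):
    """Render the expire_snapshots CALL for a fully qualified table name."""
    catalog, db_table = table.split(".", 1)
    now = now or datetime.datetime.now(datetime.timezone.utc)
    cutoff = now - parse_age(older_than)
    return (
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{db_table}', "
        f"older_than => TIMESTAMP '{cutoff:%Y-%m-%d %H:%M:%S}', "
        f"retain_last => {retain_last})"
    )

sql = build_expire_call("hive_catalog.iceberg_demo.events", "30d", 10)
print(sql)
# On the cluster the driver would then execute: spark.sql(sql)
```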

Metadata Tables

Iceberg exposes virtual tables for inspecting table internals. Spark and Hive use the dotted db.table.snapshots form shown below; Trino exposes the same information through quoted names such as "events$snapshots".

Snapshots

SELECT snapshot_id, committed_at, operation, summary
FROM hive_catalog.iceberg_demo.events.snapshots
ORDER BY committed_at DESC;

-- Example output:
-- snapshot_id          committed_at             operation  summary
-- 3874123456789012345  2026-04-08 09:00:00.000  append     {"added-data-files":"12","added-records":"500000"}
-- 3874123456789012344  2026-04-07 09:00:00.000  append     {"added-data-files":"10","added-records":"420000"}

History

SELECT *
FROM hive_catalog.iceberg_demo.events.history;
-- Shows: made_current_at, snapshot_id, parent_id, is_current_ancestor

Manifests

SELECT path, length, partition_spec_id, added_snapshot_id,
added_data_files_count, existing_data_files_count, deleted_data_files_count
FROM hive_catalog.iceberg_demo.events.manifests;

Data Files

SELECT content, file_path, file_format, record_count, file_size_in_bytes,
column_sizes, value_counts, null_value_counts
FROM hive_catalog.iceberg_demo.events.data_files
LIMIT 20;

Partitions

SELECT partition, record_count, file_count, total_data_file_size_in_bytes,
spec_id
FROM hive_catalog.iceberg_demo.events.partitions
ORDER BY record_count DESC;
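The partitions table is a convenient way to spot compaction candidates. A sketch that flags partitions whose average data file is small; the 128 MB threshold is an assumption, tune it to your target file size:

```python
def small_file_partitions(rows, threshold_bytes=128 * 1024 * 1024):
    """rows: (partition, record_count, file_count, total_data_file_size_in_bytes)
    tuples, as returned by the partitions metadata table. Returns partitions
    whose average data file size is below the threshold."""
    flagged = []
    for partition, _records, file_count, total_bytes in rows:
        if file_count and total_bytes / file_count < threshold_bytes:
            flagged.append(partition)
    return flagged

# rows would normally come from:
#   spark.sql("SELECT partition, record_count, file_count, "
#             "total_data_file_size_in_bytes "
#             "FROM hive_catalog.iceberg_demo.events.partitions").collect()
sample = [
    ("2026-04-07", 500_000, 4, 1_200_000_000),    # avg ~300 MB per file: fine
    ("2026-04-08", 420_000, 200, 2_000_000_000),  # avg ~10 MB per file: flag
]
print(small_file_partitions(sample))  # ['2026-04-08']
```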

Compaction (rewrite_data_files)

Small files accumulate over time, especially from streaming ingestion. Compaction rewrites small data files into larger ones, improving read performance.

Spark compaction

# Basic compaction — rewrites small files in all partitions
spark.sql("""
CALL hive_catalog.system.rewrite_data_files('iceberg_demo.events')
""")

# Targeted compaction with options
spark.sql("""
CALL hive_catalog.system.rewrite_data_files(
table => 'iceberg_demo.events',
strategy => 'sort',
sort_order => 'zorder(user_id, event_type)',
options => map(
'target-file-size-bytes', '268435456',
'min-file-size-bytes', '67108864',
'max-concurrent-file-group-rewrites', '10'
)
)
""")

Compaction strategies:

  • binpack (default): Groups files by partition, rewrites files below the minimum size threshold.
  • sort: Sorts data within each file group by the specified sort order (reduces scan I/O for sorted queries). Z-order sorts across multiple columns simultaneously.
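To see why zorder helps multi-column filters, consider a toy Morton-code sketch. Interleaving the bits of two keys produces a single sort key under which rows close in BOTH columns land close together; this is illustrative only, since Iceberg's actual z-order implementation works on column byte representations:

```python
def z_value(x: int, y: int, bits: int = 8) -> int:
    """Interleave the bits of x and y into a Morton (Z-order) index."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # x bits go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)   # y bits go to odd positions
    return z

# Sorting by z_value clusters points that are near each other in both
# dimensions, so min/max stats per file prune well on either column:
points = [(7, 7), (0, 1), (1, 1), (0, 0), (1, 0)]
print(sorted(points, key=lambda p: z_value(*p)))
```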

Rewrite manifests

After heavy compaction or many small writes, manifest files can also fragment:

spark.sql("""
CALL hive_catalog.system.rewrite_manifests('iceberg_demo.events')
""")

Use Cases

Debugging data pipeline issues

Suppose a pipeline job wrote incorrect data. Identify the pre-job snapshot, compare it with the current state, and roll back if needed:

# Find the snapshot just before the bad job ran
spark.sql("""
SELECT snapshot_id, committed_at, summary
FROM hive_catalog.iceberg_demo.events.snapshots
WHERE committed_at < '2026-04-08 08:00:00'
ORDER BY committed_at DESC
LIMIT 1
""").show(truncate=False)

# Compare record counts
spark.sql("""
SELECT COUNT(*) FROM hive_catalog.iceberg_demo.events
VERSION AS OF <good-snapshot-id>
""").show()

spark.sql("SELECT COUNT(*) FROM hive_catalog.iceberg_demo.events").show()

Audit and compliance

Produce an exact copy of a table as it existed on a regulatory reporting date:

# Extract data as it was on the reporting date
reporting_date_df = (
    spark.read
    .option("as-of-timestamp", "1775001600000")  # 2026-04-01 00:00:00 UTC
    .table("hive_catalog.finance.transactions")
)

# Write to an audit archive table
reporting_date_df.writeTo("hive_catalog.audit.transactions_2026_q1").create()

ML reproducibility — retraining on historical data

Machine learning pipelines need reproducible training datasets. Use snapshot IDs to pin the exact dataset version used for a model training run:

# At training time: record the snapshot ID
training_snapshot = spark.sql("""
SELECT snapshot_id FROM hive_catalog.iceberg_demo.features.snapshots
ORDER BY committed_at DESC LIMIT 1
""").first()["snapshot_id"]

# Log snapshot_id with the model (MLflow, custom metadata, etc.)
import mlflow
mlflow.log_param("training_data_snapshot_id", training_snapshot)

# At retraining or audit time: reproduce the exact dataset
training_df = (
    spark.read
    .option("snapshot-id", training_snapshot)
    .table("hive_catalog.iceberg_demo.features")
)

This makes the training dataset reproducible regardless of downstream data changes, ensuring that model validation and debugging use the same data the model was trained on.