Skip to content

Support replace_table and replace_table_transaction#3220

Draft
smaheshwar-pltr wants to merge 25 commits into
apache:mainfrom
smaheshwar-pltr:sm/replace-table
Draft

Support replace_table and replace_table_transaction#3220
smaheshwar-pltr wants to merge 25 commits into
apache:mainfrom
smaheshwar-pltr:sm/replace-table

Conversation

@smaheshwar-pltr
Copy link
Copy Markdown
Contributor

@smaheshwar-pltr smaheshwar-pltr commented Apr 7, 2026

Closes #281.

Rationale for this change

  • Adds replace_table and replace_table_transaction on Catalog. The former delegates to the latter, with the latter being abstract.
  • See REPLACE TABLE Support #281 (comment) in particular for design motivation
  • Largely inspired by the reference (Java) implementation throughout (I've gotten an LLM to drop review comments referencing relevant places in the Iceberg Java codebase)
  • Implementation on MetastoreCatalog and on RestCatalog, with a shared Catalog._replace_staged_table helper that builds the fresh metadata

Further notes:

  • Field IDs are reused by name from the current schema
  • Schema, partition-spec, and sort-order IDs are reused from history when identical to a previous entry — see Support for REPLACE TABLE operation #433 (comment)
  • Partition field IDs are reused by (source_id, transform) across all specs (v2+). On v1 (append-only specs), removed fields are carried forward as VoidTransform.
  • properties={"format-version": "2"} upgrades; downgrade is rejected.
  • SetCurrentSchema / SetDefaultPartitionSpec / SetDefaultSortOrder are always emitted, even when the resulting id is reused.
  • Commit requirements: AssertTableUUID, AssertLastAssignedFieldId, AssertLastAssignedPartitionId.
  • Properties are merged onto existing (new values override; untouched keys preserved).

Are these changes tested?

Yes, with both unit and integration tests:

  • tests/catalog/test_catalog_behaviors.py
  • tests/catalog/test_rest.py
  • tests/integration/test_catalog.py
  • tests/test_schema.py, tests/table/test_partitioning.py

Are there any user-facing changes?

  • New public API on Catalog + abstract method
  • Docs!

@smaheshwar-pltr smaheshwar-pltr force-pushed the sm/replace-table branch 3 times, most recently from c798d18 to 4361d29 Compare April 19, 2026 08:07
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
- Move replace_table_transaction implementations to MetastoreCatalog and
  RestCatalog; keep stub + shared _replace_staged_table helper on Catalog
  so out-of-tree subclasses don't break.
- Emit UpgradeFormatVersionUpdate when properties bump format-version,
  matching Java's TableMetadata.buildReplacement.
- Always emit SetCurrentSchema / SetDefaultPartitionSpec /
  SetDefaultSortOrder, mirroring RESTSessionCatalog.replaceTransaction.
- Handle v1 partition specs by carrying forward removed fields as
  VoidTransform (v1 specs are append-only).
- Reject view collisions before replacing.
- Assign fresh schema_id / spec_id / order_id on Add* updates so
  AddSchemaUpdate / AddPartitionSpecUpdate / AddSortOrderUpdate produce
  uniquely-keyed entries.
- Tests: behaviour now covered in tests/catalog/test_catalog_behaviors.py
  parametrized across InMemoryCatalog + SqlCatalog. test_rest.py keeps
  only REST wire-specific cases (payload shape, 404, view collision,
  immediate commit). One end-to-end smoke test remains in
  tests/integration/test_rest_catalog.py.
- Add docs section in mkdocs/docs/api.md.
Adds two new behavior tests:
- test_replace_table_transaction_with_write_atomic_rtas (memory + sql):
  replace + fast_append in one transaction lands schema swap and new
  data atomically. New snapshot is current, old snapshot preserved in
  history.
- test_replace_table_followed_by_separate_append (memory + sql):
  replace_table clears the current snapshot; a subsequent append
  restores main ref with new data only.
- test_replace_table_transaction_rtas_against_rest_server: same RTAS
  flow exercised end-to-end against the REST docker stack.

The bare replace_table() is the DDL-only form (clears current snapshot,
preserves history). RTAS via replace_table_transaction is the primary
use case for atomic schema-and-data swaps.
@smaheshwar-pltr smaheshwar-pltr changed the title Support replace_table and replace_table_transaction for REST catalog Support replace_table and replace_table_transaction May 18, 2026
except TableAlreadyExistsError:
return self.load_table(identifier)

def replace_table(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default impl is just composition — every concrete catalog only needs to override replace_table_transaction. Mirrors how create_table_if_not_exists is structured on the base.

requested_format_version = properties.get(TableProperties.FORMAT_VERSION)
if requested_format_version is not None and int(requested_format_version) < existing_metadata.format_version:
raise ValueError(
f"Cannot downgrade format-version from {existing_metadata.format_version} to {requested_format_version}"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java's buildReplacement reads format-version from properties and only upgrades. Rejecting downgrade explicitly here — otherwise _convert_schema_if_needed would run with v1 semantics while the actual upgrade silently drops, producing a confusing mismatch.

"""
raise NotImplementedError("replace_table_transaction is not supported for this catalog type")

def _replace_staged_table(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maps to Java's TableMetadata.buildReplacement. All the bookkeeping (fresh schema, partition spec, sort order, location resolution, StagedTable construction) lives here so MetastoreCatalog and RestCatalog share it — analogous to how _create_staged_table is factored.

return self._table


class ReplaceTableTransaction(Transaction):
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same role as Java's Transactions.replaceTableTransaction — collects the metadata updates that transform the existing table into the replacement and commits them with the replace-specific requirements set.

self._updates += (UpgradeFormatVersionUpdate(format_version=requested_format_version),)

# Remove the main branch ref to clear the current snapshot.
self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only main is cleared, matching Java's buildReplacement which calls removeRef(SnapshotRef.MAIN_BRANCH). Other branches / tags survive replace.


# Schema: reuse an existing schema_id if structurally identical, else add a new one
# with a fresh schema_id (max + 1, matching UpdateSchema's convention).
existing_schema_id = self._find_matching_schema_id(table_metadata, new_schema)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per Java's reuseOrCreateNewSchemaId — walks all historical schemas, reuses the id if structurally identical, otherwise max(id) + 1. The unconditional SetCurrentSchemaUpdate (also for the reuse branch) mirrors Java's RESTSessionCatalog.replaceTransaction which always ensures a SetCurrentSchema change is emitted.

# Partition spec: same reuse-or-add pattern. Assign a fresh spec_id on add to avoid
# collisions with existing specs (AddPartitionSpecUpdate refuses duplicate IDs).
effective_spec = UNPARTITIONED_PARTITION_SPEC if new_spec.is_unpartitioned() else new_spec
existing_spec_id = self._find_matching_spec_id(table_metadata, effective_spec)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per Java's reuseOrCreateNewSpecId. SetDefaultSpecUpdate is unconditional, also per the RESTSessionCatalog.replaceTransaction block.


# Sort order: same reuse-or-add pattern with fresh order_id on add.
effective_sort_order = UNSORTED_SORT_ORDER if new_sort_order.is_unsorted else new_sort_order
existing_order_id = self._find_matching_sort_order_id(table_metadata, effective_sort_order)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per Java's reuseOrCreateNewSortOrderId. Unsorted reuses id 0 in Java; the effective_sort_order substitution above achieves the same.

Comment thread pyiceberg/table/__init__.py Outdated

# Merge properties (SetPropertiesUpdate merges onto existing properties).
if new_properties:
self._updates += (SetPropertiesUpdate(updates=new_properties),)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Properties are merged onto existing, matching Java's TableMetadata.Builder.setProperties which does properties.putAll(updated). Documented in the public docstring — callers wanting to remove keys must use replace_table_transaction and drop them explicitly inside the txn.

Comment thread pyiceberg/partitioning.py
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)


def assign_fresh_partition_spec_ids_for_replace(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mirrors the v2 path in Java's TableMetadata.reassignPartitionIds — collect (source_id, transform) -> field_id across all existing specs, reuse on match, fresh ids for the rest.

Comment thread pyiceberg/partitioning.py
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID), next_id


def _assign_fresh_partition_spec_ids_for_replace_v1(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mirrors the v1 path in Java's reassignPartitionIds. v1 partition specs are append-only with sequential ids — fields absent from the new spec must be carried forward as VoidTransform, otherwise replace would be illegal for v1. The unique-name suffix loop in _unique_void_name matches Java's collision-renaming pattern, generalized to loop further if both name and name_<field_id> are taken.

Copy link
Copy Markdown
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

V1 partition specs are append-only by spec rule, so a replace that drops a partition field would produce an invalid v1 spec without this carry-forward. Mirrors Java's v1 branch in reassignPartitionIds — covered by test_replace_table_v1_carries_forward_partition_fields_as_void and the helper-level v1 tests.

Comment thread pyiceberg/schema.py
return new_id


def assign_fresh_schema_ids_for_replace(schema: Schema, base_schema: Schema, last_column_id: int) -> tuple[Schema, int]:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maps to Java's TypeUtil.assignFreshIds(Schema schema, Schema baseSchema, NextID nextId) and its AssignFreshIds visitor. Name-based reuse from the current schema only (not the full history) — matches Java's behaviour. Type compatibility is the caller's responsibility.

Comment thread mkdocs/docs/api.md Outdated
Comment on lines +215 to +218
with txn.update_snapshot().fast_append() as snap:
for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=df, io=txn._table.io):
snap.append_data_file(data_file)
txn.set_properties(write_replaced_at="2026-04-19T00:00:00Z")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surely we should be showing / testing an example where you have a pyarrow table that you want to replace your table with, so you do replace_table_transaction with that Arrow table's schema and do append on the transaction to achieve it? this feels like the most common use case by far, no?

Comment thread tests/test_schema.py
Comment on lines +1891 to +1892


Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ran make lint — picked up trailing whitespace + ruff format issues. Now passing cleanly.

Comment thread mkdocs/docs/api.md Outdated

## Replace a table

Atomically replace an existing table's schema, partition spec, sort order, location, and properties. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). Use this when you want to redefine the table's metadata; pair it with `replace_table_transaction` to atomically write new data alongside the metadata change (RTAS-style).
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adopted your wording with light tweaks ("lets you write new data alongside this change" instead of the slightly clunky "allows for new data to be written").

- Drop view-collision pre-flight check; deferring to a follow-up PR
  that ports the check across create/replace/rename in one pass.
- Simplify RTAS tests + docs to use txn.append(df) instead of the
  verbose update_snapshot().fast_append() + _dataframe_to_data_files
  pattern.
- Drop parallel new_schema construction in RTAS tests; pass df.schema
  directly (replace_table* accepts Schema | pa.Schema).
- Reword docs intro per reviewer suggestion.
- Run formatter / linter; trailing whitespace and trailing newline fix.
- mypy: cast format_version to TableVersion; extract inner with into
  helper to silence unreachable warning under pytest.raises.
Per reviewer feedback: bare replace_table examples and tests should
construct an explicit Schema, since that's the natural user-facing
API for DDL-only redefinition. RTAS flows keep df.schema since the
data and schema are coupled there.
Matches PR apache#498 (create_table_transaction) precedent. NoopCatalog gets
an explicit stub since it extends Catalog directly.
Per audit: clarify that we always emit AssertLastAssigned*Id, which is
stricter than the reference contract that conditions them on Add*
being present. Fail-safe vs corruption-risk.
Matches the sibling commit_transaction signatures — the requirements
list is visible in the code below; no need to enumerate it in prose.
return existing.order_id
return None

def commit_transaction(self) -> Table:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per Java's UpdateRequirements.forReplaceTable plus the AddSchema / AddPartitionSpec per-update additions. AssertCurrentSchemaID / AssertDefaultSpecID / AssertDefaultSortOrderID are deliberately not emitted — Java suppresses them via the !isReplace guards.

One deliberate divergence: Python emits AssertLastAssignedFieldId / AssertLastAssignedPartitionId unconditionally, where Java emits them only when the matching Add* update is in the change set. Stricter, fail-safe — a true no-op replace just confirms the asserts hold; a concurrent column-add between load and commit fails fast.

self._updates += (SetPropertiesUpdate(updates=new_properties),)

@staticmethod
def _find_matching_schema_id(table_metadata: TableMetadata, schema: Schema) -> int | None:
Copy link
Copy Markdown
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mirrors Java's reuseOrCreateNewSchemaId (and the spec / sort-order siblings) — walk all historical entries, return the existing id on structural match, otherwise generate a fresh one. Covers the case Fokko walked through in #433 (comment): CREATE OR REPLACE back to a previously-seen schema reuses its schema_id and does not append a duplicate.

Critical:
- test_concurrent_replace_table now uses identical new schemas across
  the two transactions and matches on 'last assigned field id' — this
  is the only path that actually validates AssertLastAssignedFieldId.

Coverage:
- Add test_replace_table_with_sort_order_changes: unsorted → sorted →
  unsorted with order_id reuse from history.
- Extend the v2 partition-spec helper test with a reuse-by-bucket case
  (matching field_id under a renamed partition field).

Brittleness fixes:
- Drop dead HEAD mock in test_replace_table_transaction_404_raises;
  no view-exists pre-flight to mock anymore.
- Pin wire-payload assertions to fixture metadata fields (no more
  magic 999 / spec_id / sort_order_id constants).
- Tighten set-default-spec / set-default-sort-order to check the
  emitted id, not just existence.
- Split test_replace_table_location_resolution into two non-stringly-
  discriminated tests.

Removed:
- test_replace_table_accepts_pyarrow_schema: redundant with the RTAS
  test which already exercises pa.Schema via df.schema.
- test_replace_table_drops_identifier_field: pairs with the existing
  preserve test; verifies that a new schema without identifier_field_ids
  clears the previous set rather than silently carrying it forward.
- test_replace_table_v2_does_not_carry_forward_void_field: v2 specs
  aren't append-only, so a dropped partition field is gone (unlike v1).
- test_replace_after_format_version_upgrade: v1 -> v2 via replace,
  then a second replace on the now-v2 table must not retrigger the
  upgrade or fail.
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
staged_table, fresh_schema, fresh_spec, fresh_sort_order, resolved_location = self._replace_staged_table(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heads-up on a deliberate scope cut: Java's RESTSessionCatalog.replaceTransaction does a view-existence pre-flight check before replacing. PyIceberg doesn't currently do this in the equivalent create_table / rename_table / register_table paths either, so adding it only to replace_table here would be inconsistent. Splitting that into a follow-up PR that ports the check across all the relevant call sites in one pass.

Match new_table_metadata's behavior of popping format-version before
persistence, and validate schema compatibility against the resolved
format version at the same site.
- Pin that 'format-version' is not in replaced.metadata.properties.
- Add list/map element_id/key_id/value_id reuse coverage for
  assign_fresh_schema_ids_for_replace.
- Extend sort-order reuse test to cover reusing a non-zero sorted
  order_id from history.
- Drop the stale HEAD (view-exists) reference in
  test_replace_table_issues_commit_post_immediately.
int(requested_format_version) if requested_format_version is not None else existing_metadata.format_version
)
iceberg_schema = self._convert_schema_if_needed(schema, cast(TableVersion, resolved_format_version))
iceberg_schema.check_format_version_compatibility(cast(TableVersion, resolved_format_version))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same call new_table_metadata makes (metadata.py:597), and the same check Java's Builder runs inside addSchemaInternal. Catches v1-incompatible types up front rather than failing later inside AddSchemaUpdate's apply path.

int(requested_format_version) if requested_format_version is not None else existing_metadata.format_version
)
iceberg_schema = self._convert_schema_if_needed(schema, cast(TableVersion, resolved_format_version))
iceberg_schema.check_format_version_compatibility(cast(TableVersion, resolved_format_version))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same call new_table_metadata makes (metadata.py:597), and the same check Java's Builder runs inside addSchemaInternal. Catches v1-incompatible types up front rather than failing later inside AddSchemaUpdate's apply path.

Comment thread tests/integration/test_rest_catalog.py Outdated
Comment on lines +78 to +82
"""End-to-end smoke test: replace_table against a real REST server.

Detailed replace_table semantics are covered against InMemoryCatalog and SqlCatalog in
`tests/catalog/test_catalog_behaviors.py`. This test verifies the REST wire path: server
accepts the commit, preserves the UUID, and clears the current snapshot."""
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""End-to-end smoke test: replace_table against a real REST server.
Detailed replace_table semantics are covered against InMemoryCatalog and SqlCatalog in
`tests/catalog/test_catalog_behaviors.py`. This test verifies the REST wire path: server
accepts the commit, preserves the UUID, and clears the current snapshot."""

Comment thread tests/integration/test_rest_catalog.py Outdated

@pytest.mark.integration
@pytest.mark.parametrize("catalog", [lf("session_catalog")])
def test_replace_table_end_to_end_against_rest_server(catalog: Catalog) -> None:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def test_replace_table_end_to_end_against_rest_server(catalog: Catalog) -> None:
def test_replace_table(catalog: Catalog) -> None:

Comment thread tests/integration/test_rest_catalog.py Outdated
Comment on lines +118 to +120
def test_replace_table_transaction_rtas_against_rest_server(catalog: Catalog) -> None:
"""RTAS (Replace Table As Select) against a real REST server: the schema swap and the
new-data write must land atomically — the new snapshot is current on commit."""
Copy link
Copy Markdown
Contributor Author

@smaheshwar-pltr smaheshwar-pltr May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def test_replace_table_transaction_rtas_against_rest_server(catalog: Catalog) -> None:
"""RTAS (Replace Table As Select) against a real REST server: the schema swap and the
new-data write must land atomicallythe new snapshot is current on commit."""
def test_replace_table_transaction(catalog: Catalog) -> None:

Drop docstrings that restated the function name or assertions, drop
narrative inline comments where variable names already carry the
meaning, and tighten multi-paragraph docstrings to one line. Also
shorten the integration test function names to match how the rest
of the file is structured.
identifier, schema, location, partition_spec, sort_order, properties
).commit_transaction()

@abstractmethod
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pointing out that this is a new abstract method. I'm matching #498 that also introduced a new abstract method create_table_transaction. This technically breaks out-of-tree Catalog implementations that do not implement this, but I thought this was fine given the precedence in the above PR. (Happy to change to this raising NotImplemented instead and removing abstractmethod, but I do prefer abstractmethod from a design perspective and think it's fine to make this change)

That file is the cross-catalog integration analog to Java's
CatalogTests.java; every other mutation (create/rename/drop/etc.)
already has a slot there parametrized over six catalog fixtures.
The two replace tests have no REST-specific code and don't belong
in test_rest_catalog.py.

Adopts the file's existing conventions: test_catalog parameter
name, database_name + table_name fixtures, no manual cleanup
guards (clean_up runs between tests).
…g.py

The fixture had no S3 credentials, which was fine when every test in
the file was metadata-only. The new replace tests do .append() and
fail with ACCESS_DENIED against minio. Mirrors the credentials the
hive_catalog fixture in the same file already uses.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

REPLACE TABLE Support

1 participant