Support replace_table and replace_table_transaction#3220
Support replace_table and replace_table_transaction#3220smaheshwar-pltr wants to merge 25 commits into
replace_table and replace_table_transaction#3220Conversation
c798d18 to
4361d29
Compare
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
4361d29 to
93d77d3
Compare
- Move replace_table_transaction implementations to MetastoreCatalog and RestCatalog; keep stub + shared _replace_staged_table helper on Catalog so out-of-tree subclasses don't break. - Emit UpgradeFormatVersionUpdate when properties bump format-version, matching Java's TableMetadata.buildReplacement. - Always emit SetCurrentSchema / SetDefaultPartitionSpec / SetDefaultSortOrder, mirroring RESTSessionCatalog.replaceTransaction. - Handle v1 partition specs by carrying forward removed fields as VoidTransform (v1 specs are append-only). - Reject view collisions before replacing. - Assign fresh schema_id / spec_id / order_id on Add* updates so AddSchemaUpdate / AddPartitionSpecUpdate / AddSortOrderUpdate produce uniquely-keyed entries. - Tests: behaviour now covered in tests/catalog/test_catalog_behaviors.py parametrized across InMemoryCatalog + SqlCatalog. test_rest.py keeps only REST wire-specific cases (payload shape, 404, view collision, immediate commit). One end-to-end smoke test remains in tests/integration/test_rest_catalog.py. - Add docs section in mkdocs/docs/api.md.
Adds two new behavior tests: - test_replace_table_transaction_with_write_atomic_rtas (memory + sql): replace + fast_append in one transaction lands schema swap and new data atomically. New snapshot is current, old snapshot preserved in history. - test_replace_table_followed_by_separate_append (memory + sql): replace_table clears the current snapshot; a subsequent append restores main ref with new data only. - test_replace_table_transaction_rtas_against_rest_server: same RTAS flow exercised end-to-end against the REST docker stack. The bare replace_table() is the DDL-only form (clears current snapshot, preserves history). RTAS via replace_table_transaction is the primary use case for atomic schema-and-data swaps.
replace_table and replace_table_transaction for REST catalogreplace_table and replace_table_transaction
| except TableAlreadyExistsError: | ||
| return self.load_table(identifier) | ||
|
|
||
| def replace_table( |
There was a problem hiding this comment.
Default impl is just composition — every concrete catalog only needs to override replace_table_transaction. Mirrors how create_table_if_not_exists is structured on the base.
| requested_format_version = properties.get(TableProperties.FORMAT_VERSION) | ||
| if requested_format_version is not None and int(requested_format_version) < existing_metadata.format_version: | ||
| raise ValueError( | ||
| f"Cannot downgrade format-version from {existing_metadata.format_version} to {requested_format_version}" |
There was a problem hiding this comment.
Java's buildReplacement reads format-version from properties and only upgrades. Rejecting downgrade explicitly here — otherwise _convert_schema_if_needed would run with v1 semantics while the actual upgrade silently drops, producing a confusing mismatch.
| """ | ||
| raise NotImplementedError("replace_table_transaction is not supported for this catalog type") | ||
|
|
||
| def _replace_staged_table( |
There was a problem hiding this comment.
Maps to Java's TableMetadata.buildReplacement. All the bookkeeping (fresh schema, partition spec, sort order, location resolution, StagedTable construction) lives here so MetastoreCatalog and RestCatalog share it — analogous to how _create_staged_table is factored.
| return self._table | ||
|
|
||
|
|
||
| class ReplaceTableTransaction(Transaction): |
There was a problem hiding this comment.
Same role as Java's Transactions.replaceTableTransaction — collects the metadata updates that transform the existing table into the replacement and commits them with the replace-specific requirements set.
| self._updates += (UpgradeFormatVersionUpdate(format_version=requested_format_version),) | ||
|
|
||
| # Remove the main branch ref to clear the current snapshot. | ||
| self._updates += (RemoveSnapshotRefUpdate(ref_name=MAIN_BRANCH),) |
There was a problem hiding this comment.
Only main is cleared, matching Java's buildReplacement which calls removeRef(SnapshotRef.MAIN_BRANCH). Other branches / tags survive replace.
|
|
||
| # Schema: reuse an existing schema_id if structurally identical, else add a new one | ||
| # with a fresh schema_id (max + 1, matching UpdateSchema's convention). | ||
| existing_schema_id = self._find_matching_schema_id(table_metadata, new_schema) |
There was a problem hiding this comment.
Per Java's reuseOrCreateNewSchemaId — walks all historical schemas, reuses the id if structurally identical, otherwise max(id) + 1. The unconditional SetCurrentSchemaUpdate (also for the reuse branch) mirrors Java's RESTSessionCatalog.replaceTransaction which always ensures a SetCurrentSchema change is emitted.
| # Partition spec: same reuse-or-add pattern. Assign a fresh spec_id on add to avoid | ||
| # collisions with existing specs (AddPartitionSpecUpdate refuses duplicate IDs). | ||
| effective_spec = UNPARTITIONED_PARTITION_SPEC if new_spec.is_unpartitioned() else new_spec | ||
| existing_spec_id = self._find_matching_spec_id(table_metadata, effective_spec) |
There was a problem hiding this comment.
Per Java's reuseOrCreateNewSpecId. SetDefaultSpecUpdate is unconditional, also per the RESTSessionCatalog.replaceTransaction block.
|
|
||
| # Sort order: same reuse-or-add pattern with fresh order_id on add. | ||
| effective_sort_order = UNSORTED_SORT_ORDER if new_sort_order.is_unsorted else new_sort_order | ||
| existing_order_id = self._find_matching_sort_order_id(table_metadata, effective_sort_order) |
There was a problem hiding this comment.
Per Java's reuseOrCreateNewSortOrderId. Unsorted reuses id 0 in Java; the effective_sort_order substitution above achieves the same.
|
|
||
| # Merge properties (SetPropertiesUpdate merges onto existing properties). | ||
| if new_properties: | ||
| self._updates += (SetPropertiesUpdate(updates=new_properties),) |
There was a problem hiding this comment.
Properties are merged onto existing, matching Java's TableMetadata.Builder.setProperties which does properties.putAll(updated). Documented in the public docstring — callers wanting to remove keys must use replace_table_transaction and drop them explicitly inside the txn.
| return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID) | ||
|
|
||
|
|
||
| def assign_fresh_partition_spec_ids_for_replace( |
There was a problem hiding this comment.
Mirrors the v2 path in Java's TableMetadata.reassignPartitionIds — collect (source_id, transform) -> field_id across all existing specs, reuse on match, fresh ids for the rest.
| return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID), next_id | ||
|
|
||
|
|
||
| def _assign_fresh_partition_spec_ids_for_replace_v1( |
There was a problem hiding this comment.
Mirrors the v1 path in Java's reassignPartitionIds. v1 partition specs are append-only with sequential ids — fields absent from the new spec must be carried forward as VoidTransform, otherwise replace would be illegal for v1. The unique-name suffix loop in _unique_void_name matches Java's collision-renaming pattern, generalized to loop further if both name and name_<field_id> are taken.
There was a problem hiding this comment.
V1 partition specs are append-only by spec rule, so a replace that drops a partition field would produce an invalid v1 spec without this carry-forward. Mirrors Java's v1 branch in reassignPartitionIds — covered by test_replace_table_v1_carries_forward_partition_fields_as_void and the helper-level v1 tests.
| return new_id | ||
|
|
||
|
|
||
| def assign_fresh_schema_ids_for_replace(schema: Schema, base_schema: Schema, last_column_id: int) -> tuple[Schema, int]: |
There was a problem hiding this comment.
Maps to Java's TypeUtil.assignFreshIds(Schema schema, Schema baseSchema, NextID nextId) and its AssignFreshIds visitor. Name-based reuse from the current schema only (not the full history) — matches Java's behaviour. Type compatibility is the caller's responsibility.
| with txn.update_snapshot().fast_append() as snap: | ||
| for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=df, io=txn._table.io): | ||
| snap.append_data_file(data_file) | ||
| txn.set_properties(write_replaced_at="2026-04-19T00:00:00Z") |
There was a problem hiding this comment.
Surely we should be showing / testing an example where you have a pyarrow table that you want to replace your table with, so you do replace_table_transaction with that Arrow table's schema and do append on the transaction to achieve it? this feels like the most common use case by far, no?
|
|
||
|
|
There was a problem hiding this comment.
Ran make lint — picked up trailing whitespace + ruff format issues. Now passing cleanly.
|
|
||
| ## Replace a table | ||
|
|
||
| Atomically replace an existing table's schema, partition spec, sort order, location, and properties. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). Use this when you want to redefine the table's metadata; pair it with `replace_table_transaction` to atomically write new data alongside the metadata change (RTAS-style). |
There was a problem hiding this comment.
Adopted your wording with light tweaks ("lets you write new data alongside this change" instead of the slightly clunky "allows for new data to be written").
- Drop view-collision pre-flight check; deferring to a follow-up PR that ports the check across create/replace/rename in one pass. - Simplify RTAS tests + docs to use txn.append(df) instead of the verbose update_snapshot().fast_append() + _dataframe_to_data_files pattern. - Drop parallel new_schema construction in RTAS tests; pass df.schema directly (replace_table* accepts Schema | pa.Schema). - Reword docs intro per reviewer suggestion. - Run formatter / linter; trailing whitespace and trailing newline fix. - mypy: cast format_version to TableVersion; extract inner with into helper to silence unreachable warning under pytest.raises.
Per reviewer feedback: bare replace_table examples and tests should construct an explicit Schema, since that's the natural user-facing API for DDL-only redefinition. RTAS flows keep df.schema since the data and schema are coupled there.
Matches PR apache#498 (create_table_transaction) precedent. NoopCatalog gets an explicit stub since it extends Catalog directly.
Per audit: clarify that we always emit AssertLastAssigned*Id, which is stricter than the reference contract that conditions them on Add* being present. Fail-safe vs corruption-risk.
Matches the sibling commit_transaction signatures — the requirements list is visible in the code below; no need to enumerate it in prose.
| return existing.order_id | ||
| return None | ||
|
|
||
| def commit_transaction(self) -> Table: |
There was a problem hiding this comment.
Per Java's UpdateRequirements.forReplaceTable plus the AddSchema / AddPartitionSpec per-update additions. AssertCurrentSchemaID / AssertDefaultSpecID / AssertDefaultSortOrderID are deliberately not emitted — Java suppresses them via the !isReplace guards.
One deliberate divergence: Python emits AssertLastAssignedFieldId / AssertLastAssignedPartitionId unconditionally, where Java emits them only when the matching Add* update is in the change set. Stricter, fail-safe — a true no-op replace just confirms the asserts hold; a concurrent column-add between load and commit fails fast.
| self._updates += (SetPropertiesUpdate(updates=new_properties),) | ||
|
|
||
| @staticmethod | ||
| def _find_matching_schema_id(table_metadata: TableMetadata, schema: Schema) -> int | None: |
There was a problem hiding this comment.
Mirrors Java's reuseOrCreateNewSchemaId (and the spec / sort-order siblings) — walk all historical entries, return the existing id on structural match, otherwise generate a fresh one. Covers the case Fokko walked through in #433 (comment): CREATE OR REPLACE back to a previously-seen schema reuses its schema_id and does not append a duplicate.
Critical: - test_concurrent_replace_table now uses identical new schemas across the two transactions and matches on 'last assigned field id' — this is the only path that actually validates AssertLastAssignedFieldId. Coverage: - Add test_replace_table_with_sort_order_changes: unsorted → sorted → unsorted with order_id reuse from history. - Extend the v2 partition-spec helper test with a reuse-by-bucket case (matching field_id under a renamed partition field). Brittleness fixes: - Drop dead HEAD mock in test_replace_table_transaction_404_raises; no view-exists pre-flight to mock anymore. - Pin wire-payload assertions to fixture metadata fields (no more magic 999 / spec_id / sort_order_id constants). - Tighten set-default-spec / set-default-sort-order to check the emitted id, not just existence. - Split test_replace_table_location_resolution into two non-stringly- discriminated tests. Removed: - test_replace_table_accepts_pyarrow_schema: redundant with the RTAS test which already exercises pa.Schema via df.schema.
- test_replace_table_drops_identifier_field: pairs with the existing preserve test; verifies that a new schema without identifier_field_ids clears the previous set rather than silently carrying it forward. - test_replace_table_v2_does_not_carry_forward_void_field: v2 specs aren't append-only, so a dropped partition field is gone (unlike v1). - test_replace_after_format_version_upgrade: v1 -> v2 via replace, then a second replace on the now-v2 table must not retrigger the upgrade or fail.
| sort_order: SortOrder = UNSORTED_SORT_ORDER, | ||
| properties: Properties = EMPTY_DICT, | ||
| ) -> ReplaceTableTransaction: | ||
| staged_table, fresh_schema, fresh_spec, fresh_sort_order, resolved_location = self._replace_staged_table( |
There was a problem hiding this comment.
Heads-up on a deliberate scope cut: Java's RESTSessionCatalog.replaceTransaction does a view-existence pre-flight check before replacing. PyIceberg doesn't currently do this in the equivalent create_table / rename_table / register_table paths either, so adding it only to replace_table here would be inconsistent. Splitting that into a follow-up PR that ports the check across all the relevant call sites in one pass.
Match new_table_metadata's behavior of popping format-version before persistence, and validate schema compatibility against the resolved format version at the same site.
- Pin that 'format-version' is not in replaced.metadata.properties. - Add list/map element_id/key_id/value_id reuse coverage for assign_fresh_schema_ids_for_replace. - Extend sort-order reuse test to cover reusing a non-zero sorted order_id from history. - Drop the stale HEAD (view-exists) reference in test_replace_table_issues_commit_post_immediately.
| int(requested_format_version) if requested_format_version is not None else existing_metadata.format_version | ||
| ) | ||
| iceberg_schema = self._convert_schema_if_needed(schema, cast(TableVersion, resolved_format_version)) | ||
| iceberg_schema.check_format_version_compatibility(cast(TableVersion, resolved_format_version)) |
There was a problem hiding this comment.
Same call new_table_metadata makes (metadata.py:597), and the same check Java's Builder runs inside addSchemaInternal. Catches v1-incompatible types up front rather than failing later inside AddSchemaUpdate's apply path.
| int(requested_format_version) if requested_format_version is not None else existing_metadata.format_version | ||
| ) | ||
| iceberg_schema = self._convert_schema_if_needed(schema, cast(TableVersion, resolved_format_version)) | ||
| iceberg_schema.check_format_version_compatibility(cast(TableVersion, resolved_format_version)) |
There was a problem hiding this comment.
Same call new_table_metadata makes (metadata.py:597), and the same check Java's Builder runs inside addSchemaInternal. Catches v1-incompatible types up front rather than failing later inside AddSchemaUpdate's apply path.
| """End-to-end smoke test: replace_table against a real REST server. | ||
|
|
||
| Detailed replace_table semantics are covered against InMemoryCatalog and SqlCatalog in | ||
| `tests/catalog/test_catalog_behaviors.py`. This test verifies the REST wire path: server | ||
| accepts the commit, preserves the UUID, and clears the current snapshot.""" |
There was a problem hiding this comment.
| """End-to-end smoke test: replace_table against a real REST server. | |
| Detailed replace_table semantics are covered against InMemoryCatalog and SqlCatalog in | |
| `tests/catalog/test_catalog_behaviors.py`. This test verifies the REST wire path: server | |
| accepts the commit, preserves the UUID, and clears the current snapshot.""" |
|
|
||
| @pytest.mark.integration | ||
| @pytest.mark.parametrize("catalog", [lf("session_catalog")]) | ||
| def test_replace_table_end_to_end_against_rest_server(catalog: Catalog) -> None: |
There was a problem hiding this comment.
| def test_replace_table_end_to_end_against_rest_server(catalog: Catalog) -> None: | |
| def test_replace_table(catalog: Catalog) -> None: |
| def test_replace_table_transaction_rtas_against_rest_server(catalog: Catalog) -> None: | ||
| """RTAS (Replace Table As Select) against a real REST server: the schema swap and the | ||
| new-data write must land atomically — the new snapshot is current on commit.""" |
There was a problem hiding this comment.
| def test_replace_table_transaction_rtas_against_rest_server(catalog: Catalog) -> None: | |
| """RTAS (Replace Table As Select) against a real REST server: the schema swap and the | |
| new-data write must land atomically — the new snapshot is current on commit.""" | |
| def test_replace_table_transaction(catalog: Catalog) -> None: |
Drop docstrings that restated the function name or assertions, drop narrative inline comments where variable names already carry the meaning, and tighten multi-paragraph docstrings to one line. Also shorten the integration test function names to match how the rest of the file is structured.
| identifier, schema, location, partition_spec, sort_order, properties | ||
| ).commit_transaction() | ||
|
|
||
| @abstractmethod |
There was a problem hiding this comment.
Pointing out that this is a new abstract method. I'm matching #498 that also introduced a new abstract method create_table_transaction. This technically breaks out-of-tree Catalog implementations that do not implement this, but I thought this was fine given the precedence in the above PR. (Happy to change to this raising NotImplemented instead and removing abstractmethod, but I do prefer abstractmethod from a design perspective and think it's fine to make this change)
That file is the cross-catalog integration analog to Java's CatalogTests.java; every other mutation (create/rename/drop/etc.) already has a slot there parametrized over six catalog fixtures. The two replace tests have no REST-specific code and don't belong in test_rest_catalog.py. Adopts the file's existing conventions: test_catalog parameter name, database_name + table_name fixtures, no manual cleanup guards (clean_up runs between tests).
…g.py The fixture had no S3 credentials, which was fine when every test in the file was metadata-only. The new replace tests do .append() and fail with ACCESS_DENIED against minio. Mirrors the credentials the hive_catalog fixture in the same file already uses.
Closes #281.
Rationale for this change
replace_tableandreplace_table_transactiononCatalog. The former delegates to the latter, with the latter being abstract.MetastoreCatalogand onRestCatalog, with a sharedCatalog._replace_staged_tablehelper that builds the fresh metadataFurther notes:
(source_id, transform)across all specs (v2+). On v1 (append-only specs), removed fields are carried forward asVoidTransform.properties={"format-version": "2"}upgrades; downgrade is rejected.SetCurrentSchema/SetDefaultPartitionSpec/SetDefaultSortOrderare always emitted, even when the resulting id is reused.AssertTableUUID,AssertLastAssignedFieldId,AssertLastAssignedPartitionId.Are these changes tested?
Yes, with both unit and integration tests:
tests/catalog/test_catalog_behaviors.pytests/catalog/test_rest.pytests/integration/test_catalog.pytests/test_schema.py,tests/table/test_partitioning.pyAre there any user-facing changes?
Catalog+ abstract method