[python] Extend commit protocol for compaction (DataIncrement/CompactIncrement)#7873
Open
TheR1sing3un wants to merge 2 commits into
Open
[python] Extend commit protocol for compaction (DataIncrement/CompactIncrement)#7873TheR1sing3un wants to merge 2 commits into
TheR1sing3un wants to merge 2 commits into
Conversation
…Increment) Lay the protocol-level groundwork for upcoming compaction work in pypaimon by aligning CommitMessage with Java's CommitMessageImpl shape and adding a JSON-safe wire format for cross-process transport. Structural changes: - New DataIncrement (write side) and CompactIncrement (compaction side) value objects, direct ports of org.apache.paimon.io.DataIncrement and CompactIncrement. Each holds (new_files, deleted_files, changelog_files, new_index_files, deleted_index_files) so future deletion-vector / changelog work has an unambiguous slot. - CommitMessage refactored to (partition, bucket, total_buckets, data_increment, compact_increment, check_from_snapshot). Convenience properties (new_files, compact_before, compact_after, ...) preserve read-site ergonomics. - FileStoreCommit emits ADD entries for compact_after, DELETE entries for compact_before, and auto-selects commit_kind=COMPACT when a message carries only compact increments. A dedicated commit_compact() helper enforces COMPACT-only semantics with no row-id assignment. - FileStoreWrite / TableUpdate construct CommitMessage via DataIncrement on the existing write path — no behavior change for current callers. DataFileMeta serde: - to_dict / from_dict round-trip with tagged-value encoding for bytes, Decimal, datetime, date, time, and Timestamp so file metas can ship through JSON-only transports (e.g. Ray task payloads later). - Public encode_value / decode_value helpers reused by CommitMessage's partition tuples (DATE / DECIMAL / bytes / Timestamp partitions). - Tolerates manifest-side BinaryRow (lazy-decoded) and pyarrow Array-like null_counts so round-tripping a freshly-produced file meta doesn't fail. CommitMessageSerializer: - VERSION=1 wire format covering full DataIncrement + CompactIncrement shape (including IndexFileMeta identity fields). dv_ranges / global_index_meta will be wired up alongside deletion-vector phases. No observable behavior change for read / write / commit today; this is foundation for the compaction module, append-only compaction job, PK LSM compaction, and Ray distributed executor that land in follow-up PRs (apache#7771 originally bundled all of them). Test plan: - New commit_message_serializer_test: round-trip CommitMessage with DataIncrement / CompactIncrement / index files / non-JSON-native partition tuples (DATE, Decimal, bytes, Timestamp); IndexFileMeta round-trip; unknown-version rejection. - New file_store_commit_compact_test: protocol-level coverage of compact_before -> DELETE entry, compact_after -> ADD entry, and auto-COMPACT kind selection (full e2e covered when the compactor lands). - Existing file_store_commit_test / partition_predicate_test / table_commit_test updated to construct CommitMessage via DataIncrement instead of the legacy new_files= kwarg. Refs: split from apache#7771 to ease incremental community review.
81e00a8 to
c1fc089
Compare
Correctness:
- FileStoreCommit._build_commit_entries now rejects DataIncrement.{deleted_files,
changelog_files, new_index_files, deleted_index_files} and
CompactIncrement.{changelog_files, new_index_files, deleted_index_files}.
These slots used to be silently dropped at commit; raising loudly turns
a future correctness foot-gun into a NotImplementedError so later
changelog-producer / deletion-vector / row-level-delete work has to wire
the new path through commit explicitly.
- ManifestEntry now uses msg.total_buckets when set, falling back to
self.table.total_buckets otherwise. A stale plan whose bucket count has
since been rescaled would otherwise be silently overwritten with the
new value.
- FileStoreCommit.commit() now rejects messages carrying compact_increment.
commit() is the write-side entry (always APPEND, OVERWRITE if conflict
detection demands it); compact_increment must go through commit_compact().
The previous 'auto-pick COMPACT when no new_files' branch was unreachable
(FileStoreWrite.prepare_commit() only fills new_files; CompactJob calls
commit_compact() directly) and would have produced the wrong snapshot
shape for a mixed message anyway.
Docs / naming:
- CommitMessageSerializer: docstring trimmed to its job (JSON wire format
for pypaimon ↔ pypaimon transport, e.g. Ray driver ↔ workers).
- commit_compact: docstring trimmed to its behavior.
- DataIncrement.empty() / CompactIncrement.empty() renamed to
empty_increment() for a more specific name; no callers in-tree yet.
- Trim cross-language commentary from class docstrings on CommitMessage,
DataIncrement, CompactIncrement.
Tests (file_store_commit_compact_test):
- New: NotImplementedError raised for unsupported DataIncrement /
CompactIncrement fields.
- New: msg.total_buckets wins over table.total_buckets when set; fallback
otherwise.
- New: commit() rejects compact_increment messages.
- Removed: two cases that exercised the old auto-COMPACT branch (now
unreachable).
c1fc089 to
4ee0654
Compare
Member
Author
|
Ready for review, 1st pr of the entire compaction feature. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Purpose
Align
CommitMessagewith Java'sCommitMessageImplshape and add a JSON-safe wire format, so later compaction work has somewhere to plugcompact_before/compact_afterfiles and a serializer to ship them through Ray workers.Foundation only — read / write / commit produce the same snapshots.
Split from #7771.
Changes
DataIncrement/CompactIncrementvalue objects;CommitMessagenow holds(partition, bucket, total_buckets, data_increment, compact_increment, check_from_snapshot). Convenience properties preservemsg.new_files/msg.compact_beforeergonomics.FileStoreCommitemits ADD forcompact_after, DELETE forcompact_before, auto-pickscommit_kind=COMPACTwhen only compact increments are present. Newcommit_compact()skips row-id assignment.DataFileMeta.to_dict/from_dictwith tagged encoding forbytes/Decimal/datetime/date/time/Timestamp;encode_value/decode_valuepublic forCommitMessage.partitionround-trip.CommitMessageSerializer(VERSION=1) covers the fullDataIncrement+CompactIncrementshape includingIndexFileMeta.Tests
commit_message_serializer_test— round-trip with non-JSON-native partition values + index files + version rejection.file_store_commit_compact_test—compact_before→ DELETE,compact_after→ ADD, auto-COMPACT kind.file_store_commit_test/partition_predicate_test/table_commit_testadapted to the newCommitMessagesignature.