[core] Optimize Flink BTree index topology#7852
Conversation
JingsongLi
left a comment
There was a problem hiding this comment.
Review: [core] Optimize Flink BTree index topology
Nice optimization. Replacing N separate Flink topologies (one per row range) with a single unified topology keyed by a synthetic buildTaskId sort prefix is a clean approach to reducing JobGraph construction overhead.
Correctness
The overall design is sound:
- The buildTaskId field is prepended as the primary sort key, so after range-shuffle + local-sort, data within each writer subtask is guaranteed monotonically ordered by (taskId, indexColumn). Task transitions are one-directional within each subtask.
- flushCurrentWriter() correctly handles both task-boundary flush and within-task overflow flush.
- The ReadDataOperator output type matches sortReadType (with the taskId column), and the taskId column survives the sort.
Suggestions
-
BUILD_TASK_ID_FIELD_ID = -1 -- The choice of a negative field ID avoids collision with real schema field IDs. A short comment documenting this invariant would help future readers.
-
buildTasksById HashMap rebuilt in every parallel writer subtask -- Each subtask independently reconstructs the full map. For a small number of tasks this is negligible, but could be restricted to only relevant tasks in the future.
-
Parallelism calculation -- Integer division (totalRecords / recordsPerRange) means 1500 records with recordsPerRange=1000 yields parallelism=1. Matches old behavior but worth noting.
-
BTreeSplitTask.split field relies on Java-Serializable -- Slightly tighter coupling than the previous Flink TypeInformation-based serializer.
Tests
Good coverage: unit tests for calculateParallelism and IT tests for end-to-end flow. Overall a well-structured change. LGTM with minor suggestions.
|
+1 |
What changed
Why
When row ranges are highly fragmented, the old implementation creates a separate Flink topology for each range. That can make the create-index procedure spend a long time constructing the JobGraph and can produce an oversized topology.
Validation
mvn -pl paimon-flink/paimon-flink-common -DfailIfNoTests=false -Dtest=BTreeIndexTopoBuilderTest testmvn -pl paimon-flink/paimon-flink-common -Pfast-build -DfailIfNoTests=false -Dtest=BTreeGlobalIndexITCase#testBTreeIndexWithManyPartitions testmvn -pl paimon-flink/paimon-flink-common -Pfast-build -DfailIfNoTests=false -Dtest=BTreeGlobalIndexITCase#testBTreeIndexWithSingleRangeAndParallelWriters test