[GLUTEN-12187][VL] Port AttachDistributedSequenceExec to Velox backend#12188
[GLUTEN-12187][VL] Port AttachDistributedSequenceExec to Velox backend#12188baibaichen wants to merge 5 commits into
Conversation
|
Run Gluten Clickhouse CI on x86 |
0d5f602 to
a50a94a
Compare
|
Run Gluten Clickhouse CI on x86 |
a50a94a to
fe3bf16
Compare
|
Run Gluten Clickhouse CI on x86 |
1 similar comment
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
2 similar comments
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
@zhztheplayer would you please review this PR? |
Adds a Velox implementation of Spark's AttachDistributedSequenceExec that prepends a contiguous, globally increasing Long id column to its child output. Used by pandas-on-Spark distributed-sequence index and DataFrame.zipWithIndex. Plan-level - New abstract base ColumnarAttachDistributedSequenceBaseExec in gluten-substrait/, with factory from(plan) delegating to the backend API and a doColumnarCleanup() hook called from cleanupResources(). - New offload rule case + validator gate in OffloadSingleNodeRules / Validators. - New backend hook genColumnarAttachDistributedSequenceExec on SparkPlanExecApi. Velox override returns the columnar impl; CH override throws GlutenNotSupportException until that backend is ported. - Config spark.gluten.sql.columnar.attachDistributedSequence (default true) lets users disable the offload. Velox runtime - For >1 partition, materialize the child output once via Gluten's existing ColumnarCachedBatchSerializer, persisted at MEMORY_AND_DISK_SER. The cache blob is Velox-native serialization (CachedColumnarBatch), much more compact than unsafe-row SER for wide / nested data. - Count pass reads CachedColumnarBatch.numRows for partitions [0, numPartitions - 1) -- no native deserialization required. - Assign pass: convertCachedBatchToColumnarBatch -> ColumnarBatches.load (zero-copy Arrow C-Data ABI handoff) -> prepend one ArrowWritableColumnVector with the id column. - Single-partition queries skip caching entirely. Memory hygiene - doColumnarCleanup() unpersists the cached RDD when the query finishes so BlockManager does not hold the serialized batches beyond the operator's lifetime. - The persisted RDD is cached behind a synchronized accessor so repeated doExecuteColumnar() calls share a single persist() handle. - assignIds wraps the per-batch build in a try/catch that closes the freshly-loaded heavy batch on failure, preventing Arrow buffer leaks on mid-build OOM. Tests - New VeloxAttachDistributedSequenceExecSuite in backends-velox. Closes apache#12187 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…tedSequence config
The original implementation persisted the child's columnar output via
ColumnarCachedBatchSerializer to avoid re-executing the child plan twice.
That path fails on zero-column batches that can result from column
pruning (e.g. df.select("id") prunes every input column away):
ensureVeloxBatch -> isVeloxBatch -> getIndicatorVector throws because the
batch is neither LIGHT nor HEAVY.
Switch to the simpler vanilla-Spark-style approach (matches the
pandas-on-Spark cache="NONE" option): run the child once to count rows
per partition for [0, numPartitions - 1), then run it again to attach
the id column. One extra child execution; full robustness across
arbitrary projections.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…t batch Round-3 (drop-cache) made our output batch flow through OffloadArrowDataExec when a downstream Velox consumer (e.g. shuffle after repartition) follows our op. That path calls ColumnarBatches.offload -> getRefCntHeavy, which asserts every column in the heavy batch shares the same reference count. Our previous output mixed a freshly allocated id column (refCnt=1) with retain'd input columns (refCnt=2), tripping the assertion -- which surfaced as the SPARK-36338 regression in the inherited GlutenDataFrameSuite. Allocate fresh ArrowWritableColumnVectors for every output column and copy input values per row via ValueVector.copyFromSafe. All output columns then have refCnt=1, the input batch is untouched, and the offload path works regardless of which transition the planner inserts. Test additions: - New 'output survives a downstream Velox shuffle (offload path)' test reproduces the bug locally (repartition after attach). - Set spark.sql.ansi.enabled=false in suite sparkConf so the columnar exec is actually selected under Spark 4.x where ANSI is default true (Gluten falls back the whole plan otherwise). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…support The new case in OffloadSingleNodeRules unconditionally called ColumnarAttachDistributedSequenceBaseExec.from(plan) for every backend, which triggers CHSparkPlanExecApi.genColumnarAttachDistributedSequenceExec to throw GlutenNotSupportException on the ClickHouse backend during the inherited SPARK-36338 test. Add a backend-level flag supportColumnarAttachDistributedSequenceExec (default false in BackendSettingsApi, true for Velox). Only attempt the offload when the active backend opts in. CH plans now stay as vanilla AttachDistributedSequenceExec and execute via Spark's row-based path (C2R/R2C), restoring the previous CH behavior. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
5350118 to
d0eff70
Compare
|
Run Gluten Clickhouse CI on x86 |
philo-he
left a comment
There was a problem hiding this comment.
Looks good overall. Could you check whether any of Spark's own related test suites are enabled in Gluten? Thanks.
| * materialized during execution. Called from [[cleanupResources]] after children have been | ||
| * cleaned up. The default implementation is a no-op. | ||
| */ | ||
| protected def doColumnarCleanup(): Unit = {} |
There was a problem hiding this comment.
Do we still need this? It seems to be useless.
| super.sparkConf | ||
| .set("spark.sql.shuffle.partitions", "3") | ||
| .set("spark.default.parallelism", "3") | ||
| .set(SQLConf.ANSI_ENABLED.key, "false") |
There was a problem hiding this comment.
Can we also set Spark master "local[3]" explicitly?
There was a problem hiding this comment.
It seems that gluten columnar shuffle is not set. Do we need to set it in the base class WholeStageTransformerSuite?
What changes were proposed in this pull request?
Closes #12187.
Adds a Velox implementation of Spark's
AttachDistributedSequenceExec(prepends a contiguousLongid column to child output). Used by pandas-on-Spark distributed-sequence index andDataFrame.zipWithIndex.How is this implemented?
Plan-level
ColumnarAttachDistributedSequenceBaseExecingluten-substrait/with factoryfrom(plan)delegating to the backend API.OffloadSingleNodeRules/Validators.genColumnarAttachDistributedSequenceExeconSparkPlanExecApi. Velox override returns the columnar impl; the CH override throwsGlutenNotSupportExceptionuntil that backend is ported.spark.gluten.sql.columnar.attachDistributedSequence(defaulttrue) lets users disable the offload.Velox runtime (
ColumnarAttachDistributedSequenceExec)For >1 partition:
ColumnarCachedBatchSerializer, persisted atMEMORY_AND_DISK_SER. The cache blob is Velox-native serialization (CachedColumnarBatch) — kryo-friendly and typically much more compact than unsafe-row SER.CachedColumnarBatch.numRowsof partitions[0, numPartitions - 1)— no native deserialization.convertCachedBatchToColumnarBatch→ Velox-native batch →ColumnarBatches.load(zero-copy Arrow C-Data ABI handoff) → prepend oneArrowWritableColumnVectorwith the id column.Single-partition queries skip caching entirely (
startOffset = 0).Memory hygiene
doColumnarCleanup()hook called fromcleanupResources(). The Velox impl uses it tounpersistthe cached RDD when the query finishes, so BlockManager does not hold the serialized batches beyond the operator's lifetime.synchronizedaccessor so repeateddoExecuteColumnar()calls share a singlepersist()handle.assignIdswraps the per-batch build in atry/catchthat closes the freshly-loaded heavy batch on failure, so a mid-build OOM (e.g. while allocating the id vector) cannot leak Arrow buffers.Known overhead: cache write/read crosses the heap boundary
Per batch, the cache path does two full data copies:
serializecopies the off-heap Velox batch into an on-heapArray[Byte](CachedColumnarBatch.bytes).deserializecopies the bytes back into a fresh off-heap Velox batch.This is the price for
zipWithIndex's two-pass semantics (count + assign) without re-executing the child plan. The Velox↔Arrow hand-offs elsewhere in the operator are zero-copy ABI transfers and not relevant to this cost.Alternative considered
We considered the row-mode pipeline
Velox → C2R → RDD[InternalRow] cached → R2C. For Gluten that costs a full C2R/R2C transition on every row, and the unsafe-row serialized cache is typically 2–5× larger than Velox-native serialization for wide / nested data. The columnar path keeps everything columnar and pays serialization only once.How was this patch tested?
New
VeloxAttachDistributedSequenceExecSuiteinbackends-velox.Does this PR introduce any user-facing change?
Yes — a new config:
spark.gluten.sql.columnar.attachDistributedSequence(defaulttrue).When enabled,
df.zipWithIndexand pandas-on-Sparkdistributed-sequenceindex materialize the id column columnarly on Velox instead of falling back.