From 37fd192dba3675266bf40a3180ba78aa6c9d24b3 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Fri, 29 May 2026 06:16:38 +0000 Subject: [PATCH 1/5] [GLUTEN-12187][VL] Port AttachDistributedSequenceExec to Velox backend Adds a Velox implementation of Spark's AttachDistributedSequenceExec that prepends a contiguous, globally increasing Long id column to its child output. Used by pandas-on-Spark distributed-sequence index and DataFrame.zipWithIndex. Plan-level - New abstract base ColumnarAttachDistributedSequenceBaseExec in gluten-substrait/, with factory from(plan) delegating to the backend API and a doColumnarCleanup() hook called from cleanupResources(). - New offload rule case + validator gate in OffloadSingleNodeRules / Validators. - New backend hook genColumnarAttachDistributedSequenceExec on SparkPlanExecApi. Velox override returns the columnar impl; CH override throws GlutenNotSupportException until that backend is ported. - Config spark.gluten.sql.columnar.attachDistributedSequence (default true) lets users disable the offload. Velox runtime - For >1 partition, materialize the child output once via Gluten's existing ColumnarCachedBatchSerializer, persisted at MEMORY_AND_DISK_SER. The cache blob is Velox-native serialization (CachedColumnarBatch), much more compact than unsafe-row SER for wide / nested data. - Count pass reads CachedColumnarBatch.numRows for partitions [0, numPartitions - 1) -- no native deserialization required. - Assign pass: convertCachedBatchToColumnarBatch -> ColumnarBatches.load (zero-copy Arrow C-Data ABI handoff) -> prepend one ArrowWritableColumnVector with the id column. - Single-partition queries skip caching entirely. Memory hygiene - doColumnarCleanup() unpersists the cached RDD when the query finishes so BlockManager does not hold the serialized batches beyond the operator's lifetime. - The persisted RDD is cached behind a synchronized accessor so repeated doExecuteColumnar() calls share a single persist() handle. - assignIds wraps the per-batch build in a try/catch that closes the freshly-loaded heavy batch on failure, preventing Arrow buffer leaks on mid-build OOM. Tests - New VeloxAttachDistributedSequenceExecSuite in backends-velox. Closes #12187 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../clickhouse/CHSparkPlanExecApi.scala | 6 + .../velox/VeloxSparkPlanExecApi.scala | 6 +- ...olumnarAttachDistributedSequenceExec.scala | 241 ++++++++++++++++++ ...oxAttachDistributedSequenceExecSuite.scala | 112 ++++++++ .../gluten/backendsapi/SparkPlanExecApi.scala | 4 + .../apache/gluten/config/GlutenConfig.scala | 11 + .../offload/OffloadSingleNodeRules.scala | 4 +- .../columnar/validator/Validators.scala | 4 + ...narAttachDistributedSequenceBaseExec.scala | 85 ++++++ 9 files changed, 471 insertions(+), 2 deletions(-) create mode 100644 backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala create mode 100644 backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAttachDistributedSequenceExecSuite.scala create mode 100644 gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarAttachDistributedSequenceBaseExec.scala diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 832932f0a49..513c41c93e5 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -979,6 +979,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { override def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec = CHRangeExecTransformer(rangeExec.range) + override def genColumnarAttachDistributedSequenceExec( + plan: org.apache.spark.sql.execution.python.AttachDistributedSequenceExec) + : ColumnarAttachDistributedSequenceBaseExec = + throw new GlutenNotSupportException( + "AttachDistributedSequenceExec is not supported in ClickHouse backend yet.") + override def expressionFlattenSupported(expr: Expression): Boolean = expr match { case ca: FlattenedAnd => CHFlattenedExpression.supported(ca.name) case co: FlattenedOr => CHFlattenedExpression.supported(co.name) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 5f1003689dd..13d29994704 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode, SparkHashJoinUtils} import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.execution.python.ArrowEvalPythonExec +import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, AttachDistributedSequenceExec} import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation import org.apache.spark.sql.execution.utils.ExecUtil import org.apache.spark.sql.expression.{UDFExpression, UserDefinedAggregateFunction} @@ -1205,6 +1205,10 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { override def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec = ColumnarRangeExec(rangeExec.range) + override def genColumnarAttachDistributedSequenceExec( + plan: AttachDistributedSequenceExec): ColumnarAttachDistributedSequenceBaseExec = + ColumnarAttachDistributedSequenceExec(plan.sequenceAttr, plan.child) + override def genColumnarTailExec(limit: Int, child: SparkPlan): ColumnarCollectTailBaseExec = ColumnarCollectTailExec(limit, child) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala new file mode 100644 index 00000000000..8bc64b28c2c --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowJavaBatchType +import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq} +import org.apache.gluten.iterator.Iterators +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators +import org.apache.gluten.vectorized.ArrowWritableColumnVector + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.columnar.CachedBatch +import org.apache.spark.sql.execution.{ColumnarAttachDistributedSequenceBaseExec, ColumnarCachedBatchSerializer, SparkPlan} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.storage.StorageLevel + +/** + * Velox implementation of [[ColumnarAttachDistributedSequenceBaseExec]] that prepends a contiguous, + * globally increasing `Long` id column to its child output while keeping the columnar pipeline + * intact. + * + * Mirrors Spark's `AttachDistributedSequenceExec` semantics: + * 1. The child columnar output is materialized once into a cached `RDD[CachedBatch]` using + * Gluten's [[ColumnarCachedBatchSerializer]] (Velox native serialization, persisted at + * `MEMORY_AND_DISK_SER`). This prevents the inherent two-pass nature of `zipWithIndex` from + * re-running the child plan twice. + * 2. A first pass over the cached partitions `[0, numPartitions - 1)` reads only the `numRows` + * field of each `CachedColumnarBatch` -- no native deserialization is required to count. + * 3. The per-partition prefix-sum is broadcast and a second pass deserializes the cached batches + * back to columnar form, ensures they are Arrow-loaded, and prepends the new id column by + * retaining the input column vectors (zero-copy) and allocating one + * [[ArrowWritableColumnVector]] for the id column. + * + * For the trivial single-partition case the cache step is skipped and the assignment is done + * directly over the child output with `startOffset = 0`. + */ +case class ColumnarAttachDistributedSequenceExec( + sequenceAttr: Attribute, + override val child: SparkPlan) + extends ColumnarAttachDistributedSequenceBaseExec(sequenceAttr, child) { + + override def batchType(): Convention.BatchType = ArrowJavaBatchType + + override def requiredChildConvention(): Seq[ConventionReq] = Seq( + ConventionReq.ofBatch(ConventionReq.BatchType.Is(ArrowJavaBatchType))) + + private val idSchema: StructType = + StructType(Seq(StructField(sequenceAttr.name, LongType, nullable = false))) + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + val childRdd = child.executeColumnar() + val numPartitions = childRdd.getNumPartitions + + if (numPartitions <= 1) { + // Fast path: at most one partition, no need to count or cache. + return childRdd.mapPartitions(it => assignIds(it, startOffset = 0L, alreadyLoaded = true)) + } + + val sqlConf = SQLConf.get + val cacheAttrs = child.output + val cached = getOrBuildCache(childRdd, cacheAttrs, sqlConf) + + // First pass: count rows per partition for partitions [0, numPartitions - 1) -- the last + // partition's count is unused. Reading `CachedColumnarBatch.numRows` is a case-class field + // access; no native deserialization is required. + val frontCounts: Array[Long] = sparkContext.runJob( + cached, + (it: Iterator[CachedBatch]) => { + var sum = 0L + while (it.hasNext) { + sum += it.next().numRows.toLong + } + sum + }, + 0 until (numPartitions - 1) + ) + val offsets = frontCounts.scanLeft(0L)(_ + _) + val bcOffsets = sparkContext.broadcast(offsets) + + // Second pass: deserialize cached batches back to Velox-native ColumnarBatches and prepend + // the id column. `convertCachedBatchToColumnarBatch` already wraps its output with + // Iterators.recyclePayload, so closing happens correctly. + val rehydrated = new ColumnarCachedBatchSerializer() + .convertCachedBatchToColumnarBatch(cached, cacheAttrs, cacheAttrs, sqlConf) + + rehydrated.mapPartitionsWithIndex { + (pid, it) => assignIds(it, bcOffsets.value(pid), alreadyLoaded = false) + } + } + + // Idempotent cache materialization. doExecuteColumnar() may be called more than once during + // planning / fallback handling; we want to persist only once and have a single handle to + // unpersist in cleanupResources(). + @transient @volatile private var cachedRdd: RDD[CachedBatch] = _ + + private def getOrBuildCache( + childRdd: RDD[ColumnarBatch], + cacheAttrs: Seq[Attribute], + sqlConf: SQLConf): RDD[CachedBatch] = { + val existing = cachedRdd + if (existing != null) { + return existing + } + synchronized { + if (cachedRdd == null) { + // Serialize child batches into on-heap Velox-native byte blobs (CachedColumnarBatch) and + // persist them so that the count pass and the assign pass each read from the cache + // instead of recomputing the child plan. + cachedRdd = new ColumnarCachedBatchSerializer() + .convertColumnarBatchToCachedBatch( + childRdd, + cacheAttrs, + StorageLevel.MEMORY_AND_DISK_SER, + sqlConf) + .persist(StorageLevel.MEMORY_AND_DISK_SER) + } + cachedRdd + } + } + + override protected def doColumnarCleanup(): Unit = { + val toRelease = synchronized { + val r = cachedRdd + cachedRdd = null + r + } + if (toRelease != null) { + try { + toRelease.unpersist(blocking = false) + } catch { + case _: Throwable => // best-effort; do not propagate from cleanup + } + } + } + + /** + * Prepends a `Long` id column to each input batch starting from `startOffset` and incrementing by + * row index. If `alreadyLoaded` is false the input batch is brought into Arrow-loaded (heavy) + * form first via [[ColumnarBatches#ensureLoaded]] -- this is a no-op when the batch is already + * heavy. + */ + private def assignIds( + batches: Iterator[ColumnarBatch], + startOffset: Long, + alreadyLoaded: Boolean): Iterator[ColumnarBatch] = { + val attached = new Iterator[ColumnarBatch] { + private var running: Long = startOffset + + override def hasNext: Boolean = batches.hasNext + + override def next(): ColumnarBatch = { + val rawCb = batches.next() + val inputCb = + if (alreadyLoaded) { + ColumnarBatches.checkLoaded(rawCb) + rawCb + } else { + // After `convertCachedBatchToColumnarBatch` the batch is Velox-native (light); load + // it into Arrow-Java (heavy) form via a zero-copy ABI handoff so the id column + // (an ArrowWritableColumnVector) can sit next to the input columns in the output + // batch. `load` closes the light input on success. + ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), rawCb) + } + try { + val numRows = inputCb.numRows() + + val idVec = ArrowWritableColumnVector + .allocateColumns(numRows, idSchema) + .head + try { + var i = 0 + while (i < numRows) { + idVec.putLong(i, running + i) + i += 1 + } + idVec.setValueCount(numRows) + + // Retain input columns once so that closing both the input batch (by upstream) and + // the output batch (by the wrapping iterator) leaves the underlying Arrow buffers' + // ref-counts at zero. + ColumnarBatches.retain(inputCb) + + val outCols = new Array[ColumnVector](inputCb.numCols() + 1) + outCols(0) = idVec + var j = 0 + while (j < inputCb.numCols()) { + outCols(j + 1) = inputCb.column(j) + j += 1 + } + running += numRows + new ColumnarBatch(outCols, numRows) + } catch { + case t: Throwable => + idVec.close() + throw t + } + } catch { + case t: Throwable => + // If we loaded the input ourselves and failed before constructing the output + // batch (which would otherwise own the retain), release it here so the underlying + // Arrow buffers are freed. When alreadyLoaded == true the upstream iterator still + // owns rawCb, so we must not close it here. + if (!alreadyLoaded) { + try { + inputCb.close() + } catch { + case _: Throwable => // swallow secondary failure + } + } + throw t + } + } + } + Iterators + .wrap(attached) + .recyclePayload(_.close()) + .create() + } + + override protected def withNewChildInternal( + newChild: SparkPlan): ColumnarAttachDistributedSequenceExec = + copy(child = newChild) +} diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAttachDistributedSequenceExecSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAttachDistributedSequenceExecSuite.scala new file mode 100644 index 00000000000..f7eaac2732c --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAttachDistributedSequenceExecSuite.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.config.GlutenConfig + +import org.apache.spark.SparkConf +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.AttachDistributedSequence +import org.apache.spark.sql.classic.ClassicDataset +import org.apache.spark.sql.execution.python.AttachDistributedSequenceExec +import org.apache.spark.sql.types.LongType + +class VeloxAttachDistributedSequenceExecSuite extends VeloxWholeStageTransformerSuite { + + override protected val resourcePath: String = "/tpch-data-parquet" + override protected val fileFormat: String = "parquet" + + override def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.shuffle.partitions", "3") + .set("spark.default.parallelism", "3") + } + + /** + * Build a DataFrame that prepends a distributed-sequence id column using a directly constructed + * [[AttachDistributedSequence]] logical node. This avoids depending on pandas-on-Spark / PySpark + * in JVM tests. + */ + private def attachSequence(df: DataFrame, name: String = "id"): DataFrame = { + val attr = AttributeReference(name, LongType, nullable = false)() + ClassicDataset.ofRows(spark, AttachDistributedSequence(attr, df.queryExecution.analyzed)) + } + + test("contiguous ids for a single partition") { + val df = attachSequence(spark.range(0, 7, 1, 1).toDF("v")) + val ids = df.select("id").collect().map(_.getLong(0)).toSeq + assert(ids == Seq(0L, 1L, 2L, 3L, 4L, 5L, 6L)) + } + + test("contiguous ids across multiple partitions of equal size") { + val df = attachSequence(spark.range(0, 12, 1, 4).toDF("v")) + val ids = df.select("id").collect().map(_.getLong(0)).toSeq.sorted + assert(ids == (0L until 12L)) + // Check the offload happened. + val plan = df.queryExecution.executedPlan + val matched = plan.collectFirst { + case e: ColumnarAttachDistributedSequenceExec => e + } + assert(matched.isDefined, s"Expected ColumnarAttachDistributedSequenceExec in:\n$plan") + } + + test("contiguous ids across multiple partitions of unequal size") { + val base = spark.range(0, 100, 1, 8).toDF("v").filter("v % 3 = 0") + val df = attachSequence(base) + val rows = df.collect() + val ids = rows.map(_.getAs[Long]("id")).toSeq.sorted + assert(ids == (0L until rows.length)) + } + + test("empty input produces empty output") { + val df = attachSequence(spark.range(0, 0, 1, 4).toDF("v")) + assert(df.collect().isEmpty) + } + + test("id is paired with the correct row payload") { + val df = attachSequence(spark.range(0, 5, 1, 1).toDF("v")) + val rows = df.select("id", "v").collect().map(r => (r.getLong(0), r.getLong(1))).toSeq + assert(rows == Seq((0L, 0L), (1L, 1L), (2L, 2L), (3L, 3L), (4L, 4L))) + } + + test("falls back to vanilla exec when columnar attach-distributed-sequence is disabled") { + withSQLConf( + "spark.gluten.sql.columnar.attachDistributedSequence" -> "false" + ) { + val df = attachSequence(spark.range(0, 4, 1, 2).toDF("v")) + val plan = df.queryExecution.executedPlan + assert( + plan.find(_.isInstanceOf[ColumnarAttachDistributedSequenceExec]).isEmpty, + s"Expected no ColumnarAttachDistributedSequenceExec in:\n$plan") + val ids = df.select("id").collect().map(_.getLong(0)).toSeq.sorted + assert(ids == Seq(0L, 1L, 2L, 3L)) + } + } + + test("GlutenConfig getter returns default true") { + assert(GlutenConfig.get.enableColumnarAttachDistributedSequence) + } + + test("vanilla exec construction does not break offload pattern") { + // Sanity: confirm vanilla exec class is available and constructible (used in offload). + val attr = AttributeReference("id", LongType, nullable = false)() + val child = spark.range(0, 1, 1, 1).queryExecution.executedPlan + val vanilla = AttachDistributedSequenceExec(attr, child) + assert(vanilla.output.head.name == "id") + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 84e2d865541..4783c95af99 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.python.ArrowEvalPythonExec +import org.apache.spark.sql.execution.python.AttachDistributedSequenceExec import org.apache.spark.sql.execution.window._ import org.apache.spark.sql.hive.HiveUDFTransformer import org.apache.spark.sql.types.{DecimalType, LongType, NullType, StructType} @@ -763,6 +764,9 @@ trait SparkPlanExecApi { def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec + def genColumnarAttachDistributedSequenceExec( + plan: AttachDistributedSequenceExec): ColumnarAttachDistributedSequenceBaseExec + def genColumnarTailExec(limit: Int, plan: SparkPlan): ColumnarCollectTailBaseExec def genColumnarToCarrierRow(plan: SparkPlan): SparkPlan diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 821d7472287..4f39677c99d 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -385,6 +385,8 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { } def enableColumnarRange: Boolean = getConf(COLUMNAR_RANGE_ENABLED) + def enableColumnarAttachDistributedSequence: Boolean = + getConf(COLUMNAR_ATTACH_DISTRIBUTED_SEQUENCE_ENABLED) def enableColumnarCollectLimit: Boolean = getConf(COLUMNAR_COLLECT_LIMIT_ENABLED) def enableColumnarCollectTail: Boolean = getConf(COLUMNAR_COLLECT_TAIL_ENABLED) def getSupportedFlattenedExpressions: String = getConf(GLUTEN_SUPPORTED_FLATTENED_FUNCTIONS) @@ -1606,6 +1608,15 @@ object GlutenConfig extends ConfigRegistry { .booleanConf .createWithDefault(true) + val COLUMNAR_ATTACH_DISTRIBUTED_SEQUENCE_ENABLED = + buildConf("spark.gluten.sql.columnar.attachDistributedSequence") + .doc( + "Enable or disable columnar AttachDistributedSequenceExec, which prepends a " + + "contiguous distributed-sequence id column used by pandas-on-Spark's default " + + "index and DataFrame.zipWithIndex.") + .booleanConf + .createWithDefault(true) + val COLUMNAR_COLLECT_LIMIT_ENABLED = buildConf("spark.gluten.sql.columnar.collectLimit") .doc("Enable or disable columnar collectLimit.") diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala index 3d844607b39..e80eb7f0574 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.WriteFilesExec import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ -import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, BatchEvalPythonExec, EvalPythonExecTransformer} +import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, AttachDistributedSequenceExec, BatchEvalPythonExec, EvalPythonExecTransformer} import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.hive.HiveTableScanExecTransformer @@ -305,6 +305,8 @@ object OffloadOthers { } case plan: RangeExec => ColumnarRangeBaseExec.from(plan) + case plan: AttachDistributedSequenceExec => + ColumnarAttachDistributedSequenceBaseExec.from(plan) case plan: SampleExec => val child = plan.child BackendsApiManager.getSparkPlanExecApiInstance.genSampleExecTransformer( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index faa0d9e2e54..729a6306d84 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.WriteFilesExec import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, BatchScanExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ +import org.apache.spark.sql.execution.python.AttachDistributedSequenceExec import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.hive.HiveTableScanExecTransformer import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} @@ -216,6 +217,9 @@ object Validators { .supportSampleExec()) => fail(p) case p: RangeExec if !glutenConf.enableColumnarRange => fail(p) + case p: AttachDistributedSequenceExec + if !glutenConf.enableColumnarAttachDistributedSequence => + fail(p) case p: CollectLimitExec if !glutenConf.enableColumnarCollectLimit => fail(p) case p: CollectTailExec if !glutenConf.enableColumnarCollectTail => fail(p) case _ => pass() diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarAttachDistributedSequenceBaseExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarAttachDistributedSequenceBaseExec.scala new file mode 100644 index 00000000000..3b5332f883e --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarAttachDistributedSequenceBaseExec.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.execution.ValidatablePlan +import org.apache.gluten.extension.columnar.transition.Convention + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.execution.python.AttachDistributedSequenceExec + +/** + * Base class for [[AttachDistributedSequenceExec]] transformation that can be implemented by + * supported backends. The exec prepends a contiguous, globally increasing `Long` id column to the + * child output. Used by pandas-on-Spark distributed-sequence default index and + * `DataFrame.zipWithIndex`. + */ +abstract class ColumnarAttachDistributedSequenceBaseExec( + sequenceAttr: Attribute, + override val child: SparkPlan) + extends UnaryExecNode + with ValidatablePlan { + + override def producedAttributes: AttributeSet = AttributeSet(sequenceAttr) + + override val output: Seq[Attribute] = sequenceAttr +: child.output + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def rowType(): Convention.RowType = Convention.RowType.None + + override protected def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException(s"This operator doesn't support doExecute().") + } + + override def simpleString(maxFields: Int): String = { + val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields) + val indexColumn = s"Index: $sequenceAttr" + s"$nodeName$truncatedOutputString $indexColumn" + } + + /** + * Hook for backend implementations to release any resources (e.g. cached RDDs) that were + * materialized during execution. Called from [[cleanupResources]] after children have been + * cleaned up. The default implementation is a no-op. + */ + protected def doColumnarCleanup(): Unit = {} + + override protected[sql] def cleanupResources(): Unit = { + try { + doColumnarCleanup() + } finally { + super.cleanupResources() + } + } +} + +/** + * Companion object for ColumnarAttachDistributedSequenceBaseExec, provides factory methods to + * create instance from existing AttachDistributedSequenceExec plan. + */ +object ColumnarAttachDistributedSequenceBaseExec { + def from(plan: AttachDistributedSequenceExec): ColumnarAttachDistributedSequenceBaseExec = { + BackendsApiManager.getSparkPlanExecApiInstance + .genColumnarAttachDistributedSequenceExec(plan) + } +} From 5cf4d9ac08edabe84143a43358100f1532096050 Mon Sep 17 00:00:00 2001 From: baibaichen Date: Fri, 29 May 2026 09:33:59 +0000 Subject: [PATCH 2/5] [GLUTEN-12187][VL] Regenerate Configuration.md for new attachDistributedSequence config --- docs/Configuration.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/Configuration.md b/docs/Configuration.md index 748a200cfbf..080b98e3025 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -47,6 +47,7 @@ nav_order: 15 | spark.gluten.sql.collapseGetJsonObject.enabled | 🔄 Dynamic | false | Collapse nested get_json_object functions as one for optimization. | | spark.gluten.sql.columnar.appendData | 🔄 Dynamic | true | Enable or disable columnar v2 command append data. | | spark.gluten.sql.columnar.arrowUdf | 🔄 Dynamic | true | Enable or disable columnar arrow udf. | +| spark.gluten.sql.columnar.attachDistributedSequence | 🔄 Dynamic | true | Enable or disable columnar AttachDistributedSequenceExec, which prepends a contiguous distributed-sequence id column used by pandas-on-Spark's default index and DataFrame.zipWithIndex. | | spark.gluten.sql.columnar.batchscan | 🔄 Dynamic | true | Enable or disable columnar batchscan. | | spark.gluten.sql.columnar.broadcastExchange | 🔄 Dynamic | true | Enable or disable columnar broadcastExchange. | | spark.gluten.sql.columnar.broadcastJoin | 🔄 Dynamic | true | Enable or disable columnar broadcastJoin. | From 9e91b6e95ef324deb97d07ebaef24159e81e0046 Mon Sep 17 00:00:00 2001 From: baibaichen Date: Fri, 29 May 2026 11:42:55 +0000 Subject: [PATCH 3/5] [GLUTEN-12187][VL] Drop columnar cache: use two-pass child execution The original implementation persisted the child's columnar output via ColumnarCachedBatchSerializer to avoid re-executing the child plan twice. That path fails on zero-column batches that can result from column pruning (e.g. df.select("id") prunes every input column away): ensureVeloxBatch -> isVeloxBatch -> getIndicatorVector throws because the batch is neither LIGHT nor HEAVY. Switch to the simpler vanilla-Spark-style approach (matches the pandas-on-Spark cache="NONE" option): run the child once to count rows per partition for [0, numPartitions - 1), then run it again to attach the id column. One extra child execution; full robustness across arbitrary projections. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- ...olumnarAttachDistributedSequenceExec.scala | 211 ++++++------------ 1 file changed, 66 insertions(+), 145 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala index 8bc64b28c2c..770d42a1b33 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala @@ -20,37 +20,39 @@ import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowJavaBatchType import org.apache.gluten.columnarbatch.ColumnarBatches import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq} import org.apache.gluten.iterator.Iterators -import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.vectorized.ArrowWritableColumnVector import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.columnar.CachedBatch -import org.apache.spark.sql.execution.{ColumnarAttachDistributedSequenceBaseExec, ColumnarCachedBatchSerializer, SparkPlan} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.{ColumnarAttachDistributedSequenceBaseExec, SparkPlan} import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} -import org.apache.spark.storage.StorageLevel /** * Velox implementation of [[ColumnarAttachDistributedSequenceBaseExec]] that prepends a contiguous, * globally increasing `Long` id column to its child output while keeping the columnar pipeline * intact. * - * Mirrors Spark's `AttachDistributedSequenceExec` semantics: - * 1. The child columnar output is materialized once into a cached `RDD[CachedBatch]` using - * Gluten's [[ColumnarCachedBatchSerializer]] (Velox native serialization, persisted at - * `MEMORY_AND_DISK_SER`). This prevents the inherent two-pass nature of `zipWithIndex` from - * re-running the child plan twice. - * 2. A first pass over the cached partitions `[0, numPartitions - 1)` reads only the `numRows` - * field of each `CachedColumnarBatch` -- no native deserialization is required to count. - * 3. The per-partition prefix-sum is broadcast and a second pass deserializes the cached batches - * back to columnar form, ensures they are Arrow-loaded, and prepends the new id column by - * retaining the input column vectors (zero-copy) and allocating one - * [[ArrowWritableColumnVector]] for the id column. + * Mirrors Spark's `AttachDistributedSequenceExec` semantics with two passes over the child: + * 1. A first pass executes the child plan over partitions `[0, numPartitions - 1)` and sums the + * `numRows` of every produced batch -- the last partition's count is not needed for the + * prefix-sum. The batches are closed immediately; no native data is materialized for the count + * pass beyond what the child operator naturally produces. + * 2. The per-partition prefix-sum is broadcast and a second pass executes the child plan again, + * prepending the new id column by retaining the input column vectors (zero-copy) and + * allocating one [[ArrowWritableColumnVector]] for the id column. * - * For the trivial single-partition case the cache step is skipped and the assignment is done - * directly over the child output with `startOffset = 0`. + * Why no cache? The natural choice would be to wrap the child output in + * [[org.apache.spark.sql.execution.ColumnarCachedBatchSerializer]] and `persist` once, so the child + * plan is computed only once. That works for ordinary columnar batches but fails for zero-column + * batches that can result from column pruning when only the new id column is selected + * (`df.select("id")` projects away every input column): the cache serializer's + * `ensureVeloxBatch -> isVeloxBatch -> getIndicatorVector` path throws on zero-column input. The + * two-pass approach trades one extra child execution for robustness across all valid plans, and + * matches vanilla Spark's behavior when the pandas-on-Spark cache option is `NONE`. + * + * For the trivial single-partition case the count pass is skipped and the assignment runs directly + * with `startOffset = 0`. */ case class ColumnarAttachDistributedSequenceExec( sequenceAttr: Attribute, @@ -70,23 +72,21 @@ case class ColumnarAttachDistributedSequenceExec( val numPartitions = childRdd.getNumPartitions if (numPartitions <= 1) { - // Fast path: at most one partition, no need to count or cache. - return childRdd.mapPartitions(it => assignIds(it, startOffset = 0L, alreadyLoaded = true)) + // Fast path: at most one partition, no need to count. + return childRdd.mapPartitions(it => assignIds(it, startOffset = 0L)) } - val sqlConf = SQLConf.get - val cacheAttrs = child.output - val cached = getOrBuildCache(childRdd, cacheAttrs, sqlConf) - - // First pass: count rows per partition for partitions [0, numPartitions - 1) -- the last - // partition's count is unused. Reading `CachedColumnarBatch.numRows` is a case-class field - // access; no native deserialization is required. + // First pass: execute the child plan and count rows per partition for partitions + // [0, numPartitions - 1). The last partition's count is unused for the prefix-sum. + // Each batch is closed immediately after reading numRows so off-heap buffers are released. val frontCounts: Array[Long] = sparkContext.runJob( - cached, - (it: Iterator[CachedBatch]) => { + childRdd, + (it: Iterator[ColumnarBatch]) => { var sum = 0L while (it.hasNext) { - sum += it.next().numRows.toLong + val cb = it.next() + sum += cb.numRows().toLong + cb.close() } sum }, @@ -95,136 +95,61 @@ case class ColumnarAttachDistributedSequenceExec( val offsets = frontCounts.scanLeft(0L)(_ + _) val bcOffsets = sparkContext.broadcast(offsets) - // Second pass: deserialize cached batches back to Velox-native ColumnarBatches and prepend - // the id column. `convertCachedBatchToColumnarBatch` already wraps its output with - // Iterators.recyclePayload, so closing happens correctly. - val rehydrated = new ColumnarCachedBatchSerializer() - .convertCachedBatchToColumnarBatch(cached, cacheAttrs, cacheAttrs, sqlConf) - - rehydrated.mapPartitionsWithIndex { - (pid, it) => assignIds(it, bcOffsets.value(pid), alreadyLoaded = false) + // Second pass: re-execute the child plan and prepend the id column. + childRdd.mapPartitionsWithIndex { + (pid, it) => assignIds(it, bcOffsets.value(pid)) } } - // Idempotent cache materialization. doExecuteColumnar() may be called more than once during - // planning / fallback handling; we want to persist only once and have a single handle to - // unpersist in cleanupResources(). - @transient @volatile private var cachedRdd: RDD[CachedBatch] = _ - - private def getOrBuildCache( - childRdd: RDD[ColumnarBatch], - cacheAttrs: Seq[Attribute], - sqlConf: SQLConf): RDD[CachedBatch] = { - val existing = cachedRdd - if (existing != null) { - return existing - } - synchronized { - if (cachedRdd == null) { - // Serialize child batches into on-heap Velox-native byte blobs (CachedColumnarBatch) and - // persist them so that the count pass and the assign pass each read from the cache - // instead of recomputing the child plan. - cachedRdd = new ColumnarCachedBatchSerializer() - .convertColumnarBatchToCachedBatch( - childRdd, - cacheAttrs, - StorageLevel.MEMORY_AND_DISK_SER, - sqlConf) - .persist(StorageLevel.MEMORY_AND_DISK_SER) - } - cachedRdd - } - } - - override protected def doColumnarCleanup(): Unit = { - val toRelease = synchronized { - val r = cachedRdd - cachedRdd = null - r - } - if (toRelease != null) { - try { - toRelease.unpersist(blocking = false) - } catch { - case _: Throwable => // best-effort; do not propagate from cleanup - } - } - } + override protected def withNewChildInternal( + newChild: SparkPlan): ColumnarAttachDistributedSequenceExec = + copy(child = newChild) /** * Prepends a `Long` id column to each input batch starting from `startOffset` and incrementing by - * row index. If `alreadyLoaded` is false the input batch is brought into Arrow-loaded (heavy) - * form first via [[ColumnarBatches#ensureLoaded]] -- this is a no-op when the batch is already - * heavy. + * row index. The input batches are expected to be Arrow-loaded (heavy) because our + * `requiredChildConvention` requests `ArrowJavaBatchType`. */ private def assignIds( batches: Iterator[ColumnarBatch], - startOffset: Long, - alreadyLoaded: Boolean): Iterator[ColumnarBatch] = { + startOffset: Long): Iterator[ColumnarBatch] = { val attached = new Iterator[ColumnarBatch] { private var running: Long = startOffset override def hasNext: Boolean = batches.hasNext override def next(): ColumnarBatch = { - val rawCb = batches.next() - val inputCb = - if (alreadyLoaded) { - ColumnarBatches.checkLoaded(rawCb) - rawCb - } else { - // After `convertCachedBatchToColumnarBatch` the batch is Velox-native (light); load - // it into Arrow-Java (heavy) form via a zero-copy ABI handoff so the id column - // (an ArrowWritableColumnVector) can sit next to the input columns in the output - // batch. `load` closes the light input on success. - ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), rawCb) - } + val inputCb = batches.next() + ColumnarBatches.checkLoaded(inputCb) + val numRows = inputCb.numRows() + val idVec = ArrowWritableColumnVector + .allocateColumns(numRows, idSchema) + .head try { - val numRows = inputCb.numRows() - - val idVec = ArrowWritableColumnVector - .allocateColumns(numRows, idSchema) - .head - try { - var i = 0 - while (i < numRows) { - idVec.putLong(i, running + i) - i += 1 - } - idVec.setValueCount(numRows) - - // Retain input columns once so that closing both the input batch (by upstream) and - // the output batch (by the wrapping iterator) leaves the underlying Arrow buffers' - // ref-counts at zero. - ColumnarBatches.retain(inputCb) - - val outCols = new Array[ColumnVector](inputCb.numCols() + 1) - outCols(0) = idVec - var j = 0 - while (j < inputCb.numCols()) { - outCols(j + 1) = inputCb.column(j) - j += 1 - } - running += numRows - new ColumnarBatch(outCols, numRows) - } catch { - case t: Throwable => - idVec.close() - throw t + var i = 0 + while (i < numRows) { + idVec.putLong(i, running + i) + i += 1 } + idVec.setValueCount(numRows) + + // Retain input columns once so that closing both the input batch (by upstream) and + // the output batch (by the wrapping iterator) leaves the underlying Arrow buffers' + // ref-counts at zero. + ColumnarBatches.retain(inputCb) + + val outCols = new Array[ColumnVector](inputCb.numCols() + 1) + outCols(0) = idVec + var j = 0 + while (j < inputCb.numCols()) { + outCols(j + 1) = inputCb.column(j) + j += 1 + } + running += numRows + new ColumnarBatch(outCols, numRows) } catch { case t: Throwable => - // If we loaded the input ourselves and failed before constructing the output - // batch (which would otherwise own the retain), release it here so the underlying - // Arrow buffers are freed. When alreadyLoaded == true the upstream iterator still - // owns rawCb, so we must not close it here. - if (!alreadyLoaded) { - try { - inputCb.close() - } catch { - case _: Throwable => // swallow secondary failure - } - } + idVec.close() throw t } } @@ -234,8 +159,4 @@ case class ColumnarAttachDistributedSequenceExec( .recyclePayload(_.close()) .create() } - - override protected def withNewChildInternal( - newChild: SparkPlan): ColumnarAttachDistributedSequenceExec = - copy(child = newChild) } From c5045d8c1dc8fad61dda6d3236571e3cfaba7453 Mon Sep 17 00:00:00 2001 From: baibaichen Date: Fri, 29 May 2026 15:22:18 +0000 Subject: [PATCH 4/5] [GLUTEN-12187][VL] Copy input columns to keep uniform refCnt on output batch Round-3 (drop-cache) made our output batch flow through OffloadArrowDataExec when a downstream Velox consumer (e.g. shuffle after repartition) follows our op. That path calls ColumnarBatches.offload -> getRefCntHeavy, which asserts every column in the heavy batch shares the same reference count. Our previous output mixed a freshly allocated id column (refCnt=1) with retain'd input columns (refCnt=2), tripping the assertion -- which surfaced as the SPARK-36338 regression in the inherited GlutenDataFrameSuite. Allocate fresh ArrowWritableColumnVectors for every output column and copy input values per row via ValueVector.copyFromSafe. All output columns then have refCnt=1, the input batch is untouched, and the offload path works regardless of which transition the planner inserts. Test additions: - New 'output survives a downstream Velox shuffle (offload path)' test reproduces the bug locally (repartition after attach). - Set spark.sql.ansi.enabled=false in suite sparkConf so the columnar exec is actually selected under Spark 4.x where ANSI is default true (Gluten falls back the whole plan otherwise). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- ...olumnarAttachDistributedSequenceExec.scala | 52 ++++++++++++------- ...oxAttachDistributedSequenceExecSuite.scala | 15 ++++++ 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala index 770d42a1b33..b57c09b5f46 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala @@ -39,8 +39,11 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} * prefix-sum. The batches are closed immediately; no native data is materialized for the count * pass beyond what the child operator naturally produces. * 2. The per-partition prefix-sum is broadcast and a second pass executes the child plan again, - * prepending the new id column by retaining the input column vectors (zero-copy) and - * allocating one [[ArrowWritableColumnVector]] for the id column. + * prepending the new id column. Each output column (id + copies of the input columns) is a + * freshly allocated [[ArrowWritableColumnVector]] so the output batch has a uniform reference + * count -- required by the downstream `OffloadArrowDataExec`'s `getRefCntHeavy` check. Input + * values are copied via Arrow's `ValueVector.copyFromSafe`; the upstream input batch is left + * untouched and closed by the upstream iterator. * * Why no cache? The natural choice would be to wrap the child output in * [[org.apache.spark.sql.execution.ColumnarCachedBatchSerializer]] and `persist` once, so the child @@ -64,8 +67,10 @@ case class ColumnarAttachDistributedSequenceExec( override def requiredChildConvention(): Seq[ConventionReq] = Seq( ConventionReq.ofBatch(ConventionReq.BatchType.Is(ArrowJavaBatchType))) - private val idSchema: StructType = - StructType(Seq(StructField(sequenceAttr.name, LongType, nullable = false))) + private val outputSchema: StructType = + StructType( + StructField(sequenceAttr.name, LongType, nullable = false) +: + child.output.map(a => StructField(a.name, a.dataType, a.nullable))) override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { val childRdd = child.executeColumnar() @@ -107,8 +112,12 @@ case class ColumnarAttachDistributedSequenceExec( /** * Prepends a `Long` id column to each input batch starting from `startOffset` and incrementing by - * row index. The input batches are expected to be Arrow-loaded (heavy) because our - * `requiredChildConvention` requests `ArrowJavaBatchType`. + * row index. The output is a fresh heavy [[ColumnarBatch]] whose columns are all freshly + * allocated [[ArrowWritableColumnVector]]s with reference count 1. Input column values are copied + * via Arrow's `ValueVector.copyFromSafe` rather than retained zero-copy so the resulting batch + * satisfies the uniform-reference-count invariant required by downstream + * `ColumnarBatches.offload` / `getRefCntHeavy`. The original input batch is left untouched and is + * closed by the upstream iterator's recycling logic. */ private def assignIds( batches: Iterator[ColumnarBatch], @@ -122,10 +131,9 @@ case class ColumnarAttachDistributedSequenceExec( val inputCb = batches.next() ColumnarBatches.checkLoaded(inputCb) val numRows = inputCb.numRows() - val idVec = ArrowWritableColumnVector - .allocateColumns(numRows, idSchema) - .head + val outCols = ArrowWritableColumnVector.allocateColumns(numRows, outputSchema) try { + val idVec = outCols(0) var i = 0 while (i < numRows) { idVec.putLong(i, running + i) @@ -133,23 +141,29 @@ case class ColumnarAttachDistributedSequenceExec( } idVec.setValueCount(numRows) - // Retain input columns once so that closing both the input batch (by upstream) and - // the output batch (by the wrapping iterator) leaves the underlying Arrow buffers' - // ref-counts at zero. - ColumnarBatches.retain(inputCb) - - val outCols = new Array[ColumnVector](inputCb.numCols() + 1) - outCols(0) = idVec + // Copy each input column into its corresponding freshly-allocated output column. Using + // Arrow's per-row `copyFromSafe` keeps the implementation type-agnostic and ensures every + // output column has reference count 1, matching the id column. This avoids the uniform + // ref-count check enforced by `ColumnarBatches.getRefCntHeavy` when the planner inserts + // an `OffloadArrowDataExec` between us and the next Velox consumer. var j = 0 while (j < inputCb.numCols()) { - outCols(j + 1) = inputCb.column(j) + val src = inputCb.column(j).asInstanceOf[ArrowWritableColumnVector].getValueVector + val dst = outCols(j + 1).getValueVector + var r = 0 + while (r < numRows) { + dst.copyFromSafe(r, r, src) + r += 1 + } + dst.setValueCount(numRows) j += 1 } + running += numRows - new ColumnarBatch(outCols, numRows) + new ColumnarBatch(outCols.asInstanceOf[Array[ColumnVector]], numRows) } catch { case t: Throwable => - idVec.close() + outCols.foreach(_.close()) throw t } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAttachDistributedSequenceExecSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAttachDistributedSequenceExecSuite.scala index f7eaac2732c..fb326ba4d6d 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAttachDistributedSequenceExecSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAttachDistributedSequenceExecSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.AttachDistributedSequence import org.apache.spark.sql.classic.ClassicDataset import org.apache.spark.sql.execution.python.AttachDistributedSequenceExec +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.LongType class VeloxAttachDistributedSequenceExecSuite extends VeloxWholeStageTransformerSuite { @@ -35,6 +36,7 @@ class VeloxAttachDistributedSequenceExecSuite extends VeloxWholeStageTransformer super.sparkConf .set("spark.sql.shuffle.partitions", "3") .set("spark.default.parallelism", "3") + .set(SQLConf.ANSI_ENABLED.key, "false") } /** @@ -84,6 +86,19 @@ class VeloxAttachDistributedSequenceExecSuite extends VeloxWholeStageTransformer assert(rows == Seq((0L, 0L), (1L, 1L), (2L, 2L), (3L, 3L), (4L, 4L))) } + test("output survives a downstream Velox shuffle (offload path)") { + // Repartition after attach forces ArrowJava -> ArrowNative -> VeloxBatch via + // OffloadArrowDataExec, which calls ColumnarBatches.getRefCntHeavy and + // requires the uniform-refCnt invariant on the output batch. This mirrors + // the vanilla SPARK-36338 inherited test that exposed the bug in CI. + val df = attachSequence(spark.range(0, 20, 1, 4).toDF("v")).repartition(3) + val rows = df.select("id", "v").collect().map(r => (r.getLong(0), r.getLong(1))).toSeq + val ids = rows.map(_._1).sorted + val vs = rows.map(_._2).sorted + assert(ids == (0L until 20L)) + assert(vs == (0L until 20L)) + } + test("falls back to vanilla exec when columnar attach-distributed-sequence is disabled") { withSQLConf( "spark.gluten.sql.columnar.attachDistributedSequence" -> "false" From d0eff7003b74c254fa7e9090ed65477bb8332af1 Mon Sep 17 00:00:00 2001 From: baibaichen Date: Sat, 30 May 2026 02:17:28 +0000 Subject: [PATCH 5/5] [GLUTEN-12187][VL] Gate AttachDistributedSequence offload on backend support The new case in OffloadSingleNodeRules unconditionally called ColumnarAttachDistributedSequenceBaseExec.from(plan) for every backend, which triggers CHSparkPlanExecApi.genColumnarAttachDistributedSequenceExec to throw GlutenNotSupportException on the ClickHouse backend during the inherited SPARK-36338 test. Add a backend-level flag supportColumnarAttachDistributedSequenceExec (default false in BackendSettingsApi, true for Velox). Only attempt the offload when the active backend opts in. CH plans now stay as vanilla AttachDistributedSequenceExec and execute via Spark's row-based path (C2R/R2C), restoring the previous CH behavior. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../org/apache/gluten/backendsapi/velox/VeloxBackend.scala | 2 ++ .../org/apache/gluten/backendsapi/BackendSettingsApi.scala | 2 ++ .../extension/columnar/offload/OffloadSingleNodeRules.scala | 3 ++- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 599b91851ea..a3e53d2bf85 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -562,6 +562,8 @@ object VeloxBackendSettings extends BackendSettingsApi { override def supportColumnarArrowUdf(): Boolean = true + override def supportColumnarAttachDistributedSequenceExec(): Boolean = true + override def needPreComputeRangeFrameBoundary(): Boolean = true override def supportIcebergEqualityDeleteRead(): Boolean = false diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 7f5d5a9c214..807d01f80b1 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -149,6 +149,8 @@ trait BackendSettingsApi { def supportColumnarArrowUdf(): Boolean = false + def supportColumnarAttachDistributedSequenceExec(): Boolean = false + def needPreComputeRangeFrameBoundary(): Boolean = false def supportIcebergEqualityDeleteRead(): Boolean = true diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala index e80eb7f0574..e7790daabf4 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala @@ -305,7 +305,8 @@ object OffloadOthers { } case plan: RangeExec => ColumnarRangeBaseExec.from(plan) - case plan: AttachDistributedSequenceExec => + case plan: AttachDistributedSequenceExec + if BackendsApiManager.getSettings.supportColumnarAttachDistributedSequenceExec() => ColumnarAttachDistributedSequenceBaseExec.from(plan) case plan: SampleExec => val child = plan.child