diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 832932f0a49..513c41c93e5 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -979,6 +979,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { override def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec = CHRangeExecTransformer(rangeExec.range) + override def genColumnarAttachDistributedSequenceExec( + plan: org.apache.spark.sql.execution.python.AttachDistributedSequenceExec) + : ColumnarAttachDistributedSequenceBaseExec = + throw new GlutenNotSupportException( + "AttachDistributedSequenceExec is not supported in ClickHouse backend yet.") + override def expressionFlattenSupported(expr: Expression): Boolean = expr match { case ca: FlattenedAnd => CHFlattenedExpression.supported(ca.name) case co: FlattenedOr => CHFlattenedExpression.supported(co.name) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 599b91851ea..a3e53d2bf85 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -562,6 +562,8 @@ object VeloxBackendSettings extends BackendSettingsApi { override def supportColumnarArrowUdf(): Boolean = true + override def supportColumnarAttachDistributedSequenceExec(): Boolean = true + override def needPreComputeRangeFrameBoundary(): Boolean = true override def supportIcebergEqualityDeleteRead(): Boolean = false diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 5f1003689dd..13d29994704 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode, SparkHashJoinUtils} import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.execution.python.ArrowEvalPythonExec +import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, AttachDistributedSequenceExec} import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation import org.apache.spark.sql.execution.utils.ExecUtil import org.apache.spark.sql.expression.{UDFExpression, UserDefinedAggregateFunction} @@ -1205,6 +1205,10 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging { override def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec = ColumnarRangeExec(rangeExec.range) + override def genColumnarAttachDistributedSequenceExec( + plan: AttachDistributedSequenceExec): ColumnarAttachDistributedSequenceBaseExec = + ColumnarAttachDistributedSequenceExec(plan.sequenceAttr, plan.child) + override def genColumnarTailExec(limit: Int, child: SparkPlan): ColumnarCollectTailBaseExec = ColumnarCollectTailExec(limit, child) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala new file mode 100644 index 00000000000..b57c09b5f46 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarAttachDistributedSequenceExec.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowJavaBatchType +import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq} +import org.apache.gluten.iterator.Iterators +import org.apache.gluten.vectorized.ArrowWritableColumnVector + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.{ColumnarAttachDistributedSequenceBaseExec, SparkPlan} +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} + +/** + * Velox implementation of [[ColumnarAttachDistributedSequenceBaseExec]] that prepends a contiguous, + * globally increasing `Long` id column to its child output while keeping the columnar pipeline + * intact. + * + * Mirrors Spark's `AttachDistributedSequenceExec` semantics with two passes over the child: + * 1. A first pass executes the child plan over partitions `[0, numPartitions - 1)` and sums the + * `numRows` of every produced batch -- the last partition's count is not needed for the + * prefix-sum. The batches are closed immediately; no native data is materialized for the count + * pass beyond what the child operator naturally produces. + * 2. The per-partition prefix-sum is broadcast and a second pass executes the child plan again, + * prepending the new id column. Each output column (id + copies of the input columns) is a + * freshly allocated [[ArrowWritableColumnVector]] so the output batch has a uniform reference + * count -- required by the downstream `OffloadArrowDataExec`'s `getRefCntHeavy` check. Input + * values are copied via Arrow's `ValueVector.copyFromSafe`; the upstream input batch is left + * untouched and closed by the upstream iterator. + * + * Why no cache? The natural choice would be to wrap the child output in + * [[org.apache.spark.sql.execution.ColumnarCachedBatchSerializer]] and `persist` once, so the child + * plan is computed only once. That works for ordinary columnar batches but fails for zero-column + * batches that can result from column pruning when only the new id column is selected + * (`df.select("id")` projects away every input column): the cache serializer's + * `ensureVeloxBatch -> isVeloxBatch -> getIndicatorVector` path throws on zero-column input. The + * two-pass approach trades one extra child execution for robustness across all valid plans, and + * matches vanilla Spark's behavior when the pandas-on-Spark cache option is `NONE`. + * + * For the trivial single-partition case the count pass is skipped and the assignment runs directly + * with `startOffset = 0`. + */ +case class ColumnarAttachDistributedSequenceExec( + sequenceAttr: Attribute, + override val child: SparkPlan) + extends ColumnarAttachDistributedSequenceBaseExec(sequenceAttr, child) { + + override def batchType(): Convention.BatchType = ArrowJavaBatchType + + override def requiredChildConvention(): Seq[ConventionReq] = Seq( + ConventionReq.ofBatch(ConventionReq.BatchType.Is(ArrowJavaBatchType))) + + private val outputSchema: StructType = + StructType( + StructField(sequenceAttr.name, LongType, nullable = false) +: + child.output.map(a => StructField(a.name, a.dataType, a.nullable))) + + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + val childRdd = child.executeColumnar() + val numPartitions = childRdd.getNumPartitions + + if (numPartitions <= 1) { + // Fast path: at most one partition, no need to count. + return childRdd.mapPartitions(it => assignIds(it, startOffset = 0L)) + } + + // First pass: execute the child plan and count rows per partition for partitions + // [0, numPartitions - 1). The last partition's count is unused for the prefix-sum. + // Each batch is closed immediately after reading numRows so off-heap buffers are released. + val frontCounts: Array[Long] = sparkContext.runJob( + childRdd, + (it: Iterator[ColumnarBatch]) => { + var sum = 0L + while (it.hasNext) { + val cb = it.next() + sum += cb.numRows().toLong + cb.close() + } + sum + }, + 0 until (numPartitions - 1) + ) + val offsets = frontCounts.scanLeft(0L)(_ + _) + val bcOffsets = sparkContext.broadcast(offsets) + + // Second pass: re-execute the child plan and prepend the id column. + childRdd.mapPartitionsWithIndex { + (pid, it) => assignIds(it, bcOffsets.value(pid)) + } + } + + override protected def withNewChildInternal( + newChild: SparkPlan): ColumnarAttachDistributedSequenceExec = + copy(child = newChild) + + /** + * Prepends a `Long` id column to each input batch starting from `startOffset` and incrementing by + * row index. The output is a fresh heavy [[ColumnarBatch]] whose columns are all freshly + * allocated [[ArrowWritableColumnVector]]s with reference count 1. Input column values are copied + * via Arrow's `ValueVector.copyFromSafe` rather than retained zero-copy so the resulting batch + * satisfies the uniform-reference-count invariant required by downstream + * `ColumnarBatches.offload` / `getRefCntHeavy`. The original input batch is left untouched and is + * closed by the upstream iterator's recycling logic. + */ + private def assignIds( + batches: Iterator[ColumnarBatch], + startOffset: Long): Iterator[ColumnarBatch] = { + val attached = new Iterator[ColumnarBatch] { + private var running: Long = startOffset + + override def hasNext: Boolean = batches.hasNext + + override def next(): ColumnarBatch = { + val inputCb = batches.next() + ColumnarBatches.checkLoaded(inputCb) + val numRows = inputCb.numRows() + val outCols = ArrowWritableColumnVector.allocateColumns(numRows, outputSchema) + try { + val idVec = outCols(0) + var i = 0 + while (i < numRows) { + idVec.putLong(i, running + i) + i += 1 + } + idVec.setValueCount(numRows) + + // Copy each input column into its corresponding freshly-allocated output column. Using + // Arrow's per-row `copyFromSafe` keeps the implementation type-agnostic and ensures every + // output column has reference count 1, matching the id column. This avoids the uniform + // ref-count check enforced by `ColumnarBatches.getRefCntHeavy` when the planner inserts + // an `OffloadArrowDataExec` between us and the next Velox consumer. + var j = 0 + while (j < inputCb.numCols()) { + val src = inputCb.column(j).asInstanceOf[ArrowWritableColumnVector].getValueVector + val dst = outCols(j + 1).getValueVector + var r = 0 + while (r < numRows) { + dst.copyFromSafe(r, r, src) + r += 1 + } + dst.setValueCount(numRows) + j += 1 + } + + running += numRows + new ColumnarBatch(outCols.asInstanceOf[Array[ColumnVector]], numRows) + } catch { + case t: Throwable => + outCols.foreach(_.close()) + throw t + } + } + } + Iterators + .wrap(attached) + .recyclePayload(_.close()) + .create() + } +} diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAttachDistributedSequenceExecSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAttachDistributedSequenceExecSuite.scala new file mode 100644 index 00000000000..fb326ba4d6d --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxAttachDistributedSequenceExecSuite.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.config.GlutenConfig + +import org.apache.spark.SparkConf +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.AttachDistributedSequence +import org.apache.spark.sql.classic.ClassicDataset +import org.apache.spark.sql.execution.python.AttachDistributedSequenceExec +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.LongType + +class VeloxAttachDistributedSequenceExecSuite extends VeloxWholeStageTransformerSuite { + + override protected val resourcePath: String = "/tpch-data-parquet" + override protected val fileFormat: String = "parquet" + + override def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.shuffle.partitions", "3") + .set("spark.default.parallelism", "3") + .set(SQLConf.ANSI_ENABLED.key, "false") + } + + /** + * Build a DataFrame that prepends a distributed-sequence id column using a directly constructed + * [[AttachDistributedSequence]] logical node. This avoids depending on pandas-on-Spark / PySpark + * in JVM tests. + */ + private def attachSequence(df: DataFrame, name: String = "id"): DataFrame = { + val attr = AttributeReference(name, LongType, nullable = false)() + ClassicDataset.ofRows(spark, AttachDistributedSequence(attr, df.queryExecution.analyzed)) + } + + test("contiguous ids for a single partition") { + val df = attachSequence(spark.range(0, 7, 1, 1).toDF("v")) + val ids = df.select("id").collect().map(_.getLong(0)).toSeq + assert(ids == Seq(0L, 1L, 2L, 3L, 4L, 5L, 6L)) + } + + test("contiguous ids across multiple partitions of equal size") { + val df = attachSequence(spark.range(0, 12, 1, 4).toDF("v")) + val ids = df.select("id").collect().map(_.getLong(0)).toSeq.sorted + assert(ids == (0L until 12L)) + // Check the offload happened. + val plan = df.queryExecution.executedPlan + val matched = plan.collectFirst { + case e: ColumnarAttachDistributedSequenceExec => e + } + assert(matched.isDefined, s"Expected ColumnarAttachDistributedSequenceExec in:\n$plan") + } + + test("contiguous ids across multiple partitions of unequal size") { + val base = spark.range(0, 100, 1, 8).toDF("v").filter("v % 3 = 0") + val df = attachSequence(base) + val rows = df.collect() + val ids = rows.map(_.getAs[Long]("id")).toSeq.sorted + assert(ids == (0L until rows.length)) + } + + test("empty input produces empty output") { + val df = attachSequence(spark.range(0, 0, 1, 4).toDF("v")) + assert(df.collect().isEmpty) + } + + test("id is paired with the correct row payload") { + val df = attachSequence(spark.range(0, 5, 1, 1).toDF("v")) + val rows = df.select("id", "v").collect().map(r => (r.getLong(0), r.getLong(1))).toSeq + assert(rows == Seq((0L, 0L), (1L, 1L), (2L, 2L), (3L, 3L), (4L, 4L))) + } + + test("output survives a downstream Velox shuffle (offload path)") { + // Repartition after attach forces ArrowJava -> ArrowNative -> VeloxBatch via + // OffloadArrowDataExec, which calls ColumnarBatches.getRefCntHeavy and + // requires the uniform-refCnt invariant on the output batch. This mirrors + // the vanilla SPARK-36338 inherited test that exposed the bug in CI. + val df = attachSequence(spark.range(0, 20, 1, 4).toDF("v")).repartition(3) + val rows = df.select("id", "v").collect().map(r => (r.getLong(0), r.getLong(1))).toSeq + val ids = rows.map(_._1).sorted + val vs = rows.map(_._2).sorted + assert(ids == (0L until 20L)) + assert(vs == (0L until 20L)) + } + + test("falls back to vanilla exec when columnar attach-distributed-sequence is disabled") { + withSQLConf( + "spark.gluten.sql.columnar.attachDistributedSequence" -> "false" + ) { + val df = attachSequence(spark.range(0, 4, 1, 2).toDF("v")) + val plan = df.queryExecution.executedPlan + assert( + plan.find(_.isInstanceOf[ColumnarAttachDistributedSequenceExec]).isEmpty, + s"Expected no ColumnarAttachDistributedSequenceExec in:\n$plan") + val ids = df.select("id").collect().map(_.getLong(0)).toSeq.sorted + assert(ids == Seq(0L, 1L, 2L, 3L)) + } + } + + test("GlutenConfig getter returns default true") { + assert(GlutenConfig.get.enableColumnarAttachDistributedSequence) + } + + test("vanilla exec construction does not break offload pattern") { + // Sanity: confirm vanilla exec class is available and constructible (used in offload). + val attr = AttributeReference("id", LongType, nullable = false)() + val child = spark.range(0, 1, 1, 1).queryExecution.executedPlan + val vanilla = AttachDistributedSequenceExec(attr, child) + assert(vanilla.output.head.name == "id") + } +} diff --git a/docs/Configuration.md b/docs/Configuration.md index 748a200cfbf..080b98e3025 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -47,6 +47,7 @@ nav_order: 15 | spark.gluten.sql.collapseGetJsonObject.enabled | 🔄 Dynamic | false | Collapse nested get_json_object functions as one for optimization. | | spark.gluten.sql.columnar.appendData | 🔄 Dynamic | true | Enable or disable columnar v2 command append data. | | spark.gluten.sql.columnar.arrowUdf | 🔄 Dynamic | true | Enable or disable columnar arrow udf. | +| spark.gluten.sql.columnar.attachDistributedSequence | 🔄 Dynamic | true | Enable or disable columnar AttachDistributedSequenceExec, which prepends a contiguous distributed-sequence id column used by pandas-on-Spark's default index and DataFrame.zipWithIndex. | | spark.gluten.sql.columnar.batchscan | 🔄 Dynamic | true | Enable or disable columnar batchscan. | | spark.gluten.sql.columnar.broadcastExchange | 🔄 Dynamic | true | Enable or disable columnar broadcastExchange. | | spark.gluten.sql.columnar.broadcastJoin | 🔄 Dynamic | true | Enable or disable columnar broadcastJoin. | diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 7f5d5a9c214..807d01f80b1 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -149,6 +149,8 @@ trait BackendSettingsApi { def supportColumnarArrowUdf(): Boolean = false + def supportColumnarAttachDistributedSequenceExec(): Boolean = false + def needPreComputeRangeFrameBoundary(): Boolean = false def supportIcebergEqualityDeleteRead(): Boolean = true diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 84e2d865541..4783c95af99 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.python.ArrowEvalPythonExec +import org.apache.spark.sql.execution.python.AttachDistributedSequenceExec import org.apache.spark.sql.execution.window._ import org.apache.spark.sql.hive.HiveUDFTransformer import org.apache.spark.sql.types.{DecimalType, LongType, NullType, StructType} @@ -763,6 +764,9 @@ trait SparkPlanExecApi { def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec + def genColumnarAttachDistributedSequenceExec( + plan: AttachDistributedSequenceExec): ColumnarAttachDistributedSequenceBaseExec + def genColumnarTailExec(limit: Int, plan: SparkPlan): ColumnarCollectTailBaseExec def genColumnarToCarrierRow(plan: SparkPlan): SparkPlan diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 821d7472287..4f39677c99d 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -385,6 +385,8 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { } def enableColumnarRange: Boolean = getConf(COLUMNAR_RANGE_ENABLED) + def enableColumnarAttachDistributedSequence: Boolean = + getConf(COLUMNAR_ATTACH_DISTRIBUTED_SEQUENCE_ENABLED) def enableColumnarCollectLimit: Boolean = getConf(COLUMNAR_COLLECT_LIMIT_ENABLED) def enableColumnarCollectTail: Boolean = getConf(COLUMNAR_COLLECT_TAIL_ENABLED) def getSupportedFlattenedExpressions: String = getConf(GLUTEN_SUPPORTED_FLATTENED_FUNCTIONS) @@ -1606,6 +1608,15 @@ object GlutenConfig extends ConfigRegistry { .booleanConf .createWithDefault(true) + val COLUMNAR_ATTACH_DISTRIBUTED_SEQUENCE_ENABLED = + buildConf("spark.gluten.sql.columnar.attachDistributedSequence") + .doc( + "Enable or disable columnar AttachDistributedSequenceExec, which prepends a " + + "contiguous distributed-sequence id column used by pandas-on-Spark's default " + + "index and DataFrame.zipWithIndex.") + .booleanConf + .createWithDefault(true) + val COLUMNAR_COLLECT_LIMIT_ENABLED = buildConf("spark.gluten.sql.columnar.collectLimit") .doc("Enable or disable columnar collectLimit.") diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala index 3d844607b39..e7790daabf4 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.WriteFilesExec import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ -import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, BatchEvalPythonExec, EvalPythonExecTransformer} +import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, AttachDistributedSequenceExec, BatchEvalPythonExec, EvalPythonExecTransformer} import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.hive.HiveTableScanExecTransformer @@ -305,6 +305,9 @@ object OffloadOthers { } case plan: RangeExec => ColumnarRangeBaseExec.from(plan) + case plan: AttachDistributedSequenceExec + if BackendsApiManager.getSettings.supportColumnarAttachDistributedSequenceExec() => + ColumnarAttachDistributedSequenceBaseExec.from(plan) case plan: SampleExec => val child = plan.child BackendsApiManager.getSparkPlanExecApiInstance.genSampleExecTransformer( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index faa0d9e2e54..729a6306d84 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.WriteFilesExec import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, BatchScanExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ +import org.apache.spark.sql.execution.python.AttachDistributedSequenceExec import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.hive.HiveTableScanExecTransformer import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} @@ -216,6 +217,9 @@ object Validators { .supportSampleExec()) => fail(p) case p: RangeExec if !glutenConf.enableColumnarRange => fail(p) + case p: AttachDistributedSequenceExec + if !glutenConf.enableColumnarAttachDistributedSequence => + fail(p) case p: CollectLimitExec if !glutenConf.enableColumnarCollectLimit => fail(p) case p: CollectTailExec if !glutenConf.enableColumnarCollectTail => fail(p) case _ => pass() diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarAttachDistributedSequenceBaseExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarAttachDistributedSequenceBaseExec.scala new file mode 100644 index 00000000000..3b5332f883e --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarAttachDistributedSequenceBaseExec.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.execution.ValidatablePlan +import org.apache.gluten.extension.columnar.transition.Convention + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.execution.python.AttachDistributedSequenceExec + +/** + * Base class for [[AttachDistributedSequenceExec]] transformation that can be implemented by + * supported backends. The exec prepends a contiguous, globally increasing `Long` id column to the + * child output. Used by pandas-on-Spark distributed-sequence default index and + * `DataFrame.zipWithIndex`. + */ +abstract class ColumnarAttachDistributedSequenceBaseExec( + sequenceAttr: Attribute, + override val child: SparkPlan) + extends UnaryExecNode + with ValidatablePlan { + + override def producedAttributes: AttributeSet = AttributeSet(sequenceAttr) + + override val output: Seq[Attribute] = sequenceAttr +: child.output + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def rowType(): Convention.RowType = Convention.RowType.None + + override protected def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException(s"This operator doesn't support doExecute().") + } + + override def simpleString(maxFields: Int): String = { + val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields) + val indexColumn = s"Index: $sequenceAttr" + s"$nodeName$truncatedOutputString $indexColumn" + } + + /** + * Hook for backend implementations to release any resources (e.g. cached RDDs) that were + * materialized during execution. Called from [[cleanupResources]] after children have been + * cleaned up. The default implementation is a no-op. + */ + protected def doColumnarCleanup(): Unit = {} + + override protected[sql] def cleanupResources(): Unit = { + try { + doColumnarCleanup() + } finally { + super.cleanupResources() + } + } +} + +/** + * Companion object for ColumnarAttachDistributedSequenceBaseExec, provides factory methods to + * create instance from existing AttachDistributedSequenceExec plan. + */ +object ColumnarAttachDistributedSequenceBaseExec { + def from(plan: AttachDistributedSequenceExec): ColumnarAttachDistributedSequenceBaseExec = { + BackendsApiManager.getSparkPlanExecApiInstance + .genColumnarAttachDistributedSequenceExec(plan) + } +}