diff --git a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java index 106d8b98170..28ed5514a9c 100644 --- a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java +++ b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java @@ -35,9 +35,9 @@ public long rtHandle() { return runtime.getHandle(); } - public static native void clearHashTable(long hashTableData); + public static native void clearHashTable(String cacheKey, long hashTableData); - public static native long cloneHashTable(long hashTableData); + public static native long cloneHashTable(String cacheKey, long hashTableData); public native long nativeBuild( String buildHashTableId, diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala index 1554c4ddd3e..2015328bcfb 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala @@ -106,7 +106,7 @@ case class BroadcastHashJoinExecTransformer( isNullAwareAntiJoin) { // Unique ID for built table - lazy val buildBroadcastTableId: String = buildPlan.id.toString + lazy val buildBroadcastTableId: String = canonicalBuildHashTableId(buildPlan) override protected lazy val substraitJoinType: JoinRel.JoinType = joinType match { case _: InnerLike => @@ -135,9 +135,12 @@ case class BroadcastHashJoinExecTransformer( override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = { val streamedRDD = getColumnarInputRDDs(streamedPlan) val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + var cacheKey = "" if (executionId != null) { - GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId) + cacheKey = "Gluten_Execution_" + executionId + ":" + buildHashTableId + GlutenDriverEndpoint.collectResources(executionId, cacheKey) } else { + cacheKey = "Gluten_Execution_" + ":" + buildHashTableId logWarning( s"Can not trace broadcast table data $buildBroadcastTableId" + s" because execution id is null." + @@ -174,7 +177,7 @@ case class BroadcastHashJoinExecTransformer( buildPlan.output, filterBuildColumns, filterPropagatesNulls, - buildBroadcastTableId, + cacheKey, isNullAwareAntiJoin, bloomFilterPushdownSize, metrics.get("buildHashTableTime") diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala index 535fd8900e1..c8fcd574cdd 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala @@ -103,7 +103,7 @@ object VeloxBroadcastBuildSideCache } } - HashJoinBuilder.clearHashTable(value.pointer) + HashJoinBuilder.clearHashTable(key, value.pointer) } } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index fea9f149745..f640fde9eaa 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -234,7 +234,7 @@ case class ColumnarBuildSideRelation( (hashTableData, this) } else { - (HashJoinBuilder.cloneHashTable(hashTableData), null) + (HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null) } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index fbc329f3606..de783de4892 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -204,7 +204,7 @@ class UnsafeColumnarBuildSideRelation( (hashTableData, this) } else { - (HashJoinBuilder.cloneHashTable(hashTableData), null) + (HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null) } } diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index 4ab944898bd..fff50a4df1d 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -42,12 +42,14 @@ struct SparkTaskInfo { int32_t partitionId{0}; // Same as TID. int64_t taskId{0}; + // Same as Spark SQL execution id. -1 means unavailable. + int64_t executionId{-1}; // virtual id for each backend internal use int32_t vId{0}; std::string toString() const { - return "[Stage: " + std::to_string(stageId) + " TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) + - "]"; + return "[Stage: " + std::to_string(stageId) + " Execution: " + std::to_string(executionId) + + " TID: " + std::to_string(taskId) + " VID: " + std::to_string(vId) + "]"; } friend std::ostream& operator<<(std::ostream& os, const SparkTaskInfo& taskInfo) { diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 827c5ad8bdd..fd23bc55a1a 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -468,13 +468,14 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith jint stageId, jint partitionId, jlong taskId, + jlong executionId, jboolean enableDumping, jstring spillDir) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); - ctx->setSparkTaskInfo({stageId, partitionId, taskId}); + ctx->setSparkTaskInfo({stageId, partitionId, taskId, executionId}); if (enableDumping) { ctx->enableDumping(); diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index c3ac095cdc7..8533a4ab609 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -72,6 +72,15 @@ const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"; } // namespace +namespace { +std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { + if (taskInfo.executionId != -1) { + return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); + } + return fmt::format("Gluten_Execution_{}", ""); +} +} // namespace + WholeStageResultIterator::WholeStageResultIterator( VeloxMemoryManager* memoryManager, const std::shared_ptr& planNode, @@ -111,11 +120,7 @@ WholeStageResultIterator::WholeStageResultIterator( velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; std::shared_ptr queryCtx = createNewVeloxQueryCtx(); task_ = velox::exec::Task::create( - fmt::format( - "Gluten_Stage_{}_TID_{}_VTID_{}", - std::to_string(taskInfo_.stageId), - std::to_string(taskInfo_.taskId), - std::to_string(taskInfo.vId)), + getVeloxTaskId(taskInfo_), std::move(planFragment), 0, std::move(queryCtx), @@ -233,11 +238,7 @@ std::shared_ptr WholeStageResultIterator::createNewVeloxQ gluten::VeloxBackend::get()->getAsyncDataCache(), memoryManager_->getAggregateMemoryPool(), spillExecutor_, - fmt::format( - "Gluten_Stage_{}_TID_{}_VTID_{}", - std::to_string(taskInfo_.stageId), - std::to_string(taskInfo_.taskId), - std::to_string(taskInfo_.vId))); + getVeloxTaskId(taskInfo_)); return ctx; } diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index aa4d9599435..aab855a31f6 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -46,6 +46,7 @@ #include "velox/common/base/BloomFilter.h" #include "velox/common/file/FileSystems.h" #include "velox/exec/HashTable.h" +#include "velox/exec/HashTableCache.h" #ifdef GLUTEN_ENABLE_GPU #include "cudf/CudfPlanValidator.h" @@ -1060,6 +1061,12 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native nullptr); builder->setHashTable(std::move(mainTable)); + auto* cache = facebook::velox::exec::HashTableCache::instance(); + + if (!cache->hasTable(hashTableId)) { + cache->injectTable(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(builder); } @@ -1138,6 +1145,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native } hashTableBuilders[0]->setHashTable(std::move(mainTable)); + + auto* cache = facebook::velox::exec::HashTableCache::instance(); + if (!cache->hasTable(hashTableId)) { + cache->injectTable( + hashTableId, + hashTableBuilders[0]->hashTable(), + hashTableBuilders[0]->joinHasNullKeys(), + defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(hashTableBuilders[0]); JNI_METHOD_END(kInvalidObjectHandle) } @@ -1145,9 +1162,17 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_cloneHashTable( // NOLINT JNIEnv* env, jclass, + jstring cacheKey, jlong tableHandler) { JNI_METHOD_START + auto cacheKeyStr = jStringToCString(env, cacheKey); auto hashTableHandler = ObjectStore::retrieve(tableHandler); + auto* cache = facebook::velox::exec::HashTableCache::instance(); + if (!cache->hasTable(cacheKeyStr)) { + cache->injectTable( + cacheKeyStr, hashTableHandler->hashTable(), hashTableHandler->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(hashTableHandler); JNI_METHOD_END(kInvalidObjectHandle) } @@ -1155,13 +1180,15 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_cloneH JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_clearHashTable( // NOLINT JNIEnv* env, jclass, + jstring cacheKey, jlong tableHandler) { JNI_METHOD_START - auto hashTableHandler = ObjectStore::retrieve(tableHandler); - hashTableHandler->hashTable()->clear(true); + auto cacheKeyStr = jStringToCString(env, cacheKey); + facebook::velox::exec::HashTableCache::instance()->drop(cacheKeyStr); ObjectStore::release(tableHandler); JNI_METHOD_END() } + #ifdef __cplusplus } #endif diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index b0fc0fc4a30..246b7814660 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -448,29 +448,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } else if ( sJoin.has_advanced_extension() && SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isBHJ=")) { - std::string hashTableId = sJoin.hashtableid(); - - std::shared_ptr opaqueSharedHashTable = nullptr; - bool joinHasNullKeys = false; - - try { - auto hashTableBuilder = ObjectStore::retrieve(getJoin(hashTableId)); - joinHasNullKeys = hashTableBuilder->joinHasNullKeys(); - auto originalShared = hashTableBuilder->hashTable(); - opaqueSharedHashTable = std::shared_ptr( - originalShared, reinterpret_cast(originalShared.get())); - - LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId; - } catch (const std::exception& e) { - LOG(WARNING) - << "Error retrieving HashTable from ObjectStore: " << e.what() - << ". Falling back to building new table. To ensure correct results, please verify that spark.gluten.velox.buildHashTableOncePerExecutor.enabled is set to false."; - opaqueSharedHashTable = nullptr; - } - + const auto& hashTableId = sJoin.hashtableid(); + const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId; // Create HashJoinNode node return std::make_shared( - nextPlanNodeId(), + joinNodeId, joinType, isNullAwareAntiJoin, leftKeys, @@ -479,14 +461,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: leftNode, rightNode, getJoinOutputType(leftNode, rightNode, joinType), - false, - false, - joinHasNullKeys, - opaqueSharedHashTable); + true); } else { // Create HashJoinNode node + auto joinNodeId = nextPlanNodeId(); return std::make_shared( - nextPlanNodeId(), + joinNodeId, joinType, isNullAwareAntiJoin, leftKeys, diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index 73e3af15fa3..e34f364aa73 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -17,9 +17,9 @@ set -exu CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) -VELOX_REPO=https://github.com/IBM/velox.git -VELOX_BRANCH=dft-2026_06_05 -VELOX_ENHANCED_BRANCH=ibm-2026_06_05 +VELOX_REPO=https://github.com/JkSelf/velox.git +VELOX_BRANCH=dft-2026_06_05-hashtable-cache +VELOX_ENHANCED_BRANCH=ibm-2026_06_05-hashtable-cache VELOX_HOME="" RUN_SETUP_SCRIPT=ON ENABLE_ENHANCED_FEATURES=OFF diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java index 6d2c90896b2..8ddbb4b3d39 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java @@ -34,6 +34,8 @@ public class NativePlanEvaluator { private static final Logger LOGGER = LoggerFactory.getLogger(NativePlanEvaluator.class); private static final AtomicInteger id = new AtomicInteger(0); + private static final long INVALID_EXECUTION_ID = -1L; + private static final String SPARK_EXECUTION_ID_KEY = "spark.sql.execution.id"; private final Runtime runtime; private final PlanEvaluatorJniWrapper jniWrapper; @@ -78,14 +80,16 @@ public ColumnarBatchOutIterator createKernelWithBatchIterator( int partitionIndex, String spillDirPath) throws RuntimeException { + final TaskContext taskContext = TaskContext.get(); final long itrHandle = jniWrapper.nativeCreateKernelWithIterator( wsPlan, splitInfo, iterList, - TaskContext.get().stageId(), + taskContext.stageId(), partitionIndex, // TaskContext.getPartitionId(), - TaskContext.get().taskAttemptId(), + taskContext.taskAttemptId(), + getExecutionId(taskContext), DebugUtil.isDumpingEnabledForTask(), spillDirPath); final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle); @@ -113,4 +117,18 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { private ColumnarBatchOutIterator createOutIterator(Runtime runtime, long itrHandle) { return new ColumnarBatchOutIterator(runtime, itrHandle); } + + private static long getExecutionId(TaskContext taskContext) { + final String executionId = taskContext.getLocalProperty(SPARK_EXECUTION_ID_KEY); + if (executionId == null) { + return INVALID_EXECUTION_ID; + } + try { + return Long.parseLong(executionId); + } catch (NumberFormatException e) { + LOGGER.warn( + "Invalid Spark execution id '{}', fallback to {}", executionId, INVALID_EXECUTION_ID); + return INVALID_EXECUTION_ID; + } + } } diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java index a8082906798..f0c3d804bfe 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java @@ -76,6 +76,7 @@ public native long nativeCreateKernelWithIterator( int stageId, int partitionId, long taskId, + long executionId, boolean enableDumping, String spillDir) throws RuntimeException; diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala index 149a0ca729e..3964c12e10e 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.{ExpandOutputPartitioningShim, ExplainUtils, SparkPlan} +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BaseJoinExec, HashedRelationBroadcastMode, HashJoin} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types._ @@ -105,6 +107,15 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { (right, left) } + protected def canonicalBuildHashTableId(plan: SparkPlan): String = plan match { + case b: BroadcastQueryStageExec => + canonicalBuildHashTableId(b.plan) + case r: ReusedExchangeExec => + canonicalBuildHashTableId(r.child) + case other => + other.id.toString + } + def sameType(from: DataType, to: DataType): Boolean = { (from, to) match { case (ArrayType(fromElement, _), ArrayType(toElement, _)) => @@ -267,7 +278,7 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { inputBuildOutput, context, operatorId, - buildPlan.id.toString + canonicalBuildHashTableId(buildPlan) ) context.registerJoinParam(operatorId, joinParams) @@ -392,7 +403,7 @@ abstract class BroadcastHashJoinExecTransformerBase( override def hashJoinType: JoinType = joinType // Unique ID for builded hash table - lazy val buildHashTableId: String = "BuiltHashTable-" + buildPlan.id + lazy val buildHashTableId: String = canonicalBuildHashTableId(buildPlan) override def genJoinParametersInternal(): (Int, Int, String) = { (1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId) diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 6ccddac0c62..6f1272b7f77 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -522,7 +522,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000", - SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName, + SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false" + ) { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT a FROM testData join testData2 ON key = a " + "where value >= (" +