Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3803,6 +3803,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val USE_HASH_AGG = buildConf("spark.sql.execution.useHashAggregateExec")
.internal()
.doc("Decides if we use HashAggregateExec")
.version("4.3.0")
.booleanConf
.createWithDefault(true)

val JSON_GENERATOR_IGNORE_NULL_FIELDS =
buildConf("spark.sql.jsonGenerator.ignoreNullFields")
.doc("Whether to ignore null fields when generating JSON objects in JSON data source and " +
Expand Down Expand Up @@ -8189,6 +8196,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def useObjectHashAggregation: Boolean = getConf(USE_OBJECT_HASH_AGG)

def useHashAggregation: Boolean = getConf(USE_HASH_AGG)

def objectAggSortBasedFallbackThreshold: Int = getConf(OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD)

def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,12 @@ object AggUtils {
initialInputBufferOffset: Int = 0,
resultExpressions: Seq[NamedExpression] = Nil,
child: SparkPlan): SparkPlan = {
val useHash = Aggregate.supportsHashAggregate(
val useHash = child.conf.useHashAggregation && Aggregate.supportsHashAggregate(
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes), groupingExpressions)

val forceObjHashAggregate = forceApplyObjectHashAggregate(child.conf)
val forceSortAggregate = forceApplySortAggregate(child.conf)

if (useHash && !forceSortAggregate && !forceObjHashAggregate) {
if (useHash && !forceObjHashAggregate) {
HashAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
isStreaming = isStreaming,
Expand All @@ -97,7 +96,7 @@ object AggUtils {
val useObjectHash = Aggregate.supportsObjectHashAggregate(
aggregateExpressions, groupingExpressions)

if (forceObjHashAggregate || (objectHashEnabled && useObjectHash && !forceSortAggregate)) {
if (forceObjHashAggregate || (objectHashEnabled && useObjectHash)) {
ObjectHashAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
isStreaming = isStreaming,
Expand Down Expand Up @@ -584,15 +583,6 @@ object AggUtils {
}
}

/**
* Returns whether a sort aggregate should be force applied.
* The config key is hard-coded because it's testing only and should not be exposed.
*/
private def forceApplySortAggregate(conf: SQLConf): Boolean = {
Utils.isTesting &&
conf.getConfString("spark.sql.test.forceApplySortAggregate", "false") == "true"
}

/**
* Returns whether a object hash aggregate should be force applied.
* The config key is hard-coded because it's testing only and should not be exposed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3322,7 +3322,7 @@ class DataFrameAggregateSuite extends SharedSparkSession
Seq(
"spark.sql.test.forceApplyObjectHashAggregate" -> "true",
SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "1"),
Seq("spark.sql.test.forceApplySortAggregate" -> "true")
Seq(SQLConf.USE_HASH_AGG.key -> "false")
)

// Make tests faster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class WholeStageCodegenSuite extends SharedSparkSession

test("SortAggregate should be included in WholeStageCodegen") {
val df = spark.range(10).agg(max(col("id")), avg(col("id")))
withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
withSQLConf(SQLConf.USE_HASH_AGG.key -> "false") {
val plan = df.queryExecution.executedPlan
assert(plan.exists(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils

test("SortAggregate metrics") {
// Force use SortAggregateExec instead of HashAggregateExec
withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
withSQLConf(SQLConf.USE_HASH_AGG.key -> "false") {
// Assume the execution plan is
// -> SortAggregate(nodeId = 0)
// -> Sort(nodeId = 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ spark.sql.execution.replaceHashWithSortAgg
spark.sql.execution.reuseSubquery
spark.sql.execution.sortBeforeRepartition
spark.sql.execution.topKSortFallbackThreshold
spark.sql.execution.useHashAggregateExec
spark.sql.execution.useObjectHashAggregateExec
spark.sql.execution.usePartitionEvaluator
spark.sql.extendedExplainProviders
Expand Down