
Commit 5c1f131

chore: Add envvars to override writer configs and cometConf minor clean up (#3540)
1 parent b8d8fbe commit 5c1f131

14 files changed

Lines changed: 59 additions & 88 deletions


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 38 additions & 18 deletions
@@ -78,11 +78,11 @@ object CometConf extends ShimCometConf {
 
   val COMET_PREFIX = "spark.comet";
 
-  val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec";
+  val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec"
 
-  val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";
+  val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression"
 
-  val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator";
+  val COMET_OPERATOR_CONFIG_PREFIX: String = s"$COMET_PREFIX.operator"
 
   val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
     .category(CATEGORY_EXEC)
@@ -112,7 +112,7 @@ object CometConf extends ShimCometConf {
         "feature is highly experimental and only partially implemented. It should not " +
         "be used in production.")
     .booleanConf
-    .createWithDefault(false)
+    .createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false)
 
   // Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI best practices
   // and does not support complex types. Use native_iceberg_compat or auto instead.
@@ -488,13 +488,23 @@ object CometConf extends ShimCometConf {
       "Ensure that Comet shuffle memory overhead factor is a double greater than 0")
     .createWithDefault(1.0)
 
+  val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
+    .category(CATEGORY_TUNING)
+    .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
+    .intConf
+    .checkValue(v => v > 0, "Batch size must be positive")
+    .createWithDefault(8192)
+
   val COMET_COLUMNAR_SHUFFLE_BATCH_SIZE: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.batch.size")
       .category(CATEGORY_SHUFFLE)
       .doc("Batch size when writing out sorted spill files on the native side. Note that " +
         "this should not be larger than batch size (i.e., `spark.comet.batchSize`). Otherwise " +
         "it will produce larger batches than expected in the native operator after shuffle.")
       .intConf
+      .checkValue(
+        v => v <= COMET_BATCH_SIZE.get(),
+        "Should not be larger than batch size `spark.comet.batchSize`")
       .createWithDefault(8192)
 
   val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] =
@@ -550,6 +560,7 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
+  // Used on native side. Check spark_config.rs how the config is used
   val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_PREFIX.debug.memory")
       .category(CATEGORY_TESTING)
@@ -608,12 +619,6 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
-  val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
-    .category(CATEGORY_TUNING)
-    .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
-    .intConf
-    .createWithDefault(8192)
-
   val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.enable.directBuffer")
       .category(CATEGORY_PARQUET)
@@ -793,14 +798,6 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
-  val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
-    conf("spark.comet.regexp.allowIncompatible")
-      .category(CATEGORY_EXEC)
-      .doc("Comet is not currently fully compatible with Spark for all regular expressions. " +
-        s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
-      .booleanConf
-      .createWithDefault(false)
-
   val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] =
     conf("spark.comet.metrics.updateInterval")
       .category(CATEGORY_EXEC)
@@ -819,6 +816,7 @@ object CometConf extends ShimCometConf {
     .stringConf
     .createOptional
 
+  // Used on native side. Check spark_config.rs how the config is used
   val COMET_MAX_TEMP_DIRECTORY_SIZE: ConfigEntry[Long] =
     conf("spark.comet.maxTempDirectorySize")
       .category(CATEGORY_EXEC)
@@ -843,6 +841,9 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithEnvVarOrDefault("ENABLE_COMET_STRICT_TESTING", false)
 
+  val COMET_OPERATOR_DATA_WRITING_COMMAND_ALLOW_INCOMPAT: ConfigEntry[Boolean] =
+    createOperatorIncompatConfig("DataWritingCommandExec")
+
   /** Create a config to enable a specific operator */
   private def createExecEnabledConfig(
       exec: String,
@@ -858,6 +859,25 @@ object CometConf extends ShimCometConf {
       .createWithDefault(defaultValue)
   }
 
+  /**
+   * Converts a config key to a valid environment variable name. Example:
+   * "spark.comet.operator.DataWritingCommandExec.allowIncompatible" ->
+   * "SPARK_COMET_OPERATOR_DATAWRITINGCOMMANDEXEC_ALLOWINCOMPATIBLE"
+   */
+  private def configKeyToEnvVar(configKey: String): String =
+    configKey.toUpperCase(Locale.ROOT).replace('.', '_')
+
+  private def createOperatorIncompatConfig(name: String): ConfigEntry[Boolean] = {
+    val configKey = getOperatorAllowIncompatConfigKey(name)
+    val envVar = configKeyToEnvVar(configKey)
+    conf(configKey)
+      .category(CATEGORY_EXEC)
+      .doc(s"Whether to allow incompatibility for operator: $name. " +
+        s"False by default. Can be overridden with $envVar env variable")
+      .booleanConf
+      .createWithEnvVarOrDefault(envVar, false)
+  }
+
   def isExprEnabled(name: String, conf: SQLConf = SQLConf.get): Boolean = {
     getBooleanConf(getExprEnabledConfigKey(name), defaultValue = true, conf)
   }

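With this change the experimental native write path and the generated per-operator allowIncompatible flags can be flipped from the environment (e.g. in CI) as well as through Spark conf. Per the diff's own scaladoc, configKeyToEnvVar maps "spark.comet.operator.DataWritingCommandExec.allowIncompatible" to "SPARK_COMET_OPERATOR_DATAWRITINGCOMMANDEXEC_ALLOWINCOMPATIBLE". A minimal sketch of how an env-var-backed default behaves, assuming `createWithEnvVarOrDefault` only substitutes the default value when the variable is set; the helper name below is illustrative, not the actual builder API:

// Sketch only: illustrates the lookup order implied by
// createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false); the real builder
// lives in CometConf's ConfigBuilder, and resolveDefault is a hypothetical name.
def resolveDefault(envVar: String, default: Boolean): Boolean =
  sys.env.get(envVar).map(_.trim.toLowerCase) match {
    case Some("true") => true // e.g. ENABLE_COMET_WRITE=true exported by a CI job
    case Some("false") => false
    case _ => default // unset or unrecognised value falls back to the hardcoded default
  }

An explicitly set spark.comet.* key would still take precedence, since the env var only changes the default used when the key is absent.
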
dev/diffs/3.4.3.diff

Lines changed: 1 addition & 1 deletion
@@ -1009,7 +1009,7 @@ index 18123a4d6ec..fbe4c766eee 100644
 -        regexp_extract($"a", "(\\d+)-(\\d+)", 1)),
 -      Row("num-num", "300", "100") :: Row("num-num", "400", "100") ::
 -        Row("num-num", "400-400", "100") :: Nil)
-+    withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
++    withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
 +      val df = Seq(
 +        ("100-200", "(\\d+)-(\\d+)", "300"),
 +        ("100-200", "(\\d+)-(\\d+)", "400"),

dev/diffs/3.5.8.diff

Lines changed: 1 addition & 1 deletion
@@ -985,7 +985,7 @@ index fa1a64460fc..1d2e215d6a3 100644
 -        ("100-200", "(\\d+)-(\\d+)", "300"),
 -        ("100-200", "(\\d+)-(\\d+)", "400"),
 -        ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
-+    withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
++    withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
 +      val df = Seq(
 +        ("100-200", "(\\d+)-(\\d+)", "300"),
 +        ("100-200", "(\\d+)-(\\d+)", "400"),

dev/diffs/4.0.1.diff

Lines changed: 1 addition & 1 deletion
@@ -1165,7 +1165,7 @@ index 0df7f806272..52d33d67328 100644
 -        ("100-200", "(\\d+)-(\\d+)", "300"),
 -        ("100-200", "(\\d+)-(\\d+)", "400"),
 -        ("100-200", "(\\d+)", "400")).toDF("a", "b", "c")
-+    withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
++    withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
 +      val df = Seq(
 +        ("100-200", "(\\d+)-(\\d+)", "300"),
 +        ("100-200", "(\\d+)-(\\d+)", "400"),

docs/source/user-guide/latest/compatibility.md

Lines changed: 1 addition & 1 deletion
@@ -62,7 +62,7 @@ the [Comet Supported Expressions Guide](expressions.md) for more information on
 
 Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
 regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
-this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
+this can be overridden by setting `spark.comet.expression.regexp.allowIncompatible=true`.
 
 ## Window Functions
 

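For reference, the renamed key is set like any other Spark conf; a minimal example, where `spark` is whatever SparkSession the application already has:

// Opt in to Comet's incompatible regexp handling for the current session (off by default)
spark.conf.set("spark.comet.expression.regexp.allowIncompatible", "true")

// or equivalently at submit time:
//   spark-submit --conf spark.comet.expression.regexp.allowIncompatible=true ...
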
spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala

Lines changed: 1 addition & 1 deletion
@@ -60,7 +60,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
       case _: ParquetFileFormat =>
         if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString
             .startsWith("hdfs:")) {
-          return Unsupported(Some("Only HDFS/local filesystems output paths are supported"))
+          return Unsupported(Some("Supported output filesystems: local, HDFS"))
         }
 
         if (cmd.bucketSpec.isDefined) {

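The reworded message does not change the check itself: the native writer is only considered when the output path starts with `file:` or `hdfs:`. A small illustration, using a hypothetical helper that mirrors the `startsWith` test above:

// Hypothetical helper mirroring the scheme check in CometDataWritingCommand
def isSupportedOutputPath(path: String): Boolean =
  path.startsWith("file:") || path.startsWith("hdfs:")

isSupportedOutputPath("file:/tmp/comet-out")        // true  -> native write may proceed
isSupportedOutputPath("hdfs://nn:8020/warehouse/t") // true
isSupportedOutputPath("s3a://bucket/table")         // false -> Unsupported("Supported output filesystems: local, HDFS")
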
spark/src/main/scala/org/apache/comet/serde/strings.scala

Lines changed: 4 additions & 4 deletions
@@ -223,11 +223,11 @@ object CometRLike extends CometExpressionSerde[RLike] {
     expr.right match {
       case Literal(pattern, DataTypes.StringType) =>
         if (!RegExp.isSupportedPattern(pattern.toString) &&
-          !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
+          !CometConf.isExprAllowIncompat("regexp")) {
           withInfo(
             expr,
             s"Regexp pattern $pattern is not compatible with Spark. " +
-              s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
+              s"Set ${CometConf.getExprAllowIncompatConfigKey("regexp")}=true " +
               "to allow it anyway.")
           None
         } else {
@@ -298,11 +298,11 @@ object CometStringLPad extends CometExpressionSerde[StringLPad] {
 object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] {
   override def getSupportLevel(expr: RegExpReplace): SupportLevel = {
     if (!RegExp.isSupportedPattern(expr.regexp.toString) &&
-      !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
+      !CometConf.isExprAllowIncompat("regexp")) {
       withInfo(
         expr,
         s"Regexp pattern ${expr.regexp} is not compatible with Spark. " +
-          s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
+          s"Set ${CometConf.getExprAllowIncompatConfigKey("regexp")}=true " +
           "to allow it anyway.")
       return Incompatible()
     }

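The serde code now asks CometConf for the per-expression flag instead of reading a dedicated ConfigEntry. The helpers' bodies are not part of this diff; the sketch below only illustrates the key shape they are expected to produce, consistent with COMET_EXPR_CONFIG_PREFIX and the documentation change above:

// Illustration only: the real helpers are CometConf.getExprAllowIncompatConfigKey
// and CometConf.isExprAllowIncompat; this assumes the standard prefix layout.
val exprPrefix = "spark.comet.expression"
def allowIncompatKey(name: String): String = s"$exprPrefix.$name.allowIncompatible"

allowIncompatKey("regexp") // "spark.comet.expression.regexp.allowIncompatible"
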
spark/src/test/resources/sql-tests/expressions/string/regexp_replace_enabled.sql

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@
 -- under the License.
 
 -- Test regexp_replace() with regexp allowIncompatible enabled (happy path)
--- Config: spark.comet.regexp.allowIncompatible=true
+-- Config: spark.comet.expression.regexp.allowIncompatible=true
 -- ConfigMatrix: parquet.enable.dictionary=false,true
 
 statement

spark/src/test/resources/sql-tests/expressions/string/rlike_enabled.sql

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@
 -- under the License.
 
 -- Test RLIKE with regexp allowIncompatible enabled (happy path)
--- Config: spark.comet.regexp.allowIncompatible=true
+-- Config: spark.comet.expression.regexp.allowIncompatible=true
 -- ConfigMatrix: parquet.enable.dictionary=false,true
 
 statement

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -958,7 +958,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       // add repetitive data to trigger dictionary encoding
         Range(0, 100).map(_ => "John Smith")
       withParquetFile(data.zipWithIndex, withDictionary) { file =>
-        withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+        withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
           spark.read.parquet(file).createOrReplaceTempView(table)
           val query = sql(s"select _2 as id, _1 rlike 'R[a-z]+s [Rr]ose' from $table")
           checkSparkAnswerAndOperator(query)
@@ -996,7 +996,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     withTable(table) {
       sql(s"create table $table(id int, name varchar(20)) using parquet")
       sql(s"insert into $table values(1,'James Smith')")
-      withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+      withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
         val query2 = sql(s"select id from $table where name rlike name")
         val (_, cometPlan) = checkSparkAnswer(query2)
         val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
@@ -1030,7 +1030,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         // "Smith$",
         "Smith\\Z",
         "Smith\\z")
-      withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+      withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
         patterns.foreach { pattern =>
           val query2 = sql(s"select name, '$pattern', name rlike '$pattern' from $table")
           checkSparkAnswerAndOperator(query2)
@@ -1090,7 +1090,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         "\\V")
       val qualifiers = Seq("", "+", "*", "?", "{1,}")
 
-      withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+      withSQLConf(CometConf.getExprAllowIncompatConfigKey("regexp") -> "true") {
         // testing every possible combination takes too long, so we pick some
         // random combinations
         for (_ <- 0 until 100) {

0 commit comments
