diff --git a/AGENTS.md b/AGENTS.md index daa34497a6628..1c4199d50b8c3 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -196,6 +196,7 @@ Every module from the root pom.xml, organized by function. Flink provides three Key separations: - **Planner vs Runtime:** The table planner generates code and execution plans; the runtime executes them. Changes to planning logic live in `flink-table-planner`; changes to runtime operators live in `flink-table-runtime` or `flink-streaming-java`. +- **Codegen vs hand-written operators:** Per-record expression logic (casts, projections, filters, function calls) is generated at planning time by cast rules in `flink-table-planner/.../functions/casting/` and call generators in `flink-table-planner/.../codegen/calls/`, then compiled by Janino into the surrounding operator class. Operators with fixed structure (joins, aggregations, source/sink runtime) are hand-written Java in `flink-table-runtime` or `flink-streaming-java`. New scalar functions usually only need a `BuiltInFunctionDefinitions` entry plus a `BuiltInScalarFunction` subclass - the planner wires up codegen automatically. New cast behaviour or a custom call shape needs a cast rule or call generator. - **API vs Implementation:** Public API surfaces (`flink-core-api`, `flink-datastream-api`, `flink-table-api-java`) are separate from implementation modules. API stability annotations control what users can depend on. - **ArchUnit enforcement:** `flink-architecture-tests/` contains ArchUnit tests that enforce module boundaries. New violations should be avoided; if unavoidable, follow the freeze procedure in `flink-architecture-tests/README.md`. 
@@ -294,6 +295,7 @@ This section maps common types of Flink changes to the modules they touch and th - Ensure `./mvnw clean verify` passes before opening a PR - Always push to your fork, not directly to `apache/flink` - Rebase onto the latest target branch before submitting +- For user-visible behaviour changes, breaking changes, new SQL features, or new config options: fill in the **Release Notes** field on the JIRA ticket. The release manager consolidates these when cutting a release. The next version's `docs/content/release-notes/flink-X.Y.md` will be generated based on the JIRA tickets, so make sure to fill them in properly. ### AI-assisted contributions diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 4c60a96746c80..53a1ad5dd1897 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -794,7 +794,7 @@ conditional: conversion: - sql: CAST(value AS type) table: ANY.cast(TYPE) - description: Returns a new value being cast to type type. A CAST error throws an exception and fails the job. When performing a cast operation that may fail, like STRING to INT, one should rather use TRY_CAST, in order to handle errors. If "table.exec.legacy-cast-behaviour" is enabled, CAST behaves like TRY_CAST. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS STRING) returns NULL of type STRING; CAST('non-number' AS INT) throws an exception and fails the job. + description: Returns a new value being cast to type type. A CAST error throws an exception and fails the job. When performing a cast operation that may fail, like STRING to INT, one should rather use TRY_CAST, in order to handle errors. If "table.exec.legacy-cast-behaviour" is enabled, CAST behaves like TRY_CAST. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS STRING) returns NULL of type STRING; CAST('non-number' AS INT) throws an exception and fails the job. 
Casting BINARY/VARBINARY/BYTES to a CHAR/VARCHAR/STRING type validates that the input is well-formed UTF-8 and throws on invalid sequences. Use MAKE_VALID_UTF8 to substitute the Unicode replacement character U+FFFD for invalid bytes, TRY_CAST to return NULL, or set "table.exec.legacy-bytes-to-string-cast" to "true" to restore the prior silent-substitution behavior. - sql: TRY_CAST(value AS type) table: ANY.tryCast(TYPE) description: Like CAST, but in case of error, returns NULL rather than failing the job. E.g., TRY_CAST('42' AS INT) returns 42; TRY_CAST(NULL AS STRING) returns NULL of type STRING; TRY_CAST('non-number' AS INT) returns NULL of type INT; COALESCE(TRY_CAST('non-number' AS INT), 0) returns 0 of type INT. @@ -818,6 +818,8 @@ conversion: description: | Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`. + `MAKE_VALID_UTF8()` can fully replace a `CAST(bytes AS STRING)` which would error in case of invalid UTF-8. + E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character). collection: diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index 5cfcac4879137..12a952bd9687a 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -922,8 +922,10 @@ conversion: description: | 返回 value 被转换为类型 type 的新值。CAST错误会抛出异常并导致作业失败。为了处理错误,在使用可能失败的 CAST 操作时,例如 STRING 转换为 INT,建议使用 TRY_CAST 替代。 如果开启了 "table.exec.legacy-cast-behaviour",CAST 行为将变得与 TRY_CAST 一致。 - + 例如, CAST('42' AS INT) 返回 42; CAST(NULL AS STRING) 返回字符串类型的 `NULL`; CAST('non-number' AS INT) 抛出异常且作业失败。 + + Casting BINARY/VARBINARY/BYTES to a CHAR/VARCHAR/STRING type validates that the input is well-formed UTF-8 and throws on invalid sequences. 
Use MAKE_VALID_UTF8 to substitute the Unicode replacement character U+FFFD for invalid bytes, TRY_CAST to return NULL, or set "table.exec.legacy-bytes-to-string-cast" to "true" to restore the prior silent-substitution behavior. - sql: TRY_CAST(value AS type) table: ANY.tryCast(TYPE) description: | @@ -948,6 +950,8 @@ conversion: description: | Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`. + `MAKE_VALID_UTF8()` can fully replace a `CAST(bytes AS STRING)` which would error in case of invalid UTF-8. + E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character). collection: diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html index d043932d56e43..e71bf3d7673a9 100644 --- a/docs/layouts/shortcodes/generated/execution_config_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html @@ -182,6 +182,12 @@ Duration Specifies a minimum time interval for how long cleanup unmatched records in the interval join operator. Before Flink 1.18, the default value of this param was the half of interval duration. Note: Set this option greater than 0 will cause unmatched records in outer joins to be output later than watermark, leading to possible discarding of these records by downstream watermark-dependent operators, such as window operators. The default value is 0, which means it will clean up unmatched records immediately. + +
table.exec.legacy-bytes-to-string-cast

Batch Streaming + false + Boolean + When true, CAST(bytes AS STRING) for BINARY/VARBINARY/BYTES inputs silently substitutes the Unicode replacement character U+FFFD for invalid UTF-8 sequences. When false (the default), invalid input fails the job; use MAKE_VALID_UTF8 or TRY_CAST to handle malformed bytes. +
table.exec.legacy-cast-behaviour

Batch Streaming DISABLED diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java index 047b7b36e99f3..a1ed3ec97ff58 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java @@ -731,6 +731,18 @@ public class ExecutionConfigOptions { "Determines whether CAST will operate following the legacy behaviour " + "or the new one that introduces various fixes and improvements."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST = + key("table.exec.legacy-bytes-to-string-cast") + .booleanType() + .defaultValue(false) + .withDescription( + "When true, CAST(bytes AS STRING) for BINARY/VARBINARY/BYTES inputs " + + "silently substitutes the Unicode replacement character " + + "U+FFFD for invalid UTF-8 sequences. 
When false (the default), " + + "invalid input fails the job; use MAKE_VALID_UTF8 or TRY_CAST to handle " + + "malformed bytes."); + @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING) public static final ConfigOption TABLE_EXEC_RANK_TOPN_CACHE_SIZE = ConfigOptions.key("table.exec.rank.topn-cache-size") diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java index 1cd659571a10a..b6067710ee4d2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.functions.casting; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; @@ -27,12 +28,19 @@ import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName; import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING; +import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_UTF8_BYTES; import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField; import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall; import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall; +import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldPad; +import static 
org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldTrim; /** * {@link LogicalTypeFamily#BINARY_STRING} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule. + * + *

Strict UTF-8 mode is the default: invalid input bytes throw a {@code TableRuntimeException}. + * Setting {@link ExecutionConfigOptions#TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST} to {@code true} + * restores the prior behavior, where invalid sequences are silently replaced by {@code U+FFFD}. */ class BinaryToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule { @@ -48,14 +56,26 @@ private BinaryToStringCastRule() { /* Example generated code + --- Strict UTF-8 mode fast path: STRING / VARCHAR(MAX) target. No String allocation, no re-encoding. isNull$0 = _myInputIsNull; if (!isNull$0) { - java.lang.String resultString$435; - resultString$435 = new java.lang.String(_myInput, java.nio.charset.StandardCharsets.UTF_8); + result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromUtf8Bytes(_myInput); + isNull$0 = result$1 == null; + } else { + result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; + } + + --- Round-trip path: legacy mode (silent U+FFFD substitution) or strict UTF-8 mode + CHAR(n)/VARCHAR(n) (trim/pad). 
+ --- The decode line below is the legacy variant; in strict UTF-8 mode it becomes: + --- resultString$0 = org.apache.flink.table.data.binary.BinaryStringData.fromUtf8Bytes(_myInput).toString(); + isNull$0 = _myInputIsNull; + if (!isNull$0) { + java.lang.String resultString$0; + resultString$0 = new java.lang.String(_myInput, java.nio.charset.StandardCharsets.UTF_8); java.lang.String resultPadOrTrim$538; - resultPadOrTrim$538 = resultString$435.toString(); - if (resultString$435.length() > 12) { - resultPadOrTrim$538 = resultString$435.substring(0, java.lang.Math.min(resultString$435.length(), 12)); + resultPadOrTrim$538 = resultString$0.toString(); + if (resultString$0.length() > 12) { + resultPadOrTrim$538 = resultString$0.substring(0, java.lang.Math.min(resultString$0.length(), 12)); } else { if (resultPadOrTrim$538.length() < 12) { int padLength$539; @@ -68,8 +88,8 @@ private BinaryToStringCastRule() { resultPadOrTrim$538 = resultPadOrTrim$538 + sbPadding$540.toString(); } } - resultString$435 = resultPadOrTrim$538; - result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$435); + resultString$0 = resultPadOrTrim$538; + result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$0); isNull$0 = result$1 == null; } else { result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; @@ -77,6 +97,12 @@ private BinaryToStringCastRule() { */ + @Override + public boolean canFail(LogicalType inputLogicalType, LogicalType targetLogicalType) { + // Strict UTF-8 mode validates the input and can throw on malformed bytes. 
+ return true; + } + @Override protected String generateCodeBlockInternal( CodeGeneratorCastRule.Context context, @@ -84,6 +110,24 @@ protected String generateCodeBlockInternal( String returnVariable, LogicalType inputLogicalType, LogicalType targetLogicalType) { + final boolean legacy = + context.getCodeGeneratorContext() + .tableConfig() + .get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST); + final int length = LogicalTypeChecks.getLength(targetLogicalType); + final boolean needsTrimOrPad = couldTrim(length) || couldPad(targetLogicalType, length); + + // Strict UTF-8 mode fast path: unbounded target. Wrap the input bytes directly with no + // intermediate String. Legacy mode always needs the round-trip below because the JDK + // decoder is what substitutes U+FFFD for invalid sequences. + if (!context.isPrinting() && !legacy && !needsTrimOrPad) { + return new CastRuleUtils.CodeWriter() + .assignStmt( + returnVariable, + staticCall(BINARY_STRING_DATA_FROM_UTF8_BYTES(), inputTerm)) + .toString(); + } + final String resultStringTerm = newName(context.getCodeGeneratorContext(), "resultString"); final CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter(); @@ -93,19 +137,24 @@ protected String generateCodeBlockInternal( .assignPlusStmt( resultStringTerm, staticCall(EncodingUtils.class, "hex", inputTerm)) .assignPlusStmt(resultStringTerm, "\"'\""); - } else { + } else if (legacy) { + // Legacy mode: lenient JDK decode, invalid sequences become U+FFFD. writer.assignStmt( resultStringTerm, constructorCall( String.class, inputTerm, accessStaticField(StandardCharsets.class, "UTF_8"))); + } else { + // Strict UTF-8 mode: validates, then materializes the String for trim/pad below. 
+ writer.assignStmt( + resultStringTerm, + staticCall(BINARY_STRING_DATA_FROM_UTF8_BYTES(), inputTerm) + ".toString()"); } if (!context.legacyBehaviour() && !context.isPrinting()) { final String resultPadOrTrim = newName(context.getCodeGeneratorContext(), "resultPadOrTrim"); - final int length = LogicalTypeChecks.getLength(targetLogicalType); CharVarCharTrimPadCastRule.padAndTrimStringIfNeeded( writer, targetLogicalType, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala index a6392ae4a4d0a..20af035735068 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala @@ -514,6 +514,9 @@ object BuiltInMethods { val BINARY_STRING_DATA_FROM_STRING = Types.lookupMethod(classOf[BinaryStringData], "fromString", classOf[String]) + val BINARY_STRING_DATA_FROM_UTF8_BYTES = + Types.lookupMethod(classOf[BinaryStringData], "fromUtf8Bytes", classOf[Array[Byte]]) + val STRING_DATA_TO_BOOLEAN = Types.lookupMethod(classOf[BinaryStringDataUtil], "toBoolean", classOf[BinaryStringData]) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java index c7066d42a3ced..218d911e881db 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.types.Row; import java.nio.ByteBuffer; +import 
java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Objects; @@ -344,7 +345,23 @@ Stream getTestSetSpecs() { $("f1").tryCast(MAP(INT(), ARRAY(INT()))), "TRY_CAST(f1 AS MAP>)", null, - MAP(INT(), ARRAY(INT())).nullable())); + MAP(INT(), ARRAY(INT())).nullable()), + TestSetSpec.forFunction( + BuiltInFunctionDefinitions.TRY_CAST, + "try cast from BYTES with invalid UTF-8 to STRING returns NULL") + .onFieldsWithData( + new byte[] {(byte) 0x80}, "Hello".getBytes(StandardCharsets.UTF_8)) + .andDataTypes(BYTES(), BYTES()) + .testResult( + $("f0").tryCast(STRING()), + "TRY_CAST(f0 AS STRING)", + null, + STRING().nullable()) + .testResult( + $("f1").tryCast(STRING()), + "TRY_CAST(f1 AS STRING)", + "Hello", + STRING().nullable())); } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java index 90d04690e780f..2aa6e198a98fa 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericArrayData; @@ -47,6 +48,7 @@ import org.junit.jupiter.api.parallel.ExecutionMode; import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; import java.time.Duration; import 
java.time.LocalDate; import java.time.LocalDateTime; @@ -114,6 +116,14 @@ class CastRulesTest { new CodeGeneratorContext( new Configuration(), Thread.currentThread().getContextClassLoader()); + private static final CodeGeneratorContext CTX_LEGACY_BYTES_TO_STRING = + new CodeGeneratorContext( + new Configuration() + .set( + ExecutionConfigOptions.TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST, + true), + Thread.currentThread().getContextClassLoader()); + private static final CastRule.Context CET_CONTEXT = CastRule.Context.create( false, false, CET, Thread.currentThread().getContextClassLoader(), CTX); @@ -693,6 +703,25 @@ Stream testCases() { BYTES(), new byte[] {70, 108, 105, 110, 107}, fromString("x'466c696e6b'")) + // Strict UTF-8 validation across all BINARY_STRING family roots. + .fail(BINARY(1), new byte[] {(byte) 0x80}, TableRuntimeException.class) + .fail( + VARBINARY(2), + new byte[] {(byte) 0xC0, (byte) 0xAF}, + TableRuntimeException.class) + .fail(BYTES(), new byte[] {(byte) 0x80}, TableRuntimeException.class) + // table.exec.legacy-bytes-to-string-cast=true restores silent substitution. + .fromCaseLegacyBytesToString( + BYTES(), new byte[] {(byte) 0x80}, fromString("�")) + .fromCaseLegacyBytesToString( + VARBINARY(2), + new byte[] {(byte) 0xC0, (byte) 0xAF}, + fromString("��")) + .fromCase( + BYTES(), + "é€😀".getBytes(StandardCharsets.UTF_8), + fromString("é€😀")) + .fromCasePrinting(BYTES(), new byte[] {(byte) 0x80}, fromString("x'80'")) .fromCase(BOOLEAN(), true, StringData.fromString("TRUE")) .fromCase(BOOLEAN(), false, StringData.fromString("FALSE")) .fromCase( @@ -871,6 +900,11 @@ Stream testCases() { .fromCaseLegacy(VARBINARY(1), new byte[] {33}, fromString("\u0021")) .fromCase(BYTES(), new byte[] {32}, fromString(" ")) .fromCaseLegacy(BYTES(), new byte[] {32}, fromString(" ")) + // Strict UTF-8 validation must fire before trim/pad on a CHAR(n) target. 
+ .fail(BYTES(), new byte[] {(byte) 0x80}, TableRuntimeException.class) + // Legacy-bytes-to-string mode: invalid byte becomes U+FFFD then is padded. + .fromCaseLegacyBytesToString( + BYTES(), new byte[] {(byte) 0x80}, fromString("� ")) .fromCase(TINYINT(), (byte) -125, fromString("-125 ")) .fromCaseLegacy(TINYINT(), (byte) -125, fromString("-125")) .fromCase(SMALLINT(), (short) 32767, fromString("32767 ")) @@ -1661,6 +1695,20 @@ private CastTestSpecBuilder fromCaseLegacy( target); } + private CastTestSpecBuilder fromCaseLegacyBytesToString( + DataType srcDataType, Object src, Object target) { + return fromCase( + srcDataType, + CastRule.Context.create( + false, + false, + DateTimeUtils.UTC_ZONE.toZoneId(), + Thread.currentThread().getContextClassLoader(), + CTX_LEGACY_BYTES_TO_STRING), + src, + target); + } + private CastTestSpecBuilder fromCase( DataType srcDataType, CastRule.Context castContext, Object src, Object target) { this.inputTypes.add(srcDataType);
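Reviewer note: the difference between the legacy and strict decode paths in this change can be reproduced with the JDK alone. The sketch below is only an illustration under stated assumptions: the class `Utf8CastSketch` and its methods are hypothetical names, and the strict path uses a JDK `CharsetDecoder` as a stand-in. It is not the implementation of `BinaryStringData.fromUtf8Bytes`, which may validate differently and throws a `TableRuntimeException` rather than a `CharacterCodingException`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical, JDK-only illustration of the three behaviours discussed in the diff.
final class Utf8CastSketch {

    /** Legacy-style decode: the JDK silently replaces malformed sequences with U+FFFD. */
    static String decodeLenient(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Strict-style decode: malformed input is reported as an exception instead of replaced. */
    static String decodeStrict(byte[] bytes) throws CharacterCodingException {
        return StandardCharsets.UTF_8
                .newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT)
                .decode(ByteBuffer.wrap(bytes))
                .toString();
    }

    /** TRY_CAST-style decode: strict validation, but failures become null. */
    static String decodeOrNull(byte[] bytes) {
        try {
            return decodeStrict(bytes);
        } catch (CharacterCodingException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        byte[] invalid = {(byte) 0x80}; // lone continuation byte, not valid UTF-8
        byte[] valid = "Hello".getBytes(StandardCharsets.UTF_8);

        System.out.println("lenient replaced: " + decodeLenient(invalid).equals("\uFFFD"));
        System.out.println("try-style invalid: " + decodeOrNull(invalid));
        System.out.println("try-style valid: " + decodeOrNull(valid));
    }
}
```

The lenient path corresponds to the `new String(_myInput, StandardCharsets.UTF_8)` line kept for legacy mode, which is why that mode always needs the `String` round trip: the substitution is a side effect of the JDK decoder.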