2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ Every module from the root pom.xml, organized by function. Flink provides three
Key separations:

- **Planner vs Runtime:** The table planner generates code and execution plans; the runtime executes them. Changes to planning logic live in `flink-table-planner`; changes to runtime operators live in `flink-table-runtime` or `flink-streaming-java`.
- **Codegen vs hand-written operators:** Per-record expression logic (casts, projections, filters, function calls) is generated at planning time by cast rules in `flink-table-planner/.../functions/casting/` and call generators in `flink-table-planner/.../codegen/calls/`, then compiled by Janino into the surrounding operator class. Operators with fixed structure (joins, aggregations, source/sink runtime) are hand-written Java in `flink-table-runtime` or `flink-streaming-java`. New scalar functions usually only need a `BuiltInFunctionDefinitions` entry plus a `BuiltInScalarFunction` subclass - the planner wires up codegen automatically. New cast behaviour or a custom call shape needs a cast rule or call generator.
- **API vs Implementation:** Public API surfaces (`flink-core-api`, `flink-datastream-api`, `flink-table-api-java`) are separate from implementation modules. API stability annotations control what users can depend on.
- **ArchUnit enforcement:** `flink-architecture-tests/` contains ArchUnit tests that enforce module boundaries. New violations should be avoided; if unavoidable, follow the freeze procedure in `flink-architecture-tests/README.md`.

Expand Down Expand Up @@ -294,6 +295,7 @@ This section maps common types of Flink changes to the modules they touch and th
- Ensure `./mvnw clean verify` passes before opening a PR
- Always push to your fork, not directly to `apache/flink`
- Rebase onto the latest target branch before submitting
- For user-visible behaviour changes, breaking changes, new SQL features, or new config options: fill in the **Release Notes** field on the JIRA ticket. The release manager consolidates these when cutting a release. The next version's `docs/content/release-notes/flink-X.Y.md` is generated from the JIRA tickets, so fill the field in carefully.

### AI-assisted contributions

Expand Down
4 changes: 3 additions & 1 deletion docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ conditional:
conversion:
- sql: CAST(value AS type)
table: ANY.cast(TYPE)
description: Returns a new value being cast to type type. A CAST error throws an exception and fails the job. When performing a cast operation that may fail, like STRING to INT, one should rather use TRY_CAST, in order to handle errors. If "table.exec.legacy-cast-behaviour" is enabled, CAST behaves like TRY_CAST. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS STRING) returns NULL of type STRING; CAST('non-number' AS INT) throws an exception and fails the job.
description: Returns a new value being cast to type type. A CAST error throws an exception and fails the job. When performing a cast operation that may fail, like STRING to INT, one should rather use TRY_CAST, in order to handle errors. If "table.exec.legacy-cast-behaviour" is enabled, CAST behaves like TRY_CAST. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS STRING) returns NULL of type STRING; CAST('non-number' AS INT) throws an exception and fails the job. Casting BINARY/VARBINARY/BYTES to a CHAR/VARCHAR/STRING type validates that the input is well-formed UTF-8 and throws on invalid sequences. Use MAKE_VALID_UTF8 to substitute the Unicode replacement character U+FFFD for invalid bytes, TRY_CAST to return NULL, or set "table.exec.legacy-bytes-to-string-cast" to "true" to restore the prior silent-substitution behavior.
- sql: TRY_CAST(value AS type)
table: ANY.tryCast(TYPE)
description: Like CAST, but in case of error, returns NULL rather than failing the job. E.g., TRY_CAST('42' AS INT) returns 42; TRY_CAST(NULL AS STRING) returns NULL of type STRING; TRY_CAST('non-number' AS INT) returns NULL of type INT; COALESCE(TRY_CAST('non-number' AS INT), 0) returns 0 of type INT.
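
The strict CAST and NULL-returning TRY_CAST semantics described above can be illustrated with the plain JDK. This is a minimal sketch and not Flink's implementation: `StrictUtf8Sketch`, `castStrict`, and `tryCast` are hypothetical names, and the JDK `CharsetDecoder` stands in for whatever validation Flink performs internally.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration only; not part of Flink. */
final class StrictUtf8Sketch {

    /** Models the documented CAST behavior: malformed UTF-8 raises an error. */
    static String castStrict(byte[] bytes) throws CharacterCodingException {
        return StandardCharsets.UTF_8
                .newDecoder()
                // REPORT makes the decoder throw instead of substituting U+FFFD.
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT)
                .decode(ByteBuffer.wrap(bytes))
                .toString();
    }

    /** Models the documented TRY_CAST behavior: null instead of an error. */
    static String tryCast(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try {
            return castStrict(bytes);
        } catch (CharacterCodingException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCast("Hello".getBytes(StandardCharsets.UTF_8))); // Hello
        System.out.println(tryCast(new byte[] {(byte) 0x80})); // null
    }
}
```

The two inputs match the ones used in the PR's new TRY_CAST test: a lone `0x80` byte is not valid UTF-8, while the bytes of `"Hello"` are.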
Expand All @@ -818,6 +818,8 @@ conversion:
description: |
Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`.

`MAKE_VALID_UTF8()` can be used as a drop-in replacement for `CAST(bytes AS STRING)` when the input may contain invalid UTF-8, which would otherwise cause the cast to fail.

E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character).

collection:
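
Why the `MAKE_VALID_UTF8` substitution is "lossy and irreversible" can be seen with the JDK's lenient decoder. This is a sketch under the assumption that the JDK's `new String(bytes, UTF_8)` behaves like `MAKE_VALID_UTF8` for the inputs shown; the class and method names are hypothetical, and the two may differ in how many replacement characters they emit for longer malformed sequences.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper for illustration only; not part of Flink. */
final class LenientUtf8Sketch {

    /** Lenient decode: the JDK replaces malformed input with U+FFFD. */
    static String decodeLenient(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] hello = {0x48, 0x65, 0x6C, 0x6C, 0x6F}; // x'48656C6C6F'
        System.out.println(decodeLenient(hello)); // Hello

        byte[] invalid = {(byte) 0x80}; // x'80', a lone continuation byte
        String repaired = decodeLenient(invalid);
        System.out.println("\uFFFD".equals(repaired)); // true

        // Re-encoding yields EF BF BD, not the original 80: the input is gone.
        byte[] reencoded = repaired.getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(invalid, reencoded)); // false
    }
}
```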
Expand Down
6 changes: 5 additions & 1 deletion docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -922,8 +922,10 @@ conversion:
description: |
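Returns a new value being cast to type type. A CAST error throws an exception and fails the job. To handle errors when performing a cast operation that may fail, such as STRING to INT, use TRY_CAST instead.
If "table.exec.legacy-cast-behaviour" is enabled, CAST behaves like TRY_CAST.

E.g., CAST('42' AS INT) returns 42; CAST(NULL AS STRING) returns `NULL` of type STRING; CAST('non-number' AS INT) throws an exception and fails the job.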

Casting BINARY/VARBINARY/BYTES to a CHAR/VARCHAR/STRING type validates that the input is well-formed UTF-8 and throws on invalid sequences. Use MAKE_VALID_UTF8 to substitute the Unicode replacement character U+FFFD for invalid bytes, TRY_CAST to return NULL, or set "table.exec.legacy-bytes-to-string-cast" to "true" to restore the prior silent-substitution behavior.
- sql: TRY_CAST(value AS type)
table: ANY.tryCast(TYPE)
description: |
Expand All @@ -948,6 +950,8 @@ conversion:
description: |
Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`.

`MAKE_VALID_UTF8()` can be used as a drop-in replacement for `CAST(bytes AS STRING)` when the input may contain invalid UTF-8, which would otherwise cause the cast to fail.

E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character).

collection:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@
<td>Duration</td>
<td>Specifies the minimum delay before unmatched records are cleaned up in the interval join operator. Before Flink 1.18, the default value of this parameter was half of the interval duration. Note: Setting this option to a value greater than 0 causes unmatched records in outer joins to be emitted later than the watermark, so downstream watermark-dependent operators, such as window operators, may discard them. The default value is 0, which means unmatched records are cleaned up immediately.</td>
</tr>
<tr>
<td><h5>table.exec.legacy-bytes-to-string-cast</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When true, CAST(bytes AS STRING) for BINARY/VARBINARY/BYTES inputs silently substitutes the Unicode replacement character U+FFFD for invalid UTF-8 sequences. When false (the default), invalid input fails the job; use MAKE_VALID_UTF8 or TRY_CAST to handle malformed bytes.</td>
</tr>
<tr>
<td><h5>table.exec.legacy-cast-behaviour</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">DISABLED</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,18 @@ public class ExecutionConfigOptions {
"Determines whether CAST will operate following the legacy behaviour "
+ "or the new one that introduces various fixes and improvements.");

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<Boolean> TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST =
key("table.exec.legacy-bytes-to-string-cast")
.booleanType()
.defaultValue(false)
.withDescription(
"When true, CAST(bytes AS STRING) for BINARY/VARBINARY/BYTES inputs "
+ "silently substitutes the Unicode replacement character "
+ "U+FFFD for invalid UTF-8 sequences. When false (the default), "
+ "invalid input fails the job; use MAKE_VALID_UTF8 or TRY_CAST to handle "
+ "malformed bytes.");

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Long> TABLE_EXEC_RANK_TOPN_CACHE_SIZE =
ConfigOptions.key("table.exec.rank.topn-cache-size")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.planner.functions.casting;

import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
Expand All @@ -27,12 +28,19 @@

import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_UTF8_BYTES;
import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField;
import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldPad;
import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldTrim;

/**
* {@link LogicalTypeFamily#BINARY_STRING} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule.
*
* <p>Strict UTF-8 mode is the default: invalid input bytes throw a {@code TableRuntimeException}.
* Setting {@link ExecutionConfigOptions#TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST} to {@code true}
* restores the prior behavior, where invalid sequences are silently replaced by {@code U+FFFD}.
*/
class BinaryToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<byte[], String> {

Expand All @@ -48,14 +56,26 @@ private BinaryToStringCastRule() {

/* Example generated code

--- Strict UTF-8 mode fast path: STRING / VARCHAR(MAX) target. No String allocation, no re-encoding.
isNull$0 = _myInputIsNull;
if (!isNull$0) {
- java.lang.String resultString$435;
- resultString$435 = new java.lang.String(_myInput, java.nio.charset.StandardCharsets.UTF_8);
+ result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromUtf8Bytes(_myInput);
isNull$0 = result$1 == null;
} else {
result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
}

--- Round-trip path: legacy mode (silent U+FFFD substitution) or strict UTF-8 mode + CHAR(n)/VARCHAR(n) (trim/pad).
--- The decode line below is the legacy variant; in strict UTF-8 mode it becomes:
--- resultString$0 = org.apache.flink.table.data.binary.BinaryStringData.fromUtf8Bytes(_myInput).toString();
isNull$0 = _myInputIsNull;
if (!isNull$0) {
java.lang.String resultString$0;
resultString$0 = new java.lang.String(_myInput, java.nio.charset.StandardCharsets.UTF_8);
java.lang.String resultPadOrTrim$538;
- resultPadOrTrim$538 = resultString$435.toString();
- if (resultString$435.length() > 12) {
- resultPadOrTrim$538 = resultString$435.substring(0, java.lang.Math.min(resultString$435.length(), 12));
+ resultPadOrTrim$538 = resultString$0.toString();
+ if (resultString$0.length() > 12) {
+ resultPadOrTrim$538 = resultString$0.substring(0, java.lang.Math.min(resultString$0.length(), 12));
} else {
if (resultPadOrTrim$538.length() < 12) {
int padLength$539;
Expand All @@ -68,22 +88,46 @@ private BinaryToStringCastRule() {
resultPadOrTrim$538 = resultPadOrTrim$538 + sbPadding$540.toString();
}
}
- resultString$435 = resultPadOrTrim$538;
- result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$435);
+ resultString$0 = resultPadOrTrim$538;
+ result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$0);
isNull$0 = result$1 == null;
} else {
result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
}

*/
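
The trim/pad step in the round-trip path of the generated code above can be sketched in plain Java. The padding lines of the generated code are elided in this diff, so padding with spaces is an assumption (it is the usual SQL `CHAR(n)` convention), as is the split "CHAR(n) pads, VARCHAR(n) only trims"; `TrimPadSketch` and `trimOrPad` are hypothetical names.

```java
/** Hypothetical helper for illustration only; not part of Flink. */
final class TrimPadSketch {

    /**
     * Trims s to at most length characters. If padToLength is true (assumed
     * to model CHAR(n)), shorter inputs are right-padded with spaces.
     * Lengths are counted in UTF-16 code units, as String.length() does.
     */
    static String trimOrPad(String s, int length, boolean padToLength) {
        if (s.length() > length) {
            return s.substring(0, length);
        }
        if (padToLength && s.length() < length) {
            StringBuilder sb = new StringBuilder(s);
            for (int i = s.length(); i < length; i++) {
                sb.append(' '); // assumed pad character
            }
            return sb.toString();
        }
        return s;
    }

    public static void main(String[] args) {
        System.out.println("[" + trimOrPad("Hello, world!!", 12, true) + "]"); // [Hello, world]
        System.out.println("[" + trimOrPad("Hi", 4, true) + "]"); // [Hi  ]
        System.out.println("[" + trimOrPad("Hi", 4, false) + "]"); // [Hi]
    }
}
```

Because this step operates on a `java.lang.String`, a bounded target type cannot take the fast path that wraps the input bytes directly.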

@Override
public boolean canFail(LogicalType inputLogicalType, LogicalType targetLogicalType) {
// Strict UTF-8 mode validates the input and can throw on malformed bytes.
return true;
}

@Override
protected String generateCodeBlockInternal(
CodeGeneratorCastRule.Context context,
String inputTerm,
String returnVariable,
LogicalType inputLogicalType,
LogicalType targetLogicalType) {
final boolean legacy =
context.getCodeGeneratorContext()
.tableConfig()
.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST);
final int length = LogicalTypeChecks.getLength(targetLogicalType);
final boolean needsTrimOrPad = couldTrim(length) || couldPad(targetLogicalType, length);

// Strict UTF-8 mode fast path: unbounded target. Wrap the input bytes directly with no
// intermediate String. Legacy mode always needs the round-trip below because the JDK
// decoder is what substitutes U+FFFD for invalid sequences.
if (!context.isPrinting() && !legacy && !needsTrimOrPad) {
return new CastRuleUtils.CodeWriter()
.assignStmt(
returnVariable,
staticCall(BINARY_STRING_DATA_FROM_UTF8_BYTES(), inputTerm))
.toString();
}

final String resultStringTerm = newName(context.getCodeGeneratorContext(), "resultString");
final CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter();

Expand All @@ -93,19 +137,24 @@ protected String generateCodeBlockInternal(
.assignPlusStmt(
resultStringTerm, staticCall(EncodingUtils.class, "hex", inputTerm))
.assignPlusStmt(resultStringTerm, "\"'\"");
- } else {
+ } else if (legacy) {
// Legacy mode: lenient JDK decode, invalid sequences become U+FFFD.
writer.assignStmt(
resultStringTerm,
constructorCall(
String.class,
inputTerm,
accessStaticField(StandardCharsets.class, "UTF_8")));
} else {
// Strict UTF-8 mode: validates, then materializes the String for trim/pad below.
writer.assignStmt(
resultStringTerm,
staticCall(BINARY_STRING_DATA_FROM_UTF8_BYTES(), inputTerm) + ".toString()");
}

if (!context.legacyBehaviour() && !context.isPrinting()) {
final String resultPadOrTrim =
newName(context.getCodeGeneratorContext(), "resultPadOrTrim");
- final int length = LogicalTypeChecks.getLength(targetLogicalType);
CharVarCharTrimPadCastRule.padAndTrimStringIfNeeded(
writer,
targetLogicalType,
Expand Down
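
The branch selection in `generateCodeBlockInternal` can be summarized as a small decision function. This is a reading aid rather than code from the PR: the enum and method names are hypothetical, and the collapsed hunk is assumed to open with an `if (context.isPrinting())` branch that emits the hex literal.

```java
/** Hypothetical model of the decode-path choice; not part of Flink. */
final class DecodePathSketch {

    enum Path {
        HEX_PRINT,          // printing: render the bytes as a hex literal
        FAST_WRAP,          // strict mode, unbounded target: fromUtf8Bytes only
        LENIENT_ROUND_TRIP, // legacy mode: new String(bytes, UTF_8), then trim/pad
        STRICT_ROUND_TRIP   // strict mode, bounded target: validate, toString, trim/pad
    }

    static Path choose(boolean printing, boolean legacy, boolean needsTrimOrPad) {
        if (!printing && !legacy && !needsTrimOrPad) {
            return Path.FAST_WRAP;
        }
        if (printing) {
            return Path.HEX_PRINT;
        }
        return legacy ? Path.LENIENT_ROUND_TRIP : Path.STRICT_ROUND_TRIP;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false, false)); // FAST_WRAP
        System.out.println(choose(false, false, true));  // STRICT_ROUND_TRIP
        System.out.println(choose(false, true, false));  // LENIENT_ROUND_TRIP
        System.out.println(choose(true, false, false));  // HEX_PRINT
    }
}
```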
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ object BuiltInMethods {
val BINARY_STRING_DATA_FROM_STRING =
Types.lookupMethod(classOf[BinaryStringData], "fromString", classOf[String])

val BINARY_STRING_DATA_FROM_UTF8_BYTES =
Types.lookupMethod(classOf[BinaryStringData], "fromUtf8Bytes", classOf[Array[Byte]])

val STRING_DATA_TO_BOOLEAN =
Types.lookupMethod(classOf[BinaryStringDataUtil], "toBoolean", classOf[BinaryStringData])

Expand Down
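
As a rough illustration of how a looked-up method such as `BINARY_STRING_DATA_FROM_UTF8_BYTES` might end up in generated source, the sketch below renders a static call expression from a reflective `Method`. It is an assumption about the general shape of such a helper, not the code of `CastRuleUtils.staticCall` or `Types.lookupMethod`; all names are hypothetical, and `Integer.parseInt` stands in for the Flink method so the example runs with the JDK alone.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical helper for illustration only; not part of Flink. */
final class StaticCallSketch {

    /** Looks up a public method, wrapping the checked exception. */
    static Method lookupMethod(Class<?> owner, String name, Class<?>... paramTypes) {
        try {
            return owner.getMethod(name, paramTypes);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Renders "fully.qualified.Owner.method(arg1, arg2)" as Java source text. */
    static String staticCall(Method method, String... argTerms) {
        if (!Modifier.isStatic(method.getModifiers())) {
            throw new IllegalArgumentException("not a static method: " + method);
        }
        return method.getDeclaringClass().getCanonicalName()
                + "."
                + method.getName()
                + "("
                + String.join(", ", argTerms)
                + ")";
    }

    public static void main(String[] args) {
        Method parseInt = lookupMethod(Integer.class, "parseInt", String.class);
        System.out.println(staticCall(parseInt, "_myInput")); // java.lang.Integer.parseInt(_myInput)
    }
}
```

Resolving the method reflectively at planning time means a renamed or removed target fails early, rather than surfacing as a compile error in the generated operator class.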
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.types.Row;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Objects;
Expand Down Expand Up @@ -344,7 +345,23 @@ Stream<TestSetSpec> getTestSetSpecs() {
$("f1").tryCast(MAP(INT(), ARRAY(INT()))),
"TRY_CAST(f1 AS MAP<INT, ARRAY<INT>>)",
null,
- MAP(INT(), ARRAY(INT())).nullable()));
+ MAP(INT(), ARRAY(INT())).nullable()),
TestSetSpec.forFunction(
BuiltInFunctionDefinitions.TRY_CAST,
"try cast from BYTES with invalid UTF-8 to STRING returns NULL")
.onFieldsWithData(
new byte[] {(byte) 0x80}, "Hello".getBytes(StandardCharsets.UTF_8))
.andDataTypes(BYTES(), BYTES())
.testResult(
$("f0").tryCast(STRING()),
"TRY_CAST(f0 AS STRING)",
null,
STRING().nullable())
.testResult(
$("f1").tryCast(STRING()),
"TRY_CAST(f1 AS STRING)",
"Hello",
STRING().nullable()));
}

// --------------------------------------------------------------------------------------------
Expand Down