Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions docs/content/release-notes/flink-2.4.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
---
title: "Release Notes - Flink 2.4"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be done by release managers

the release notes for the PR/jira should be put in Release notes of jira issue field and RM could incorporate them into final release notes doc

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But does it harm to start early? I actually like this idea, it makes a RM work easier later.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess the main issue is that when RM prepares release he/she goes through steps defined in https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release

there is a step to look into release notes in jira and nothing about this....
i'm not against this approach if it will be documented

Copy link
Copy Markdown
Contributor Author

@gustavodemorais gustavodemorais May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the note, @snuyanzin. I've added the release notes to the jira ticket and removed from the PR. Also added a short note with this info in Agents.md f860662

I think having it in the PR is nice to give the chance for the release notes to be reviewed directly. However, it's of course a pain to change an existing working process. Do you personally have a preference as someone who has had more experience with releases?

---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Release notes - Flink 2.4

These release notes discuss important aspects, such as configuration, behavior or dependencies,
that changed between Flink 2.3 and Flink 2.4. Please read these notes carefully if you are
planning to upgrade your Flink version to 2.4.

### Table & SQL

#### Strict UTF-8 validation in CAST(BYTES AS STRING)

`CAST(value AS STRING)` (and the equivalent `VARCHAR`/`CHAR(n)` targets) now validates that the input bytes are well-formed UTF-8 when the source type is `BINARY`, `VARBINARY`, or `BYTES`. Invalid byte sequences fail the job with a `TableRuntimeException` instead of being silently substituted with the Unicode replacement character `U+FFFD` as before.

Migration options:

- Use `MAKE_VALID_UTF8(bytes)` to keep the lenient decode (replaces invalid sequences with `U+FFFD`).
- Use `TRY_CAST(bytes AS STRING)` to return `NULL` on invalid input.
- Filter at ingest with `WHERE IS_VALID_UTF8(bytes)` to route well-formed records, or `WHERE NOT IS_VALID_UTF8(bytes)` for a dead-letter sink.
- Restore the prior behavior across the job by setting `table.exec.legacy-bytes-to-string-cast` to `true`.
4 changes: 3 additions & 1 deletion docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ conditional:
conversion:
- sql: CAST(value AS type)
table: ANY.cast(TYPE)
description: Returns a new value being cast to type type. A CAST error throws an exception and fails the job. When performing a cast operation that may fail, like STRING to INT, one should rather use TRY_CAST, in order to handle errors. If "table.exec.legacy-cast-behaviour" is enabled, CAST behaves like TRY_CAST. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS STRING) returns NULL of type STRING; CAST('non-number' AS INT) throws an exception and fails the job.
description: Returns a new value being cast to type type. A CAST error throws an exception and fails the job. When performing a cast operation that may fail, like STRING to INT, one should rather use TRY_CAST, in order to handle errors. If "table.exec.legacy-cast-behaviour" is enabled, CAST behaves like TRY_CAST. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS STRING) returns NULL of type STRING; CAST('non-number' AS INT) throws an exception and fails the job. Casting BINARY/VARBINARY/BYTES to a CHAR/VARCHAR/STRING type validates that the input is well-formed UTF-8 and throws on invalid sequences. Use MAKE_VALID_UTF8 to substitute the Unicode replacement character `U+FFFD` for invalid bytes, TRY_CAST to return NULL, or set "table.exec.legacy-bytes-to-string-cast" to "true" to restore the prior silent-substitution behavior.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have sql more consistent: in some places it is with back ticks, in some without

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

- sql: TRY_CAST(value AS type)
table: ANY.tryCast(TYPE)
description: Like CAST, but in case of error, returns NULL rather than failing the job. E.g., TRY_CAST('42' AS INT) returns 42; TRY_CAST(NULL AS STRING) returns NULL of type STRING; TRY_CAST('non-number' AS INT) returns NULL of type INT; COALESCE(TRY_CAST('non-number' AS INT), 0) returns 0 of type INT.
Expand All @@ -818,6 +818,8 @@ conversion:
description: |
Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`.

`MAKE_VALID_UTF8()` can fully replace a `CAST(bytes AS STRING)` which would error in case of invalid UTF-8.

E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character).

collection:
Expand Down
6 changes: 5 additions & 1 deletion docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -922,8 +922,10 @@ conversion:
description: |
返回 value 被转换为类型 type 的新值。CAST错误会抛出异常并导致作业失败。为了处理错误,在使用可能失败的 CAST 操作时,例如 STRING 转换为 INT,建议使用 TRY_CAST 替代。
如果开启了 "table.exec.legacy-cast-behaviour",CAST 行为将变得与 TRY_CAST 一致。

例如, CAST('42' AS INT) 返回 42; CAST(NULL AS STRING) 返回字符串类型的 `NULL`; CAST('non-number' AS INT) 抛出异常且作业失败。

Casting BINARY/VARBINARY/BYTES to a CHAR/VARCHAR/STRING type validates that the input is well-formed UTF-8 and throws on invalid sequences. Use MAKE_VALID_UTF8 to substitute the Unicode replacement character `U+FFFD` for invalid bytes, TRY_CAST to return NULL, or set "table.exec.legacy-bytes-to-string-cast" to "true" to restore the prior silent-substitution behavior.
- sql: TRY_CAST(value AS type)
table: ANY.tryCast(TYPE)
description: |
Expand All @@ -948,6 +950,8 @@ conversion:
description: |
Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`.

`MAKE_VALID_UTF8()` can fully replace a `CAST(bytes AS STRING)` which would error in case of invalid UTF-8.

E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character).

collection:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@
<td>Duration</td>
<td>Specifies a minimum time interval for how long cleanup unmatched records in the interval join operator. Before Flink 1.18, the default value of this param was the half of interval duration. Note: Set this option greater than 0 will cause unmatched records in outer joins to be output later than watermark, leading to possible discarding of these records by downstream watermark-dependent operators, such as window operators. The default value is 0, which means it will clean up unmatched records immediately.</td>
</tr>
<tr>
<td><h5>table.exec.legacy-bytes-to-string-cast</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When true, CAST(bytes AS STRING) for BINARY/VARBINARY/BYTES inputs silently substitutes the Unicode replacement character U+FFFD for invalid UTF-8 sequences. When false (the default), invalid input fails the job; use MAKE_VALID_UTF8 or TRY_CAST to handle malformed bytes.</td>
</tr>
<tr>
<td><h5>table.exec.legacy-cast-behaviour</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">DISABLED</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,18 @@ public class ExecutionConfigOptions {
"Determines whether CAST will operate following the legacy behaviour "
+ "or the new one that introduces various fixes and improvements.");

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<Boolean> TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST =
key("table.exec.legacy-bytes-to-string-cast")
.booleanType()
.defaultValue(false)
.withDescription(
"When true, CAST(bytes AS STRING) for BINARY/VARBINARY/BYTES inputs "
+ "silently substitutes the Unicode replacement character "
+ "U+FFFD for invalid UTF-8 sequences. When false (the default), "
+ "invalid input fails the job; use MAKE_VALID_UTF8 or TRY_CAST to handle "
+ "malformed bytes.");

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Long> TABLE_EXEC_RANK_TOPN_CACHE_SIZE =
ConfigOptions.key("table.exec.rank.topn-cache-size")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.planner.functions.casting;

import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
Expand All @@ -27,12 +28,19 @@

import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
import static org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_UTF8_BYTES;
import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField;
import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldPad;
import static org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldTrim;

/**
* {@link LogicalTypeFamily#BINARY_STRING} to {@link LogicalTypeFamily#CHARACTER_STRING} cast rule.
*
* <p>Strict UTF-8 mode is the default: invalid input bytes throw a {@code TableRuntimeException}.
* Setting {@link ExecutionConfigOptions#TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST} to {@code true}
* restores the prior behavior, where invalid sequences are silently replaced by {@code U+FFFD}.
*/
class BinaryToStringCastRule extends AbstractNullAwareCodeGeneratorCastRule<byte[], String> {

Expand All @@ -48,6 +56,18 @@ private BinaryToStringCastRule() {

/* Example generated code

--- Strict UTF-8 mode fast path: STRING / VARCHAR(MAX) target. No String allocation, no re-encoding.
isNull$0 = _myInputIsNull;
if (!isNull$0) {
result$1 = org.apache.flink.table.data.binary.BinaryStringData.fromUtf8Bytes(_myInput);
isNull$0 = result$1 == null;
} else {
result$1 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
}

--- Round-trip path: legacy mode (silent U+FFFD substitution) or strict UTF-8 mode + CHAR(n)/VARCHAR(n) (trim/pad).
--- The decode line below is the legacy variant; in strict UTF-8 mode it becomes:
--- resultString$435 = org.apache.flink.table.data.binary.BinaryStringData.fromUtf8Bytes(_myInput).toString();
isNull$0 = _myInputIsNull;
if (!isNull$0) {
java.lang.String resultString$435;
Expand Down Expand Up @@ -77,13 +97,37 @@ private BinaryToStringCastRule() {

*/

@Override
public boolean canFail(LogicalType inputLogicalType, LogicalType targetLogicalType) {
// Strict UTF-8 mode validates the input and can throw on malformed bytes.
return true;
}

@Override
protected String generateCodeBlockInternal(
CodeGeneratorCastRule.Context context,
String inputTerm,
String returnVariable,
LogicalType inputLogicalType,
LogicalType targetLogicalType) {
final boolean legacy =
context.getCodeGeneratorContext()
.tableConfig()
.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST);
final int length = LogicalTypeChecks.getLength(targetLogicalType);
final boolean needsTrimOrPad = couldTrim(length) || couldPad(targetLogicalType, length);

// Strict UTF-8 mode fast path: unbounded target. Wrap the input bytes directly with no
// intermediate String. Legacy mode always needs the round-trip below because the JDK
// decoder is what substitutes U+FFFD for invalid sequences.
if (!context.isPrinting() && !legacy && !needsTrimOrPad) {
return new CastRuleUtils.CodeWriter()
.assignStmt(
returnVariable,
staticCall(BINARY_STRING_DATA_FROM_UTF8_BYTES(), inputTerm))
.toString();
}

final String resultStringTerm = newName(context.getCodeGeneratorContext(), "resultString");
final CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter();

Expand All @@ -93,19 +137,24 @@ protected String generateCodeBlockInternal(
.assignPlusStmt(
resultStringTerm, staticCall(EncodingUtils.class, "hex", inputTerm))
.assignPlusStmt(resultStringTerm, "\"'\"");
} else {
} else if (legacy) {
// Legacy mode: lenient JDK decode, invalid sequences become U+FFFD.
writer.assignStmt(
resultStringTerm,
constructorCall(
String.class,
inputTerm,
accessStaticField(StandardCharsets.class, "UTF_8")));
} else {
// Strict UTF-8 mode: validates, then materializes the String for trim/pad below.
writer.assignStmt(
resultStringTerm,
staticCall(BINARY_STRING_DATA_FROM_UTF8_BYTES(), inputTerm) + ".toString()");
}

if (!context.legacyBehaviour() && !context.isPrinting()) {
final String resultPadOrTrim =
newName(context.getCodeGeneratorContext(), "resultPadOrTrim");
final int length = LogicalTypeChecks.getLength(targetLogicalType);
CharVarCharTrimPadCastRule.padAndTrimStringIfNeeded(
writer,
targetLogicalType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ object BuiltInMethods {
val BINARY_STRING_DATA_FROM_STRING =
Types.lookupMethod(classOf[BinaryStringData], "fromString", classOf[String])

val BINARY_STRING_DATA_FROM_UTF8_BYTES =
Types.lookupMethod(classOf[BinaryStringData], "fromUtf8Bytes", classOf[Array[Byte]])

val STRING_DATA_TO_BOOLEAN =
Types.lookupMethod(classOf[BinaryStringDataUtil], "toBoolean", classOf[BinaryStringData])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.types.Row;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Objects;
Expand Down Expand Up @@ -344,7 +345,23 @@ Stream<TestSetSpec> getTestSetSpecs() {
$("f1").tryCast(MAP(INT(), ARRAY(INT()))),
"TRY_CAST(f1 AS MAP<INT, ARRAY<INT>>)",
null,
MAP(INT(), ARRAY(INT())).nullable()));
MAP(INT(), ARRAY(INT())).nullable()),
TestSetSpec.forFunction(
BuiltInFunctionDefinitions.TRY_CAST,
"try cast from BYTES with invalid UTF-8 to STRING returns NULL")
.onFieldsWithData(
new byte[] {(byte) 0x80}, "Hello".getBytes(StandardCharsets.UTF_8))
.andDataTypes(BYTES(), BYTES())
.testResult(
$("f0").tryCast(STRING()),
"TRY_CAST(f0 AS STRING)",
null,
STRING().nullable())
.testResult(
$("f1").tryCast(STRING()),
"TRY_CAST(f1 AS STRING)",
"Hello",
STRING().nullable()));
}

// --------------------------------------------------------------------------------------------
Expand Down
Loading