These release notes discuss important aspects, such as configuration, behavior or dependencies,
that changed between Flink 2.2 and Flink 2.3. Please read these notes carefully if you are
planning to upgrade your Flink version to 2.3.

### Table SQL / API

#### FROM_CHANGELOG and TO_CHANGELOG built-in PTFs

##### [FLINK-39258](https://issues.apache.org/jira/browse/FLINK-39258) (FLIP-564)

The DataStream API has long offered `toChangelogStream()` and `fromChangelogStream()` for working
with changelog streams; Flink 2.3 brings equivalent functionality to SQL via two new built-in
Process Table Functions:

- `FROM_CHANGELOG` converts an append-only stream that carries an operation column (and optional
  before/after row descriptors) into a dynamic table. A configurable `op_mapping` makes it
  straightforward to plug in custom CDC formats (e.g. Debezium-style `c`/`u`/`d` codes), and
  `invalid_op_handling` (`FAIL`/`LOG`/`SKIP`) controls how rows with unmapped operation codes
  are treated.
- `TO_CHANGELOG` is the inverse: it materializes a dynamic table back into an append-only
  changelog stream. This is the first SQL-level operator that lets users convert retract/upsert
  streams into append form, which is useful for archival, auditing, and writing to append-only
  sinks. `produces_full_deletes` controls whether `-D` records carry the full row.

The two PTFs are designed to be symmetric, so `FROM_CHANGELOG(TO_CHANGELOG(table))` round-trips
correctly. Both support `PARTITION BY` for parallel execution and `uid` for query evolution, and
they expose a `state_ttl` parameter for state retention.
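
As a rough illustration, mapping Debezium-style operation codes onto Flink changelog kinds could
look as follows. This is a minimal sketch: `op_mapping` and `invalid_op_handling` are the
parameters described above, while `op_column` and the table and column names are illustrative
assumptions.

```sql
-- Sketch: interpret a Debezium-style append stream as a changelog.
-- `op_column` is an assumed name for the operation-column descriptor.
SELECT *
FROM FROM_CHANGELOG(
    input               => TABLE raw_orders PARTITION BY order_id,
    op_column           => DESCRIPTOR(op),
    op_mapping          => MAP['c', 'I', 'u', 'U', 'd', 'D'],
    invalid_op_handling => 'SKIP');
```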

#### CREATE/ALTER for MATERIALIZED TABLE aligned with TABLE

##### [FLINK-38673](https://issues.apache.org/jira/browse/FLINK-38673) (FLIP-550)

The DDL surface of `MATERIALIZED TABLE` is brought to parity with regular tables. `CREATE
MATERIALIZED TABLE` now accepts an explicit column list (including watermarks and primary keys)
in front of the defining `AS` query. `ALTER MATERIALIZED TABLE` gains `ADD`, `MODIFY` and `DROP`
operations on metadata and computed columns, plus `RENAME TO`, allowing materialized tables to
evolve through the same workflow already used for regular Flink tables.
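
For illustration, a primary key and a watermark can now be declared directly on the materialized
table. This is a minimal sketch with illustrative names; the `FRESHNESS` clause comes from the
existing materialized-table syntax.

```sql
-- Sketch: explicit schema parts in front of the defining AS query.
CREATE MATERIALIZED TABLE enriched_orders (
    PRIMARY KEY (order_id) NOT ENFORCED,
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
)
FRESHNESS = INTERVAL '30' MINUTE
AS SELECT order_id, order_time, amount FROM orders;
```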

#### Granular control over data reprocessing during materialized table evolution

##### [FLINK-39301](https://issues.apache.org/jira/browse/FLINK-39301) (FLIP-557)

Previously, when a materialized table's defining query was changed, Flink always reprocessed
historical data from the beginning. Flink 2.3 introduces an optional `START_MODE` clause on
`CREATE [OR ALTER]` and `ALTER MATERIALIZED TABLE`, letting users start the refresh pipeline
`FROM_BEGINNING`, `FROM_NOW[(interval)]`, `FROM_TIMESTAMP(timestamp)`, or resume from previous
offsets when available (`RESUME_OR_FROM_BEGINNING`/`RESUME_OR_FROM_NOW`/`RESUME_OR_FROM_TIMESTAMP`).
The default remains `FROM_BEGINNING` for backward compatibility.
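
A speculative sketch of the clause (names are illustrative, and the exact placement is defined by
FLIP-557):

```sql
-- Sketch per FLIP-557: evolve the defining query but only reprocess
-- data from a given point in time.
ALTER MATERIALIZED TABLE enriched_orders
START_MODE = FROM_TIMESTAMP(TIMESTAMP '2025-06-01 00:00:00')
AS SELECT order_id, order_time, amount, currency FROM orders;
```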

#### ARTIFACT keyword in CREATE FUNCTION

##### [FLINK-39081](https://issues.apache.org/jira/browse/FLINK-39081) (FLIP-559)

The `USING` clause of `CREATE FUNCTION` accepts a new `ARTIFACT` keyword as an alternative to
`JAR`. `ARTIFACT` is intentionally generic so that future ecosystem assets (Python wheels, etc.)
can be referenced through the same syntax. Both keywords are interchangeable and may even be
mixed within a single statement; existing `USING JAR` syntax continues to work unchanged.

```sql
CREATE FUNCTION my_func AS 'com.example.MyUdf'
USING ARTIFACT 's3://bucket/path/my-udf.jar';
```
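
Since the keywords can be mixed, a function backed by several resources might be declared as
follows (a sketch; the paths are illustrative):

```sql
-- Sketch: JAR and ARTIFACT mixed in one USING clause.
CREATE FUNCTION my_other_func AS 'com.example.OtherUdf'
USING JAR 'hdfs:///udfs/core-udfs.jar',
      ARTIFACT 's3://bucket/path/extra-dep.jar';
```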

#### SinkUpsertMaterializer improvements and changelog disorder handling

##### [FLINK-38926](https://issues.apache.org/jira/browse/FLINK-38926) (FLIP-558)

Flink 2.3 reworks how `SinkUpsertMaterializer` handles the case where a query's upsert key
differs from the sink's primary key. Previously this required maintaining the full history of
records and could blow up state. Two changes address this:

- A new `ON CONFLICT` clause with `DO NOTHING`, `DO ERROR` and `DO DEDUPLICATE` strategies makes
the behavior on key conflict explicit. By default, planning now fails when the upsert and
primary keys differ, requiring the user to choose a conflict strategy.
- Watermark-based record compaction is introduced to fix internal changelog disorder. The
  trigger and frequency of compaction are controlled by two options (see the sketch after this
  list):
  - `table.exec.sink.upserts.compaction-mode` (default: `WATERMARK`) — `WATERMARK` or
    `CHECKPOINT`.
  - `table.exec.sink.upserts.compaction-interval` — optional fallback interval for emitting
    watermarks when none arrive naturally.
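
For example, the compaction behavior can be tuned per pipeline from the SQL client. A minimal
sketch, assuming the defaults described above; the fallback interval value is illustrative:

```sql
-- Sketch: keep watermark-driven compaction, but emit a fallback
-- watermark every 30 s when none arrive naturally.
SET 'table.exec.sink.upserts.compaction-mode' = 'WATERMARK';
SET 'table.exec.sink.upserts.compaction-interval' = '30 s';
```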

#### Process Table Function enhancements

##### [FLINK-39254](https://issues.apache.org/jira/browse/FLINK-39254) (FLIP-565)

Process Table Functions (PTFs), introduced in Flink 2.1, gain several capabilities aligning them
with the DataStream API:

- **Late data handling**: late records are no longer silently dropped; PTFs can react to them.
- **`ORDER BY` on table arguments**: `MyPtf(input => TABLE t PARTITION BY k ORDER BY ts)` lets a
  PTF receive partitioned rows in deterministic temporal order (see the sketch after this list).
- **`ValueView`**: a new lazy single-value state primitive (`value()`, `update()`, `isEmpty()`,
`clear()`) joins the existing `MapView` and `ListView` for working with single-element state
efficiently.
- **Broadcast state**: PTFs support broadcast state through the new
`@ArgumentHint(ArgumentTrait.BROADCAST_SEMANTIC_TABLE)` and `@StateHint(StateKind.BROADCAST)`
annotations, plus `@ArgumentHint(ArgumentTrait.NOTIFY_STATEFUL_SETS)` for re-evaluating keys
when broadcast state changes.
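
For example, deterministic per-key ordering can be requested directly in the invocation. A
minimal sketch reusing the placeholder names from the list above; `uid` is the optional PTF
argument for query evolution:

```sql
-- Sketch: rows of t are partitioned by k and delivered to the PTF
-- in ascending ts order within each partition.
SELECT *
FROM MyPtf(
    input => TABLE t PARTITION BY k ORDER BY ts,
    uid   => 'my-ptf');
```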

### Core

#### Set security.ssl.algorithms default value to modern cipher suite

##### [FLINK-39022](https://issues.apache.org/jira/browse/FLINK-39022)

A JDK update (affecting JDK 11.0.30+, 17.0.18+, 21.0.10+, and 24+) disabled the `TLS_RSA_*`
cipher suites. This was done to support forward secrecy (RFC 9325) and to comply with the IETF
draft on *Deprecating Obsolete Key Exchange Methods in TLS*.

To support these and future JDK versions, the default value of the Flink configuration option
`security.ssl.algorithms` has been changed to a modern, widely available set of cipher suites:

`TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384`

This default provides strong security and wide compatibility. If your environment has different
requirements, you can customize the cipher suites via the `security.ssl.algorithms` option. If
these cipher suites are not supported in your setup, Flink processes will be unable to connect
to each other.

### Connectors

#### Flink Native S3 FileSystem

##### [FLINK-38592](https://issues.apache.org/jira/browse/FLINK-38592) (FLIP-555)

Flink 2.3 introduces a new native S3 file system plugin (`flink-s3-fs-native`) implemented
directly on top of the AWS SDK v2, removing the Hadoop and Presto dependencies of the previous
S3 connectors. The unified plugin provides both `FileSystem` and `RecoverableWriter`
implementations (so streaming sinks retain exactly-once semantics), uses non-blocking I/O, and
natively supports modern AWS auth patterns such as IAM Roles for Service Accounts.

The plugin registers the standard `s3://` URI scheme and is deployed via the regular plugins
directory. Configuration uses a new `s3.*` namespace (e.g. `s3.region`, `s3.endpoint`,
`s3.path-style-access`, `s3.access-key`, `s3.secret-key`, `s3.upload.min.part.size`,
`s3.upload.max.concurrent.uploads`, `s3.bulk-copy.enabled`, `s3.async.enabled`,
`s3.read.buffer.size`, `s3.entropy.key`, plus SSE-KMS and chunked-encoding/checksum-validation
controls).

See the [Native S3 FileSystem documentation](https://nightlies.apache.org/flink/flink-docs-release-2.3/docs/deployment/filesystems/s3/)
for setup details.

### Runtime

#### Adaptive Partition Selection for StreamPartitioner

##### [FLINK-31655](https://issues.apache.org/jira/browse/FLINK-31655) (FLIP-339)

When upstream and downstream parallelism differ, Flink uses `RebalancePartitioner`, which selects
target channels round-robin. For jobs that interact with external RPC services (Redis, HBase,
LLM serving, etc.) round-robin selection causes severe backpressure as soon as a single
downstream subtask slows down — the partitioner keeps feeding it new data even though it is
already overloaded. Flink 2.3 adds an adaptive, load-aware partition-selection mode for
`StreamPartitioner` that routes records to the least-loaded downstream channel instead. In
benchmarks, this delivers up to ~3x throughput improvement under skewed downstream processing.
The feature is opt-in via two new options:

- `taskmanager.network.adaptive-partitioner.enabled` (default: `false`)
- `taskmanager.network.adaptive-partitioner.max-traverse-size` (default: `4`) — number of
channels examined when selecting the idlest target.

#### AdaptiveScheduler rescale history

##### [FLINK-38333](https://issues.apache.org/jira/browse/FLINK-38333) (FLIP-495)

Streaming jobs running with the adaptive scheduler now record a history of rescale events,
including job-vertex parallelisms, slot allocations, scheduler-state transitions and termination
reasons. Events are kept in memory and on disk following the existing `ExecutionGraphInfoStore`
pattern. The same data is available through new REST endpoints:

- `/jobs/:jobid/rescales/overview`
- `/jobs/:jobid/rescales/history`
- `/jobs/:jobid/rescales/details/:rescaleuuid`
- `/jobs/:jobid/rescales/summary`

The feature is controlled by:

- `web.adaptive-scheduler.rescale-history.size` (default: `0`) — maximum number of rescale
records retained per job. Setting `0` disables the feature.

#### Web UI for AdaptiveScheduler rescale history

##### [FLINK-22258](https://issues.apache.org/jira/browse/FLINK-22258) (FLIP-487)

Building on FLIP-495, the Flink Web UI gains a new "Rescales" tab for streaming jobs running
with the adaptive scheduler. Subpages expose rescale counts, the latest events, a historical
timeline, duration statistics with percentiles, per-event details, and the adaptive scheduler
configuration in effect. The existing `/jobs/overview` endpoint is extended with `schedulerType`
and `jobType` fields so the UI can render adaptive-scheduler-specific information.

See the [Elastic Scaling documentation](https://nightlies.apache.org/flink/flink-docs-release-2.3/docs/deployment/elastic_scaling/)
for details.

#### Checkpointing during recovery

##### [FLINK-35761](https://issues.apache.org/jira/browse/FLINK-35761) (FLIP-547)

When restoring from an unaligned checkpoint, a task previously stayed in `INITIALIZING` until
*all* recovered input buffers had been processed before transitioning to `RUNNING`. For
high-parallelism jobs with large numbers of in-flight buffers, this could leave the task unable
to checkpoint for tens of minutes (30+ minutes in real cases), so any subsequent failure had to
restart from the original checkpoint. Flink 2.3 fixes this by transitioning to `RUNNING` as
soon as all input buffers have been added to `RecoveredInputChannel` and by allowing the
recovered input channels to be snapshotted, so a new unaligned checkpoint can be taken during
recovery. Two opt-in options are introduced:

- `execution.checkpointing.unaligned.recover-output-on-downstream.enabled` (default: `false`)
- `execution.checkpointing.unaligned.during-recovery.enabled` (default: `false`; requires the
option above to be enabled).

#### Application Management

##### [FLINK-38755](https://issues.apache.org/jira/browse/FLINK-38755) (FLIP-549)

Flink 2.3 introduces a first-class **application** concept that sits above jobs and unifies the
behavior of user code across deployment modes. The cluster-job model is replaced by a
cluster-application-job hierarchy, with two backing implementations
(`PackagedProgramApplication` and `SingleJobApplication`). Application archives are organized
by cluster and application IDs.

New REST APIs:

- `GET /applications/overview` — list applications.
- `GET /applications/:applicationid` — application details.
- `POST /applications/:applicationid/cancel` — cancel an application.
- `POST /jars/:jarid/run-application` — submit an application asynchronously.

New configuration options:

- `execution.terminate-application-on-any-job-terminated-exceptionally` (default: `true`).
- `cluster.id` (default: all-zero UUID).
- `historyserver.archive.clean-expired-applications` (default: `false`).
- `historyserver.archive.retained-applications` (default: `-1`).

The Web UI gains an Applications tab and a redesigned home page; jobs link back to the
application that owns them. See the
[Application Lifecycle documentation](https://nightlies.apache.org/flink/flink-docs-release-2.3/docs/internals/application_lifecycle/)
for details.

#### Application Capability Enhancement

##### [FLINK-38972](https://issues.apache.org/jira/browse/FLINK-38972) (FLIP-560)

Building on FLIP-549, FLIP-560 hardens the application layer for high availability and improves
error visibility:

- Job-name-based matching enables correct recovery of multi-job applications after a JobManager
failure; job recovery is deferred until the user `main` method reaches the corresponding
submission point.
- Applications can execute zero or multiple jobs (with some limitations for streaming jobs and
certain APIs).
- A new `ApplicationStore` and `ApplicationResultStore` persist application metadata and
cleanup state across HA recovery.
- Application-level exceptions (errors thrown by user `main` code, not just job execution) are
exposed through a new REST endpoint and Web UI subpage.

New REST APIs:

- `GET /applications/:applicationid/exceptions`
- `GET /applications/:applicationid/jobmanager/config`

New configuration options:

- `application-result-store.storage-path`
- `application-result-store.delete-on-commit` (default: `true`)

The application page in the Web UI now shows start/end times and duration, a cancel action,
log links, and a new exceptions subpage.

#### Robust OTel gRPC metric exporter

##### [FLINK-38603](https://issues.apache.org/jira/browse/FLINK-38603) (FLIP-553)

Jobs with large numbers of tasks and operators can produce metric payloads big enough for the
OTel gRPC backend to reject them, causing exported metric data to be dropped in production. The
existing exporter had three concrete limitations: gzip compression was not exposed in Flink
configuration, all data points went out in a single gRPC call without pagination, and metric
attributes such as `task_name` could grow to contain hundreds of operator names and bloat the
payload further. Flink 2.3 adds three opt-in robustness features to address these (all backward
compatible):

- `metrics.reporter.otel.exporter.compression` — `gzip` or `none` (default).
- `metrics.reporter.otel.batch.size` — split a single export into multiple gRPC calls; default
`0` (disabled).
- `metrics.reporter.otel.transform.attribute-value-length-limits.<attribute_name>` — per-attribute
truncation; `*` applies a global default.

### Documentation

#### Documentation restructure

##### [FLINK-38945](https://issues.apache.org/jira/browse/FLINK-38945) (FLIP-561)

The Flink documentation has been reorganized to make navigation easier. Highlights:

- Flink SQL gets a dedicated top-level section, separated from the Table API.
- Relational streaming concepts (changelogs, dynamic tables, state, etc.) are promoted to a
top-level Concepts section.
- Python documentation is integrated into the relevant API sections instead of living in a
standalone area.
- Contributor-facing content has been relocated outside the main user-facing docs.

Existing URLs continue to work via redirects. The top-level structure is documented on the
[docs landing page](https://nightlies.apache.org/flink/flink-docs-release-2.3/).