From d30a71881883d7f72ba38df5ef824d9a00c899f7 Mon Sep 17 00:00:00 2001 From: Han You Date: Wed, 29 Apr 2026 13:09:52 -0500 Subject: [PATCH] Flink: Allow custom writer factory in DynamicIcebergSink Introduce DynamicTaskWriterFactoryProvider so callers can supply a custom TaskWriterFactory in place of the default RowDataTaskWriterFactory, while reusing the surrounding table, schema, partition spec, and write-property resolution already done in DynamicWriter. The primary motivation is throughput. Our pipelines have a data pattern tied deeply into business logic that a hand-rolled TaskWriter can exploit to produce files far faster than the generic RowDataTaskWriterFactory. Making the factory pluggable also enables other use cases without forking the sink: - Row-level or file-level audit and metrics: sampling, lineage stamps, metric counters layered around the writer. - Custom file naming and layout: custom prefixes, alternative partition paths, custom filesystem properties such as storage class and permissions. The default provider preserves existing behavior, so callers that do not supply one are unaffected. --- docs/docs/flink-writes.md | 1 + .../sink/dynamic/DynamicIcebergSink.java | 35 +++++++- .../DynamicTaskWriterFactoryProvider.java | 73 ++++++++++++++++ .../flink/sink/dynamic/DynamicWriter.java | 17 ++-- .../sink/dynamic/TestDynamicIcebergSink.java | 1 + .../flink/sink/dynamic/TestDynamicWriter.java | 83 +++++++++++++++---- 6 files changed, 185 insertions(+), 25 deletions(-) create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTaskWriterFactoryProvider.java diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index 03795b5beed0..bbf84e2d2cdd 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -550,6 +550,7 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are | `setAll(Map properties)` | Set multiple properties at once | | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new Iceberg tables, allows overriding how tables are created - setting custom table properties and location based on the table name. | | `dropUnusedColumns(boolean enabled)` | When enabled, drops all columns from the current table schema which are not contained in the input schema (see the caveats above on dropping columns). | +| `taskWriterFactoryProvider(DynamicTaskWriterFactoryProvider provider)` | Supply a custom `TaskWriterFactory` per write target in place of the default `RowDataTaskWriterFactory`. Useful for throughput-oriented writers that exploit known data pattern, audit/metric layering, custom file naming/layout/properties. | ### Distribution Modes diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index ad430cbf13f8..ade758c1321a 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -79,6 +79,7 @@ public class DynamicIcebergSink private final Map writeProperties; private final Configuration flinkConfig; private final int cacheMaximumSize; + private final DynamicTaskWriterFactoryProvider taskWriterFactoryProvider; // Set by the builder before sinkTo() — forward writer results to union into pre-commit topology private final transient DataStream> forwardWriteResults; @@ -90,6 +91,7 @@ public class DynamicIcebergSink Map writeProperties, Configuration flinkConfig, int cacheMaximumSize, + DynamicTaskWriterFactoryProvider taskWriterFactoryProvider, DataStream> forwardWriteResults) { this.catalogLoader = catalogLoader; this.snapshotProperties = snapshotProperties; @@ -97,6 +99,7 @@ public class DynamicIcebergSink this.writeProperties = writeProperties; this.flinkConfig = flinkConfig; this.cacheMaximumSize = cacheMaximumSize; + this.taskWriterFactoryProvider = taskWriterFactoryProvider; // We generate a random UUID every time when a sink is created. // This is used to separate files generated by different sinks writing the same table. // Also used to generate the aggregator operator name @@ -113,7 +116,8 @@ public SinkWriter createWriter(WriterInitContext context) cacheMaximumSize, new DynamicWriterMetrics(context.metricGroup()), context.getTaskInfo().getIndexOfThisSubtask(), - context.getTaskInfo().getAttemptNumber()); + context.getTaskInfo().getAttemptNumber(), + taskWriterFactoryProvider); } @Override @@ -190,16 +194,19 @@ static class ForwardWriterSink private final Map writeProperties; private final Configuration flinkConfig; private final int cacheMaximumSize; + private final DynamicTaskWriterFactoryProvider taskWriterFactoryProvider; ForwardWriterSink( CatalogLoader catalogLoader, Map writeProperties, Configuration flinkConfig, - int cacheMaximumSize) { + int cacheMaximumSize, + DynamicTaskWriterFactoryProvider taskWriterFactoryProvider) { this.catalogLoader = catalogLoader; this.writeProperties = writeProperties; this.flinkConfig = flinkConfig; this.cacheMaximumSize = cacheMaximumSize; + this.taskWriterFactoryProvider = taskWriterFactoryProvider; } @Override @@ -211,7 +218,8 @@ public SinkWriter createWriter(WriterInitContext context) cacheMaximumSize, new DynamicWriterMetrics(context.metricGroup()), context.getTaskInfo().getIndexOfThisSubtask(), - context.getTaskInfo().getAttemptNumber()); + context.getTaskInfo().getAttemptNumber(), + taskWriterFactoryProvider); } @Override @@ -235,6 +243,8 @@ public static class Builder { private final Map snapshotSummary = Maps.newHashMap(); private ReadableConfig readableConfig = new Configuration(); private TableCreator tableCreator = TableCreator.DEFAULT; + private DynamicTaskWriterFactoryProvider taskWriterFactoryProvider = + DynamicTaskWriterFactoryProvider.DEFAULT; Builder() {} @@ -413,6 +423,18 @@ public Builder caseSensitive(boolean newCaseSensitive) { return this; } + /** + * Provide a custom {@link DynamicTaskWriterFactoryProvider} used to build the underlying {@link + * org.apache.iceberg.flink.sink.TaskWriterFactory} for each write target. When omitted, {@link + * DynamicTaskWriterFactoryProvider#DEFAULT} is used, which produces the standard {@link + * org.apache.iceberg.flink.sink.RowDataTaskWriterFactory}. + */ + public Builder taskWriterFactoryProvider(DynamicTaskWriterFactoryProvider provider) { + Preconditions.checkNotNull(provider, "Task writer factory provider shouldn't be null"); + this.taskWriterFactoryProvider = provider; + return this; + } + private String operatorName(String suffix) { return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; } @@ -432,7 +454,11 @@ private DynamicIcebergSink build( // Forward writer: chained with generator via forward edge, no data shuffle ForwardWriterSink forwardWriterSink = new ForwardWriterSink( - catalogLoader, writeOptions, flinkConfig, flinkDynamicSinkConf.cacheMaxSize()); + catalogLoader, + writeOptions, + flinkConfig, + flinkDynamicSinkConf.cacheMaxSize(), + taskWriterFactoryProvider); TypeInformation> writeResultTypeInfo = CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new); @@ -465,6 +491,7 @@ DynamicIcebergSink instantiateSink( writeProperties, flinkWriteConf, flinkDynamicSinkConf.cacheMaxSize(), + taskWriterFactoryProvider, forwardWriteResults); } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTaskWriterFactoryProvider.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTaskWriterFactoryProvider.java new file mode 100644 index 000000000000..9a0baaad86c4 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTaskWriterFactoryProvider.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.flink.sink.TaskWriterFactory; + +/** + * Pluggable provider that creates a {@link TaskWriterFactory} for a given write target. {@link + * DynamicWriter} resolves the table, schema, partition spec, write properties and equality fields + * before delegating final factory construction here, allowing callers to swap in a custom + * implementation while keeping the surrounding validation and configuration logic intact. + */ +@FunctionalInterface +public interface DynamicTaskWriterFactoryProvider extends Serializable { + + TaskWriterFactory create( + Table table, + RowType flinkSchema, + long targetFileSizeBytes, + FileFormat format, + Map writeProperties, + List equalityFieldIds, + boolean upsertMode, + Schema schema, + PartitionSpec spec); + + DynamicTaskWriterFactoryProvider DEFAULT = + (table, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsertMode, + schema, + spec) -> + new RowDataTaskWriterFactory( + () -> table, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsertMode, + schema, + spec); +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java index fcd0d082704a..e112e699bf10 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java @@ -36,8 +36,8 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.FlinkWriteConf; -import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; import org.apache.iceberg.flink.sink.SinkUtil; +import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; @@ -56,7 +56,7 @@ class DynamicWriter implements CommittingSinkWriter taskWriterFactories; + private final Map> taskWriterFactories; private final Map> writers; private final Configuration flinkConfig; private final Map commonWriteProperties; @@ -64,6 +64,7 @@ class DynamicWriter implements CommittingSinkWriter(cacheMaximumSize); this.writers = Maps.newHashMap(); @@ -98,7 +101,7 @@ public void write(DynamicRecordInternal element, Context context) element.upsertMode(), element.equalityFields()), writerKey -> { - RowDataTaskWriterFactory taskWriterFactory = + TaskWriterFactory taskWriterFactory = taskWriterFactories.computeIfAbsent( writerKey, factoryKey -> { @@ -130,8 +133,8 @@ public void write(DynamicRecordInternal element, Context context) flinkWriteConf.dataFileFormat(), flinkWriteConf, table); LOG.debug("Creating new writer factory for table '{}'", table.name()); - return new RowDataTaskWriterFactory( - () -> table, + return taskWriterFactoryProvider.create( + table, FlinkSchemaUtil.convert(element.schema()), flinkWriteConf.targetDataFileSize(), flinkWriteConf.dataFileFormat(), @@ -217,7 +220,7 @@ DynamicWriterMetrics getMetrics() { } @VisibleForTesting - Map getTaskWriterFactories() { + Map> getTaskWriterFactories() { return taskWriterFactories; } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 89befb9e8ea2..3302e22fbfc9 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -1644,6 +1644,7 @@ static class CommitHookDynamicIcebergSink extends DynamicIcebergSink { writeProperties, flinkConfig, 100, + DynamicTaskWriterFactoryProvider.DEFAULT, forwardWritten); this.commitHook = commitHook; this.overwriteMode = new FlinkWriteConf(writeProperties, flinkConfig).overwriteMode(); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java index f604f639f217..0903ca9030f7 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java @@ -25,6 +25,7 @@ import java.net.URI; import java.util.Collection; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; @@ -162,7 +163,8 @@ void testFlinkConfigOverridesTableProperties() throws Exception { 100, new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), 0, - 0); + 0, + DynamicTaskWriterFactoryProvider.DEFAULT); DynamicRecordInternal record1 = getDynamicRecordInternal(table1); dynamicWriter.write(record1, null); @@ -188,7 +190,8 @@ void testWritePropertiesOverrideFlinkConfig() throws Exception { 100, new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), 0, - 0); + 0, + DynamicTaskWriterFactoryProvider.DEFAULT); DynamicRecordInternal record1 = getDynamicRecordInternal(table1); dynamicWriter.write(record1, null); @@ -214,7 +217,8 @@ void testFlinkConfigFileFormat() throws Exception { 100, new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), 0, - 0); + 0, + DynamicTaskWriterFactoryProvider.DEFAULT); DynamicRecordInternal record1 = getDynamicRecordInternal(table1); dynamicWriter.write(record1, null); @@ -243,7 +247,8 @@ void testFlinkConfigTargetFileSize() throws Exception { 100, new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), 0, - 0); + 0, + DynamicTaskWriterFactoryProvider.DEFAULT); DynamicRecordInternal record1 = getDynamicRecordInternal(table1); dynamicWriter.write(record1, null); @@ -326,18 +331,68 @@ void testUniqueFileSuffixOnFactoryRecreation() throws Exception { assertThat(firstFile.getName()).isNotEqualTo(secondFile.getName()); } + @Test + void testCustomTaskWriterFactoryProvider() throws Exception { + Catalog catalog = CATALOG_EXTENSION.catalog(); + Table table1 = catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA); + Table table2 = catalog.createTable(TABLE2, SimpleDataUtil.SCHEMA); + + AtomicInteger providerInvocations = new AtomicInteger(); + DynamicTaskWriterFactoryProvider countingProvider = + (table, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsertMode, + schema, + spec) -> { + providerInvocations.incrementAndGet(); + return DynamicTaskWriterFactoryProvider.DEFAULT.create( + table, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsertMode, + schema, + spec); + }; + + DynamicWriter dynamicWriter = createDynamicWriter(catalog, Map.of(), countingProvider); + + dynamicWriter.write(getDynamicRecordInternal(table1), null); + dynamicWriter.write(getDynamicRecordInternal(table2), null); + Collection writeResults = dynamicWriter.prepareCommit(); + + assertThat(providerInvocations).hasValue(2); + assertThat(writeResults).hasSize(2); + assertThat(getNumDataFiles(table1)).isEqualTo(1); + assertThat(getNumDataFiles(table2)).isEqualTo(1); + + dynamicWriter.close(); + } + private static @Nonnull DynamicWriter createDynamicWriter( Catalog catalog, Map properties) { - DynamicWriter dynamicWriter = - new DynamicWriter( - catalog, - properties, - new Configuration(), - 100, - new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), - 0, - 0); - return dynamicWriter; + return createDynamicWriter(catalog, properties, DynamicTaskWriterFactoryProvider.DEFAULT); + } + + private static @Nonnull DynamicWriter createDynamicWriter( + Catalog catalog, + Map properties, + DynamicTaskWriterFactoryProvider taskWriterFactoryProvider) { + return new DynamicWriter( + catalog, + properties, + new Configuration(), + 100, + new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), + 0, + 0, + taskWriterFactoryProvider); } private static @Nonnull DynamicWriter createDynamicWriter(Catalog catalog) {