diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index 7be13dae4525..293feb5947d2 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -552,6 +552,7 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are | `dropUnusedColumns(boolean enabled)` | When enabled, drops all columns from the current table schema which are not contained in the input schema (see the caveats above on dropping columns). | | `shuffleSinkSlotSharingGroup(String ssg)` | Name of the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the shuffle sink. Register the group with its resource spec on the `StreamExecutionEnvironment` via `env.registerSlotSharingGroup(...)`. | | `generatorSlotSharingGroup(String ssg)` | Name of the [slot sharing group](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/finegrained_resource/) for the generator (and forward sink chained to it). Register the group with its resource spec on the `StreamExecutionEnvironment` via `env.registerSlotSharingGroup(...)`. | +| `taskWriterFactoryProvider(DynamicTaskWriterFactoryProvider provider)` | Supply a custom `TaskWriterFactory` per write target in place of the default `RowDataTaskWriterFactory`. Useful for throughput-oriented writers that exploit known data patterns, audit/metric layering, or custom file naming/layout/properties. 
| ### Distribution Modes diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index c8f9fece2392..f02226bf3703 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -79,6 +79,7 @@ public class DynamicIcebergSink private final Map writeProperties; private final Configuration flinkConfig; private final int cacheMaximumSize; + private final DynamicTaskWriterFactoryProvider taskWriterFactoryProvider; // Set by the builder before sinkTo() — forward writer results to union into pre-commit topology private final transient DataStream> forwardWriteResults; @@ -90,6 +91,7 @@ public class DynamicIcebergSink Map writeProperties, Configuration flinkConfig, int cacheMaximumSize, + DynamicTaskWriterFactoryProvider taskWriterFactoryProvider, DataStream> forwardWriteResults) { this.catalogLoader = catalogLoader; this.snapshotProperties = snapshotProperties; @@ -97,6 +99,7 @@ public class DynamicIcebergSink this.writeProperties = writeProperties; this.flinkConfig = flinkConfig; this.cacheMaximumSize = cacheMaximumSize; + this.taskWriterFactoryProvider = taskWriterFactoryProvider; // We generate a random UUID every time when a sink is created. // This is used to separate files generated by different sinks writing the same table. 
// Also used to generate the aggregator operator name @@ -113,7 +116,8 @@ public SinkWriter createWriter(WriterInitContext context) cacheMaximumSize, new DynamicWriterMetrics(context.metricGroup()), context.getTaskInfo().getIndexOfThisSubtask(), - context.getTaskInfo().getAttemptNumber()); + context.getTaskInfo().getAttemptNumber(), + taskWriterFactoryProvider); } @Override @@ -190,16 +194,19 @@ static class ForwardWriterSink private final Map writeProperties; private final Configuration flinkConfig; private final int cacheMaximumSize; + private final DynamicTaskWriterFactoryProvider taskWriterFactoryProvider; ForwardWriterSink( CatalogLoader catalogLoader, Map writeProperties, Configuration flinkConfig, - int cacheMaximumSize) { + int cacheMaximumSize, + DynamicTaskWriterFactoryProvider taskWriterFactoryProvider) { this.catalogLoader = catalogLoader; this.writeProperties = writeProperties; this.flinkConfig = flinkConfig; this.cacheMaximumSize = cacheMaximumSize; + this.taskWriterFactoryProvider = taskWriterFactoryProvider; } @Override @@ -211,7 +218,8 @@ public SinkWriter createWriter(WriterInitContext context) cacheMaximumSize, new DynamicWriterMetrics(context.metricGroup()), context.getTaskInfo().getIndexOfThisSubtask(), - context.getTaskInfo().getAttemptNumber()); + context.getTaskInfo().getAttemptNumber(), + taskWriterFactoryProvider); } @Override @@ -235,6 +243,8 @@ public static class Builder { private final Map snapshotSummary = Maps.newHashMap(); private ReadableConfig readableConfig = new Configuration(); private TableCreator tableCreator = TableCreator.DEFAULT; + private DynamicTaskWriterFactoryProvider taskWriterFactoryProvider = + DynamicTaskWriterFactoryProvider.DEFAULT; Builder() {} @@ -435,6 +445,18 @@ public Builder caseSensitive(boolean newCaseSensitive) { return this; } + /** + * Provide a custom {@link DynamicTaskWriterFactoryProvider} used to build the underlying {@link + * org.apache.iceberg.flink.sink.TaskWriterFactory} for each write target. 
When omitted, {@link + * DynamicTaskWriterFactoryProvider#DEFAULT} is used, which produces the standard {@link + * org.apache.iceberg.flink.sink.RowDataTaskWriterFactory}. + */ + public Builder taskWriterFactoryProvider(DynamicTaskWriterFactoryProvider provider) { + Preconditions.checkNotNull(provider, "Task writer factory provider shouldn't be null"); + this.taskWriterFactoryProvider = provider; + return this; + } + private String operatorName(String suffix) { return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; } @@ -454,7 +476,11 @@ private DynamicIcebergSink build( // Forward writer: chained with generator via forward edge, no data shuffle ForwardWriterSink forwardWriterSink = new ForwardWriterSink( - catalogLoader, writeOptions, flinkConfig, flinkDynamicSinkConf.cacheMaxSize()); + catalogLoader, + writeOptions, + flinkConfig, + flinkDynamicSinkConf.cacheMaxSize(), + taskWriterFactoryProvider); TypeInformation> writeResultTypeInfo = CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new); @@ -492,6 +518,7 @@ DynamicIcebergSink instantiateSink( writeProperties, flinkWriteConf, flinkDynamicSinkConf.cacheMaxSize(), + taskWriterFactoryProvider, forwardWriteResults); } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTaskWriterFactoryProvider.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTaskWriterFactoryProvider.java new file mode 100644 index 000000000000..9a0baaad86c4 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTaskWriterFactoryProvider.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; +import org.apache.iceberg.flink.sink.TaskWriterFactory; + +/** + * Pluggable provider that creates a {@link TaskWriterFactory} for a given write target. {@link + * DynamicWriter} resolves the table, schema, partition spec, write properties and equality fields + * before delegating final factory construction here, allowing callers to swap in a custom + * implementation while keeping the surrounding validation and configuration logic intact. 
+ */ +@FunctionalInterface +public interface DynamicTaskWriterFactoryProvider extends Serializable { + + TaskWriterFactory create( + Table table, + RowType flinkSchema, + long targetFileSizeBytes, + FileFormat format, + Map writeProperties, + List equalityFieldIds, + boolean upsertMode, + Schema schema, + PartitionSpec spec); + + DynamicTaskWriterFactoryProvider DEFAULT = + (table, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsertMode, + schema, + spec) -> + new RowDataTaskWriterFactory( + () -> table, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsertMode, + schema, + spec); +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java index fcd0d082704a..e112e699bf10 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java @@ -36,8 +36,8 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.FlinkWriteConf; -import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory; import org.apache.iceberg.flink.sink.SinkUtil; +import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; @@ -56,7 +56,7 @@ class DynamicWriter implements CommittingSinkWriter taskWriterFactories; + private final Map> taskWriterFactories; private final Map> writers; private final Configuration flinkConfig; private final Map commonWriteProperties; @@ -64,6 +64,7 @@ class DynamicWriter implements CommittingSinkWriter(cacheMaximumSize); this.writers = Maps.newHashMap(); @@ -98,7 +101,7 @@ public void 
write(DynamicRecordInternal element, Context context) element.upsertMode(), element.equalityFields()), writerKey -> { - RowDataTaskWriterFactory taskWriterFactory = + TaskWriterFactory taskWriterFactory = taskWriterFactories.computeIfAbsent( writerKey, factoryKey -> { @@ -130,8 +133,8 @@ public void write(DynamicRecordInternal element, Context context) flinkWriteConf.dataFileFormat(), flinkWriteConf, table); LOG.debug("Creating new writer factory for table '{}'", table.name()); - return new RowDataTaskWriterFactory( - () -> table, + return taskWriterFactoryProvider.create( + table, FlinkSchemaUtil.convert(element.schema()), flinkWriteConf.targetDataFileSize(), flinkWriteConf.dataFileFormat(), @@ -217,7 +220,7 @@ DynamicWriterMetrics getMetrics() { } @VisibleForTesting - Map getTaskWriterFactories() { + Map> getTaskWriterFactories() { return taskWriterFactories; } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 3ba579df490b..1f4ee9468529 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -1707,6 +1707,7 @@ static class CommitHookDynamicIcebergSink extends DynamicIcebergSink { writeProperties, flinkConfig, 100, + DynamicTaskWriterFactoryProvider.DEFAULT, forwardWritten); this.commitHook = commitHook; this.overwriteMode = new FlinkWriteConf(writeProperties, flinkConfig).overwriteMode(); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java index f604f639f217..0903ca9030f7 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java +++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java @@ -25,6 +25,7 @@ import java.net.URI; import java.util.Collection; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; @@ -162,7 +163,8 @@ void testFlinkConfigOverridesTableProperties() throws Exception { 100, new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), 0, - 0); + 0, + DynamicTaskWriterFactoryProvider.DEFAULT); DynamicRecordInternal record1 = getDynamicRecordInternal(table1); dynamicWriter.write(record1, null); @@ -188,7 +190,8 @@ void testWritePropertiesOverrideFlinkConfig() throws Exception { 100, new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), 0, - 0); + 0, + DynamicTaskWriterFactoryProvider.DEFAULT); DynamicRecordInternal record1 = getDynamicRecordInternal(table1); dynamicWriter.write(record1, null); @@ -214,7 +217,8 @@ void testFlinkConfigFileFormat() throws Exception { 100, new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), 0, - 0); + 0, + DynamicTaskWriterFactoryProvider.DEFAULT); DynamicRecordInternal record1 = getDynamicRecordInternal(table1); dynamicWriter.write(record1, null); @@ -243,7 +247,8 @@ void testFlinkConfigTargetFileSize() throws Exception { 100, new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), 0, - 0); + 0, + DynamicTaskWriterFactoryProvider.DEFAULT); DynamicRecordInternal record1 = getDynamicRecordInternal(table1); dynamicWriter.write(record1, null); @@ -326,18 +331,68 @@ void testUniqueFileSuffixOnFactoryRecreation() throws Exception { assertThat(firstFile.getName()).isNotEqualTo(secondFile.getName()); } + @Test + void testCustomTaskWriterFactoryProvider() throws Exception { + Catalog catalog = CATALOG_EXTENSION.catalog(); + Table table1 = 
catalog.createTable(TABLE1, SimpleDataUtil.SCHEMA); + Table table2 = catalog.createTable(TABLE2, SimpleDataUtil.SCHEMA); + + AtomicInteger providerInvocations = new AtomicInteger(); + DynamicTaskWriterFactoryProvider countingProvider = + (table, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsertMode, + schema, + spec) -> { + providerInvocations.incrementAndGet(); + return DynamicTaskWriterFactoryProvider.DEFAULT.create( + table, + flinkSchema, + targetFileSizeBytes, + format, + writeProperties, + equalityFieldIds, + upsertMode, + schema, + spec); + }; + + DynamicWriter dynamicWriter = createDynamicWriter(catalog, Map.of(), countingProvider); + + dynamicWriter.write(getDynamicRecordInternal(table1), null); + dynamicWriter.write(getDynamicRecordInternal(table2), null); + Collection writeResults = dynamicWriter.prepareCommit(); + + assertThat(providerInvocations).hasValue(2); + assertThat(writeResults).hasSize(2); + assertThat(getNumDataFiles(table1)).isEqualTo(1); + assertThat(getNumDataFiles(table2)).isEqualTo(1); + + dynamicWriter.close(); + } + private static @Nonnull DynamicWriter createDynamicWriter( Catalog catalog, Map properties) { - DynamicWriter dynamicWriter = - new DynamicWriter( - catalog, - properties, - new Configuration(), - 100, - new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), - 0, - 0); - return dynamicWriter; + return createDynamicWriter(catalog, properties, DynamicTaskWriterFactoryProvider.DEFAULT); + } + + private static @Nonnull DynamicWriter createDynamicWriter( + Catalog catalog, + Map properties, + DynamicTaskWriterFactoryProvider taskWriterFactoryProvider) { + return new DynamicWriter( + catalog, + properties, + new Configuration(), + 100, + new DynamicWriterMetrics(UnregisteredMetricsGroup.createSinkWriterMetricGroup()), + 0, + 0, + taskWriterFactoryProvider); } private static @Nonnull DynamicWriter createDynamicWriter(Catalog catalog) {