@@ -7,13 +7,17 @@ package io.airbyte.cdk.load.toolkits.iceberg.parquet
77import io.airbyte.cdk.ConfigErrorException
88import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator.Companion.PARENT_CHILD_SEPARATOR
99import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator.Companion.splitIntoParentAndLeaf
10+ import io.github.oshai.kotlinlogging.KotlinLogging
1011import jakarta.inject.Singleton
1112import org.apache.iceberg.Schema
13+ import org.apache.iceberg.SortDirection
1214import org.apache.iceberg.Table
1315import org.apache.iceberg.UpdateSchema
1416import org.apache.iceberg.types.Type
1517import org.apache.iceberg.types.Type.PrimitiveType
1618
19+ private val logger = KotlinLogging .logger {}
20+
1721/* * Describes how the [IcebergTableSynchronizer] handles column type changes. */
1822enum class ColumnTypeChangeBehavior {
1923 /* *
@@ -86,6 +90,27 @@ class IcebergTableSynchronizer(
8690 return SchemaUpdateResult (existingSchema, pendingUpdates = emptyList())
8791 }
8892
93+ // Update the sort order before creating the UpdateSchema, because:
94+ // 1. Deleting a column referenced by the sort order will cause
95+ // SortOrder.checkCompatibility to throw ValidationException on commit.
96+ // 2. UpdateSchema captures the table's metadata version at creation time.
97+ // If we replace the sort order after creating it, the commit would fail
98+ // with a stale metadata error.
99+ val columnsBeingDeleted = buildList {
100+ addAll(diff.removedColumns)
101+ if (columnTypeChangeBehavior == ColumnTypeChangeBehavior .OVERWRITE ) {
102+ // In OVERWRITE mode, type-changed columns are deleted and re-added
103+ // with new field IDs. The old sort field references become invalid.
104+ addAll(diff.updatedDataTypes)
105+ }
106+ }
107+ replaceSortOrderIfNeeded(
108+ table = table,
109+ columnsBeingDeleted = columnsBeingDeleted,
110+ identifierFieldsChanged = diff.identifierFieldsChanged,
111+ incomingIdentifierFieldNames = incomingSchema.identifierFieldNames(),
112+ )
113+
89114 val update: UpdateSchema = table.updateSchema().allowIncompatibleChanges()
90115
91116 // 1) Remove columns that no longer exist in the incoming schema
@@ -267,6 +292,90 @@ class IcebergTableSynchronizer(
267292 return SchemaUpdateResult (newSchema, pendingUpdates = listOf (update))
268293 }
269294 }
295+
296+ /* *
297+ * Update the table's sort order if it would conflict with pending schema changes.
298+ *
299+ * Sort orders are set at table creation from identifier fields (PKs) and never updated. This
300+ * causes [org.apache.iceberg.exceptions.ValidationException] when schema evolution deletes a
301+ * column referenced by the sort order.
302+ *
303+ * This method handles three cases:
304+ * 1. Identifier fields changed → rebuild sort order from new identifiers (covers
305+ * ```
306+ * Dedupe→Append, PK changes within Dedupe)
307+ * ```
308+ * 2. Columns being deleted conflict with sort order → remove those fields
309+ * 3. Neither → no-op
310+ *
311+ * Must be called BEFORE creating the [UpdateSchema], since this commits a metadata change and
312+ * the subsequent UpdateSchema needs the refreshed metadata version.
313+ */
314+ private fun replaceSortOrderIfNeeded (
315+ table : Table ,
316+ columnsBeingDeleted : List <String >,
317+ identifierFieldsChanged : Boolean ,
318+ incomingIdentifierFieldNames : Set <String >,
319+ ) {
320+ val currentSortOrder = table.sortOrder()
321+
322+ // If the table has no sort order, there's nothing to conflict and nothing to update.
323+ // (Append→Dedupe would need a sort order added, but that case requires a reset.)
324+ if (currentSortOrder.isUnsorted) {
325+ return
326+ }
327+
328+ if (identifierFieldsChanged) {
329+ // Rebuild sort order from the new identifier fields.
330+ // For Dedupe→Append: incoming identifiers are empty → unsorted.
331+ // For PK changes within Dedupe: new identifiers → new sort order.
332+ val builder = table.replaceSortOrder()
333+ for (fieldName in incomingIdentifierFieldNames) {
334+ // Only include fields that exist in the current schema. Fields being
335+ // added in the same schema change can't be referenced yet.
336+ if (table.schema().findField(fieldName) != null ) {
337+ builder.asc(fieldName)
338+ }
339+ }
340+ logger.info {
341+ " Replacing sort order due to identifier field change. " +
342+ " New sort fields: ${incomingIdentifierFieldNames.ifEmpty { setOf (" (unsorted)" ) }} "
343+ }
344+ builder.commit()
345+ table.refresh()
346+ return
347+ }
348+
349+ // No identifier change — check if any deleted columns conflict with the sort order.
350+ if (columnsBeingDeleted.isEmpty()) {
351+ return
352+ }
353+
354+ val schema = table.schema()
355+ val fieldIdsBeingDeleted =
356+ columnsBeingDeleted.mapNotNull { schema.findField(it)?.fieldId() }.toSet()
357+
358+ val hasConflict = currentSortOrder.fields().any { it.sourceId() in fieldIdsBeingDeleted }
359+ if (! hasConflict) {
360+ return
361+ }
362+
363+ // Rebuild the sort order, keeping only fields that aren't being deleted.
364+ val builder = table.replaceSortOrder()
365+ for (sortField in currentSortOrder.fields()) {
366+ if (sortField.sourceId() !in fieldIdsBeingDeleted) {
367+ val fieldName = schema.findColumnName(sortField.sourceId())
368+ when (sortField.direction()) {
369+ SortDirection .ASC -> builder.asc(fieldName, sortField.nullOrder())
370+ SortDirection .DESC -> builder.desc(fieldName, sortField.nullOrder())
371+ else -> builder.asc(fieldName, sortField.nullOrder())
372+ }
373+ }
374+ }
375+ logger.info { " Replacing sort order to remove fields being deleted: $columnsBeingDeleted " }
376+ builder.commit()
377+ table.refresh()
378+ }
270379}
271380
272381data class SchemaUpdateResult (val schema : Schema , val pendingUpdates : List <UpdateSchema >)
0 commit comments