Skip to content

Commit 00f61e3

Browse files
fix(iceberg): defer identifier field update when replacing columns in schema evolution (#74723)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 4f84262 commit 00f61e3

File tree

4 files changed

+92
-3
lines changed

4 files changed

+92
-3
lines changed

airbyte-cdk/bulk/core/load/changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The Load CDK provides functionality for destination connectors including stream-
99

1010
| Version | Date | Pull Request | Subject |
1111
|---------|------------|--------------|-------------------------------------------------------------------------------------------------|
12+
| 1.0.5 | 2026-03-10 | [#74723](https://github.com/airbytehq/airbyte/pull/74723) | Fix schema evolution: defer identifier field update when replacing columns to avoid Iceberg conflict. |
1213
| 1.0.4 | 2026-03-05 | [#74328](https://github.com/airbytehq/airbyte/pull/74328) | Fix iceberg dedup: map PK NumberType to StringType instead of DecimalType for identifier field compatibility. |
1314
| 1.0.3 | 2026-03-05 | [#74272](https://github.com/airbytehq/airbyte/pull/74272) | Fix iceberg dedup. |
1415
| 1.0.2 | 2026-02-24 | | Bump bulk-cdk-core-base to 1.0.1 to pick up CVE fixes (CVE-2021-47621, CVE-2022-36944). |
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=1.0.4
1+
version=1.0.5

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTableSynchronizer.kt

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class IcebergTableSynchronizer(
9494
// 2) Update types => find a supertype for each changed column
9595
val columnsToReplaceInSecondCommit =
9696
mutableMapOf<String, org.apache.iceberg.types.Types.NestedField>()
97+
val replacedColumns = mutableSetOf<String>()
9798

9899
diff.updatedDataTypes.forEach { columnName ->
99100
val existingField =
@@ -134,6 +135,7 @@ class IcebergTableSynchronizer(
134135
update.deleteColumn(columnName)
135136
update.addColumn(columnName, incomingField.type())
136137
}
138+
replacedColumns.add(columnName)
137139
}
138140
}
139141
}
@@ -188,8 +190,17 @@ class IcebergTableSynchronizer(
188190
}
189191

190192
// 5) Update identifier fields
191-
if (diff.identifierFieldsChanged) {
192-
val updatedIdentifierFields = incomingSchema.identifierFieldNames().toList()
193+
// Iceberg's requireColumn() fails for columns pending deletion (even if they're
194+
// being re-added in the same update). When replaced columns are also identifier
195+
// fields, we must defer the identifier field update to a follow-up commit.
196+
val updatedIdentifierFields =
197+
if (diff.identifierFieldsChanged) incomingSchema.identifierFieldNames().toList()
198+
else emptyList()
199+
val hasReplacedIdentifierFields =
200+
replacedColumns.any { it in updatedIdentifierFields.toSet() }
201+
202+
if (diff.identifierFieldsChanged && !hasReplacedIdentifierFields) {
203+
// No conflict: can update identifier fields in the same update
193204
updatedIdentifierFields.forEach { update.requireColumn(it) }
194205
update.setIdentifierFields(updatedIdentifierFields)
195206
}
@@ -211,6 +222,12 @@ class IcebergTableSynchronizer(
211222
addUpdate.addColumn(null, columnName, field.type())
212223
}
213224

225+
// If identifier fields were deferred, handle them now (columns have been re-added)
226+
if (hasReplacedIdentifierFields) {
227+
updatedIdentifierFields.forEach { addUpdate.requireColumn(it) }
228+
addUpdate.setIdentifierFields(updatedIdentifierFields)
229+
}
230+
214231
// Commit or defer the add operation based on columnTypeChangeBehavior
215232
val finalSchema = addUpdate.apply()
216233
return if (columnTypeChangeBehavior.commitImmediately) {
@@ -221,6 +238,25 @@ class IcebergTableSynchronizer(
221238
}
222239
}
223240

241+
// If replaced columns are also identifier fields, commit column replacements first,
242+
// then handle identifier fields in a follow-up update.
243+
if (hasReplacedIdentifierFields) {
244+
update.commit()
245+
table.refresh()
246+
247+
val identifierUpdate = table.updateSchema().allowIncompatibleChanges()
248+
updatedIdentifierFields.forEach { identifierUpdate.requireColumn(it) }
249+
identifierUpdate.setIdentifierFields(updatedIdentifierFields)
250+
251+
val newSchema = identifierUpdate.apply()
252+
return if (columnTypeChangeBehavior.commitImmediately) {
253+
identifierUpdate.commit()
254+
SchemaUpdateResult(newSchema, pendingUpdates = emptyList())
255+
} else {
256+
SchemaUpdateResult(newSchema, pendingUpdates = listOf(identifierUpdate))
257+
}
258+
}
259+
224260
// `apply` just validates that the schema change is valid, it doesn't actually commit().
225261
// It returns the schema that the table _would_ have after committing.
226262
val newSchema: Schema = update.apply()

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/test/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTableSynchronizerTest.kt

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,4 +421,56 @@ class IcebergTableSynchronizerTest {
421421
assertThat(schema).isSameAs(mockNewSchema)
422422
assertThat(pendingUpdates).hasSize(1)
423423
}
424+
425+
@Test
426+
fun `test overwrite with replaced column as identifier field defers identifier update`() {
427+
// Simulates the scenario where a PK column's type changes (e.g. Double -> String)
428+
// and the column is also an identifier field. Iceberg's requireColumn() fails for
429+
// columns pending deletion, so we must commit the column replacement first, then
430+
// handle identifier fields in a follow-up update.
431+
val existingSchema =
432+
buildSchema(Types.NestedField.required(1, "pk_col", Types.DoubleType.get()))
433+
val incomingSchema =
434+
buildSchema(
435+
Types.NestedField.required(1, "pk_col", Types.StringType.get()),
436+
identifierFields = setOf(1)
437+
)
438+
439+
every { mockTable.schema() } returns existingSchema
440+
441+
// After the first commit (column replacement), table.updateSchema() returns a new mock
442+
val mockIdentifierUpdateSchema = mockk<UpdateSchema>(relaxed = true)
443+
val mockIdentifierNewSchema = mockk<Schema>(relaxed = true)
444+
every { mockIdentifierUpdateSchema.apply() } returns mockIdentifierNewSchema
445+
446+
// First call returns mockUpdateSchema, second call (after commit+refresh) returns
447+
// the identifier update mock.
448+
every { mockTable.updateSchema().allowIncompatibleChanges() } returnsMany
449+
listOf(mockUpdateSchema, mockIdentifierUpdateSchema)
450+
451+
val (schema, pendingUpdates) =
452+
synchronizer.maybeApplySchemaChanges(
453+
mockTable,
454+
incomingSchema,
455+
ColumnTypeChangeBehavior.OVERWRITE
456+
)
457+
458+
// First update: delete + add column (committed right away because the identifier update is deferred)
459+
verify { mockUpdateSchema.deleteColumn("pk_col") }
460+
verify { mockUpdateSchema.addColumn("pk_col", Types.StringType.get()) }
461+
verify { mockUpdateSchema.commit() }
462+
463+
// Table is refreshed after the first commit
464+
verify { mockTable.refresh() }
465+
466+
// Second update: identifier fields handled in a follow-up
467+
verify { mockIdentifierUpdateSchema.requireColumn("pk_col") }
468+
verify { mockIdentifierUpdateSchema.setIdentifierFields(listOf("pk_col")) }
469+
// OVERWRITE mode doesn't commit the follow-up identifier update — it is returned as pending
470+
verify(exactly = 0) { mockIdentifierUpdateSchema.commit() }
471+
472+
assertThat(schema).isSameAs(mockIdentifierNewSchema)
473+
assertThat(pendingUpdates).hasSize(1)
474+
assertThat(pendingUpdates.first()).isSameAs(mockIdentifierUpdateSchema)
475+
}
424476
}

0 commit comments

Comments
 (0)