Skip to content

Commit 419b0a7

Browse files
fix(iceberg): map PK NumberType to StringType instead of DecimalType (#74328)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent e92b03e commit 419b0a7

File tree

9 files changed

+28
-98
lines changed

9 files changed

+28
-98
lines changed

airbyte-cdk/bulk/core/load/changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The Load CDK provides functionality for destination connectors including stream-
99

1010
| Version | Date | Pull Request | Subject |
1111
|---------|------------|--------------|-------------------------------------------------------------------------------------------------|
12+
| 1.0.4 | 2026-03-05 | [#74328](https://github.com/airbytehq/airbyte/pull/74328) | Fix iceberg dedup: map PK NumberType to StringType instead of DecimalType for identifier field compatibility. |
1213
| 1.0.3 | 2026-03-05 | [#74272](https://github.com/airbytehq/airbyte/pull/74272) | Fix iceberg dedup. |
1314
| 1.0.2 | 2026-02-24 | | Bump bulk-cdk-core-base to 1.0.1 to pick up CVE fixes (CVE-2021-47621, CVE-2022-36944). |
1415
| 1.0.1 | 2026-02-09 | [#72959](https://github.com/airbytehq/airbyte/pull/72959) | Fix: CVE-2026-25526 (Jinjava dependency bump). |
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=1.0.3
1+
version=1.0.4

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteTypeToIcebergSchema.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class AirbyteTypeToIcebergSchema {
6767
is BooleanType -> Types.BooleanType.get()
6868
is DateType -> Types.DateType.get()
6969
is IntegerType -> Types.LongType.get()
70-
is NumberType -> Types.DecimalType.of(38, 18)
70+
is NumberType -> Types.DoubleType.get()
7171
// Schemaless types are converted to string
7272
is ArrayTypeWithoutSchema,
7373
is ObjectTypeWithEmptySchema,
@@ -108,7 +108,13 @@ fun ObjectType.toIcebergSchema(primaryKeys: List<List<String>>): Schema {
108108
// But we should leave the _airbyte_meta field as an actual object.
109109
val stringifyObjects = name != Meta.COLUMN_NAME_AB_META
110110
val icebergType =
111-
icebergTypeConverter.convert(field.type, stringifyObjects = stringifyObjects)
111+
if (isPrimaryKey && field.type is NumberType) {
112+
// Override PK NumberType fields to StringType so they can be used as
113+
// Iceberg identifier fields (float/double are disallowed as identifiers).
114+
Types.StringType.get()
115+
} else {
116+
icebergTypeConverter.convert(field.type, stringifyObjects = stringifyObjects)
117+
}
112118
fields.add(
113119
NestedField.of(
114120
id,

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ class AirbyteValueToIcebergRecord {
6565
is NullValue -> return null
6666
is NumberValue ->
6767
return when (type.typeId()) {
68-
// NumberType is mapped to DecimalType in Iceberg.
69-
Type.TypeID.DECIMAL -> airbyteValue.value
68+
// PK NumberType fields are mapped to StringType in Iceberg.
69+
Type.TypeID.STRING -> airbyteValue.value.toPlainString()
7070
else -> airbyteValue.value.toDouble()
7171
}
7272
is StringValue -> return airbyteValue.value

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergSuperTypeFinder.kt

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.iceberg.types.Types.*
2323
*/
2424
@Singleton
2525
class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesComparator) {
26-
private val unsupportedTypeIds = setOf(BINARY, FIXED, UUID, MAP, TIMESTAMP_NANO)
26+
private val unsupportedTypeIds = setOf(BINARY, FIXED, UUID, MAP, DECIMAL, TIMESTAMP_NANO)
2727

2828
/**
2929
* Returns a supertype for [existingType] and [incomingType] if one exists.
@@ -106,16 +106,6 @@ class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesCom
106106
// If both are the same type ID, we just use the existing type
107107
if (existingTypeId == incomingTypeId) {
108108
// For timestamps, you'd want to reconcile UTC. This is simplified here.
109-
// For decimals, return the wider precision (Iceberg allows widening precision).
110-
if (existingTypeId == DECIMAL) {
111-
val existingDecimal = existingType as DecimalType
112-
val incomingDecimal = incomingType as DecimalType
113-
return if (incomingDecimal.precision() > existingDecimal.precision()) {
114-
incomingType
115-
} else {
116-
existingType
117-
}
118-
}
119109
return existingType
120110
}
121111

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTypesComparator.kt

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -222,13 +222,7 @@ class IcebergTypesComparator {
222222
// but for this function's purpose, we only check the existing fields.
223223
true
224224
}
225-
Type.TypeID.DECIMAL -> {
226-
require(existingType is Types.DecimalType && incomingType is Types.DecimalType) {
227-
"Expected DECIMAL types, got $existingType and $incomingType."
228-
}
229-
existingType.precision() == incomingType.precision() &&
230-
existingType.scale() == incomingType.scale()
231-
}
225+
Type.TypeID.DECIMAL,
232226
Type.TypeID.BINARY,
233227
Type.TypeID.FIXED,
234228
Type.TypeID.UUID,

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/test/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergSuperTypeFinderTest.kt

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -126,30 +126,11 @@ class IcebergSuperTypeFinderTest {
126126
}
127127

128128
@Test
129-
fun testIdenticalDecimalTypes() {
130-
val decimalType = Types.DecimalType.of(38, 18)
131-
val result = superTypeFinder.findSuperType(decimalType, decimalType, "column_name")
132-
133-
// Identical decimals => return existing
134-
assertThat(result).isSameAs(decimalType)
135-
}
136-
137-
@Test
138-
fun testDecimalPrecisionWidening() {
139-
val narrowDecimal = Types.DecimalType.of(10, 2)
140-
val wideDecimal = Types.DecimalType.of(20, 2)
141-
142-
val result = superTypeFinder.findSuperType(narrowDecimal, wideDecimal, "column_name")
143-
// Wider precision should be returned
144-
assertThat(result).isSameAs(wideDecimal)
145-
}
146-
147-
@Test
148-
fun testDecimalToIntIsNotAllowed() {
129+
fun testDecimalIsUnsupported() {
149130
val decimalType = Types.DecimalType.of(10, 2)
150131
val intType = Types.IntegerType.get()
151132

152-
// Decimal to Int promotion is not allowed by Iceberg
133+
// Fails in validateTypeIds => DECIMAL is not supported
153134
assertThatThrownBy { superTypeFinder.findSuperType(decimalType, intType, "column_name") }
154135
.isInstanceOf(ConfigErrorException::class.java)
155136
.hasMessageContaining(

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/test/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTypesComparatorTest.kt

Lines changed: 5 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ class IcebergTypesComparatorTest {
391391
}
392392

393393
@Test
394-
fun testDecimalTypesEqual() {
394+
fun testDecimalTypeIsUnsupported() {
395395
val existingSchema =
396396
buildSchema(
397397
field("decimal_col", Types.DecimalType.of(10, 2), false),
@@ -401,52 +401,10 @@ class IcebergTypesComparatorTest {
401401
field("decimal_col", Types.DecimalType.of(10, 2), false),
402402
)
403403

404-
val diff = comparator.compareSchemas(incomingSchema, existingSchema)
405-
406-
assertThat(diff.newColumns).isEmpty()
407-
assertThat(diff.updatedDataTypes).isEmpty()
408-
assertThat(diff.removedColumns).isEmpty()
409-
assertThat(diff.newlyOptionalColumns).isEmpty()
410-
}
411-
412-
@Test
413-
fun testDecimalTypesDifferentPrecision() {
414-
val existingSchema =
415-
buildSchema(
416-
field("decimal_col", Types.DecimalType.of(10, 2), false),
417-
)
418-
val incomingSchema =
419-
buildSchema(
420-
field("decimal_col", Types.DecimalType.of(20, 2), false),
421-
)
422-
423-
val diff = comparator.compareSchemas(incomingSchema, existingSchema)
424-
425-
// Different precision means the type has been updated
426-
assertThat(diff.updatedDataTypes).containsExactly("decimal_col")
427-
assertThat(diff.newColumns).isEmpty()
428-
assertThat(diff.removedColumns).isEmpty()
429-
assertThat(diff.newlyOptionalColumns).isEmpty()
430-
}
431-
432-
@Test
433-
fun testDecimalTypesDifferentScale() {
434-
val existingSchema =
435-
buildSchema(
436-
field("decimal_col", Types.DecimalType.of(10, 2), false),
437-
)
438-
val incomingSchema =
439-
buildSchema(
440-
field("decimal_col", Types.DecimalType.of(10, 5), false),
441-
)
442-
443-
val diff = comparator.compareSchemas(incomingSchema, existingSchema)
444-
445-
// Different scale means the type has been updated
446-
assertThat(diff.updatedDataTypes).containsExactly("decimal_col")
447-
assertThat(diff.newColumns).isEmpty()
448-
assertThat(diff.removedColumns).isEmpty()
449-
assertThat(diff.newlyOptionalColumns).isEmpty()
404+
// The code in typesAreEqual() throws for TypeID.DECIMAL
405+
assertThatThrownBy { comparator.compareSchemas(incomingSchema, existingSchema) }
406+
.isInstanceOf(IllegalArgumentException::class.java)
407+
.hasMessageContaining("Unsupported or unmapped Iceberg type: DECIMAL")
450408
}
451409

452410
@Test

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteTypeToIcebergSchemaTest.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class AirbyteTypeToIcebergSchemaTest {
101101
@Test
102102
fun `convert handles NumberType`() {
103103
assertEquals(
104-
Types.DecimalType.of(38, 18),
104+
Types.DoubleType.get(),
105105
converter.convert(NumberType, stringifyObjects = false)
106106
)
107107
}
@@ -215,7 +215,7 @@ class AirbyteTypeToIcebergSchemaTest {
215215
}
216216

217217
@Test
218-
fun `toIcebergSchema maps PK NumberType to DecimalType for identifier compatibility`() {
218+
fun `toIcebergSchema maps PK NumberType to StringType for identifier compatibility`() {
219219
val objectType =
220220
ObjectType(
221221
linkedMapOf(
@@ -229,17 +229,17 @@ class AirbyteTypeToIcebergSchemaTest {
229229
val idColumn = schema.findField("id")
230230
val amountColumn = schema.findField("amount")
231231

232-
// PK NumberType field should be DecimalType (for Iceberg identifier compatibility)
232+
// PK NumberType field should be StringType (for Iceberg identifier compatibility)
233233
assertNotNull(idColumn)
234234
assertFalse(idColumn!!.isOptional)
235-
assertEquals(Types.DecimalType.of(38, 18), idColumn.type())
235+
assertEquals(Types.StringType.get(), idColumn.type())
236236

237-
// Non-PK NumberType field should also be DecimalType
237+
// Non-PK NumberType field should remain DoubleType
238238
assertNotNull(amountColumn)
239239
assertTrue(amountColumn!!.isOptional)
240-
assertEquals(Types.DecimalType.of(38, 18), amountColumn.type())
240+
assertEquals(Types.DoubleType.get(), amountColumn.type())
241241

242-
// PK field should be in identifier fields (DecimalType is allowed)
242+
// PK field should be in identifier fields (StringType is allowed)
243243
val identifierFieldIds = schema.identifierFieldIds()
244244
assertEquals(1, identifierFieldIds.size)
245245
assertTrue(identifierFieldIds.contains(idColumn.fieldId()))

0 commit comments

Comments
 (0)