Skip to content

Commit e7d601c

Browse files
lleadbet, devin-ai-integration[bot], benmoriceau, claude
authored
fix(iceberg): map NumberType to DecimalType to support deduplication (#74272)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Benoit Moriceau <benoit@airbyte.io> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 69475fa commit e7d601c

File tree

11 files changed

+134
-21
lines changed

11 files changed

+134
-21
lines changed

airbyte-cdk/bulk/core/load/changelog.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ The Load CDK provides functionality for destination connectors including stream-
77
<details>
88
<summary>Expand to review</summary>
99

10-
| Version | Date | Pull Request | Subject |
11-
|---------|------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
12-
| 1.0.2 | 2026-02-24 | | Bump bulk-cdk-core-base to 1.0.1 to pick up CVE fixes (CVE-2021-47621, CVE-2022-36944). |
13-
| 1.0.1 | 2026-02-09 | [#72959](https://github.com/airbytehq/airbyte/pull/72959) | Fix: CVE-2026-25526 (Jinjava dependency bump). |
10+
| Version | Date | Pull Request | Subject |
11+
|---------|------------|--------------|-------------------------------------------------------------------------------------------------|
12+
| 1.0.3 | 2026-03-05 | [#74272](https://github.com/airbytehq/airbyte/pull/74272) | Fix Iceberg deduplication: map NumberType to DecimalType(38, 18) instead of DoubleType. |
13+
| 1.0.2 | 2026-02-24 | | Bump bulk-cdk-core-base to 1.0.1 to pick up CVE fixes (CVE-2021-47621, CVE-2022-36944). |
14+
| 1.0.1 | 2026-02-09 | [#72959](https://github.com/airbytehq/airbyte/pull/72959) | Fix: CVE-2026-25526 (Jinjava dependency bump). |
1415
| 1.0.0 | 2026-02-02 | [#72376](https://github.com/airbytehq/airbyte/pull/72376) | Initial independent release of bulk-cdk-core-load. Separate versioning for load package begins. |
1516

1617
</details>
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=1.0.2
1+
version=1.0.3

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteTypeToIcebergSchema.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class AirbyteTypeToIcebergSchema {
6767
is BooleanType -> Types.BooleanType.get()
6868
is DateType -> Types.DateType.get()
6969
is IntegerType -> Types.LongType.get()
70-
is NumberType -> Types.DoubleType.get()
70+
is NumberType -> Types.DecimalType.of(38, 18)
7171
// Schemaless types are converted to string
7272
is ArrayTypeWithoutSchema,
7373
is ObjectTypeWithEmptySchema,

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,12 @@ class AirbyteValueToIcebergRecord {
6363
is DateValue -> return airbyteValue.value
6464
is IntegerValue -> return airbyteValue.value.toLong()
6565
is NullValue -> return null
66-
is NumberValue -> return airbyteValue.value.toDouble()
66+
is NumberValue ->
67+
return when (type.typeId()) {
68+
// NumberType is mapped to DecimalType in Iceberg.
69+
Type.TypeID.DECIMAL -> airbyteValue.value
70+
else -> airbyteValue.value.toDouble()
71+
}
6772
is StringValue -> return airbyteValue.value
6873
is TimeWithTimezoneValue ->
6974
return when (type.typeId()) {

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergSuperTypeFinder.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.iceberg.types.Types.*
2323
*/
2424
@Singleton
2525
class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesComparator) {
26-
private val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO)
26+
private val unsupportedTypeIds = setOf(BINARY, FIXED, UUID, MAP, TIMESTAMP_NANO)
2727

2828
/**
2929
* Returns a supertype for [existingType] and [incomingType] if one exists.
@@ -64,7 +64,7 @@ class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesCom
6464
}
6565

6666
/**
67-
* Checks whether either type is unsupported or unmapped (e.g. BINARY, DECIMAL, FIXED, etc.).
67+
* Checks whether either type is unsupported or unmapped (e.g. BINARY, FIXED, UUID, etc.).
6868
*
6969
* @throws ConfigErrorException if either type is unsupported.
7070
*/
@@ -106,6 +106,16 @@ class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesCom
106106
// If both are the same type ID, we just use the existing type
107107
if (existingTypeId == incomingTypeId) {
108108
// For timestamps, you'd want to reconcile UTC. This is simplified here.
109+
// For decimals, return the type with the wider precision. Iceberg only allows widening precision when the scale is unchanged; NOTE(review): scale is not compared here.
110+
if (existingTypeId == DECIMAL) {
111+
val existingDecimal = existingType as DecimalType
112+
val incomingDecimal = incomingType as DecimalType
113+
return if (incomingDecimal.precision() > existingDecimal.precision()) {
114+
incomingType
115+
} else {
116+
existingType
117+
}
118+
}
109119
return existingType
110120
}
111121

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTypesComparator.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,14 @@ class IcebergTypesComparator {
222222
// but for this function's purpose, we only check the existing fields.
223223
true
224224
}
225+
Type.TypeID.DECIMAL -> {
226+
require(existingType is Types.DecimalType && incomingType is Types.DecimalType) {
227+
"Expected DECIMAL types, got $existingType and $incomingType."
228+
}
229+
existingType.precision() == incomingType.precision() &&
230+
existingType.scale() == incomingType.scale()
231+
}
225232
Type.TypeID.BINARY,
226-
Type.TypeID.DECIMAL,
227233
Type.TypeID.FIXED,
228234
Type.TypeID.UUID,
229235
Type.TypeID.MAP,

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/IcebergUtil.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ class IcebergUtil(private val tableIdGenerator: TableIdGenerator) {
149149
StringValue(element.serializeToString()),
150150
changeDescription = null,
151151
)
152-
153152
// otherwise, don't change anything
154153
else -> null
155154
}

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/test/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergSuperTypeFinderTest.kt

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,30 @@ class IcebergSuperTypeFinderTest {
126126
}
127127

128128
@Test
129-
fun testDecimalIsUnsupported() {
129+
fun testIdenticalDecimalTypes() {
130+
val decimalType = Types.DecimalType.of(38, 18)
131+
val result = superTypeFinder.findSuperType(decimalType, decimalType, "column_name")
132+
133+
// Identical decimals => return existing
134+
assertThat(result).isSameAs(decimalType)
135+
}
136+
137+
@Test
138+
fun testDecimalPrecisionWidening() {
139+
val narrowDecimal = Types.DecimalType.of(10, 2)
140+
val wideDecimal = Types.DecimalType.of(20, 2)
141+
142+
val result = superTypeFinder.findSuperType(narrowDecimal, wideDecimal, "column_name")
143+
// Wider precision should be returned
144+
assertThat(result).isSameAs(wideDecimal)
145+
}
146+
147+
@Test
148+
fun testDecimalToIntIsNotAllowed() {
130149
val decimalType = Types.DecimalType.of(10, 2)
131150
val intType = Types.IntegerType.get()
132151

133-
// Fails in validateTypeIds => DECIMAL is not supported
152+
// Decimal to Int promotion is not allowed by Iceberg
134153
assertThatThrownBy { superTypeFinder.findSuperType(decimalType, intType, "column_name") }
135154
.isInstanceOf(ConfigErrorException::class.java)
136155
.hasMessageContaining(

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/test/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTypesComparatorTest.kt

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ class IcebergTypesComparatorTest {
391391
}
392392

393393
@Test
394-
fun testUnsupportedTypeDecimal() {
394+
fun testDecimalTypesEqual() {
395395
val existingSchema =
396396
buildSchema(
397397
field("decimal_col", Types.DecimalType.of(10, 2), false),
@@ -401,10 +401,52 @@ class IcebergTypesComparatorTest {
401401
field("decimal_col", Types.DecimalType.of(10, 2), false),
402402
)
403403

404-
// The code in typesAreEqual() throws for TypeID.DECIMAL
405-
assertThatThrownBy { comparator.compareSchemas(incomingSchema, existingSchema) }
406-
.isInstanceOf(IllegalArgumentException::class.java)
407-
.hasMessageContaining("Unsupported or unmapped Iceberg type: DECIMAL")
404+
val diff = comparator.compareSchemas(incomingSchema, existingSchema)
405+
406+
assertThat(diff.newColumns).isEmpty()
407+
assertThat(diff.updatedDataTypes).isEmpty()
408+
assertThat(diff.removedColumns).isEmpty()
409+
assertThat(diff.newlyOptionalColumns).isEmpty()
410+
}
411+
412+
@Test
413+
fun testDecimalTypesDifferentPrecision() {
414+
val existingSchema =
415+
buildSchema(
416+
field("decimal_col", Types.DecimalType.of(10, 2), false),
417+
)
418+
val incomingSchema =
419+
buildSchema(
420+
field("decimal_col", Types.DecimalType.of(20, 2), false),
421+
)
422+
423+
val diff = comparator.compareSchemas(incomingSchema, existingSchema)
424+
425+
// Different precision means the type has been updated
426+
assertThat(diff.updatedDataTypes).containsExactly("decimal_col")
427+
assertThat(diff.newColumns).isEmpty()
428+
assertThat(diff.removedColumns).isEmpty()
429+
assertThat(diff.newlyOptionalColumns).isEmpty()
430+
}
431+
432+
@Test
433+
fun testDecimalTypesDifferentScale() {
434+
val existingSchema =
435+
buildSchema(
436+
field("decimal_col", Types.DecimalType.of(10, 2), false),
437+
)
438+
val incomingSchema =
439+
buildSchema(
440+
field("decimal_col", Types.DecimalType.of(10, 5), false),
441+
)
442+
443+
val diff = comparator.compareSchemas(incomingSchema, existingSchema)
444+
445+
// Different scale means the type has been updated
446+
assertThat(diff.updatedDataTypes).containsExactly("decimal_col")
447+
assertThat(diff.newColumns).isEmpty()
448+
assertThat(diff.removedColumns).isEmpty()
449+
assertThat(diff.newlyOptionalColumns).isEmpty()
408450
}
409451

410452
@Test

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteTypeToIcebergSchemaTest.kt

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class AirbyteTypeToIcebergSchemaTest {
101101
@Test
102102
fun `convert handles NumberType`() {
103103
assertEquals(
104-
Types.DoubleType.get(),
104+
Types.DecimalType.of(38, 18),
105105
converter.convert(NumberType, stringifyObjects = false)
106106
)
107107
}
@@ -213,4 +213,35 @@ class AirbyteTypeToIcebergSchemaTest {
213213
assertEquals(1, identifierFieldIds.size)
214214
assertEquals(true, identifierFieldIds.contains(ageColumn.fieldId()))
215215
}
216+
217+
@Test
218+
fun `toIcebergSchema maps PK NumberType to DecimalType for identifier compatibility`() {
219+
val objectType =
220+
ObjectType(
221+
linkedMapOf(
222+
"id" to FieldType(NumberType, false),
223+
"amount" to FieldType(NumberType, true),
224+
),
225+
)
226+
val schema = objectType.toIcebergSchema(mutableListOf(mutableListOf("id")))
227+
228+
assertEquals(2, schema.columns().size)
229+
val idColumn = schema.findField("id")
230+
val amountColumn = schema.findField("amount")
231+
232+
// PK NumberType field should be DecimalType (for Iceberg identifier compatibility)
233+
assertNotNull(idColumn)
234+
assertFalse(idColumn!!.isOptional)
235+
assertEquals(Types.DecimalType.of(38, 18), idColumn.type())
236+
237+
// Non-PK NumberType field should also be DecimalType
238+
assertNotNull(amountColumn)
239+
assertTrue(amountColumn!!.isOptional)
240+
assertEquals(Types.DecimalType.of(38, 18), amountColumn.type())
241+
242+
// PK field should be in identifier fields (DecimalType is allowed)
243+
val identifierFieldIds = schema.identifierFieldIds()
244+
assertEquals(1, identifierFieldIds.size)
245+
assertTrue(identifierFieldIds.contains(idColumn.fieldId()))
246+
}
216247
}

0 commit comments

Comments
 (0)