Skip to content

Commit 32dbfcc

Browse files
authored
fix: load CDK timestamp coercion perf (#76246)
1 parent c82eff3 commit 32dbfcc

File tree

16 files changed

+693
-94
lines changed

16 files changed

+693
-94
lines changed

airbyte-cdk/bulk/core/load/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ dependencies {
1717

1818
api "io.airbyte.bulk-cdk:bulk-cdk-core-base:1.0.1"
1919
implementation 'org.apache.commons:commons-lang3:3.17.0'
20+
implementation 'com.ethlo.time:itu:1.14.0'
2021

2122
// For ranges and rangesets
2223
implementation("com.google.guava:guava:33.3.0-jre")

airbyte-cdk/bulk/core/load/changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The Load CDK provides functionality for destination connectors including stream-
99

1010
| Version | Date | Pull Request | Subject |
1111
|---------|------------|--------------|-------------------------------------------------------------------------------------------------|
12+
| 1.0.9 | 2026-04-15 | [#76246](https://github.com/airbytehq/airbyte/pull/76246) | Perf: fix timestamp coercion using exception-as-control-flow. Use ethlo/itu for ISO-8601 fast path (~25x faster), JDK TemporalQueries fallback for exotic formats. Eliminates ~56% destination CPU on timestamp-heavy workloads. |
1213
| 1.0.8 | 2026-04-14 | [#74728](https://github.com/airbytehq/airbyte/pull/74728) | Fix OAuthAuthenticator to track token expiry via `expires_in` and refresh expired tokens. |
1314
| 1.0.7 | 2026-03-27 | | Fix: update Iceberg sort order before schema evolution to prevent ValidationException when deleting columns referenced by the sort order. Handles Dedupe-to-Append mode switches and PK changes. |
1415
| 1.0.6 | 2026-03-12 | [#74715](https://github.com/airbytehq/airbyte/pull/74715) | Fix: drop temp table after successful upsert to prevent duplicate records across syncs. |

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueCoercer.kt

Lines changed: 122 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,46 @@
44

55
package io.airbyte.cdk.load.data
66

7+
import com.ethlo.time.ITU
8+
import com.ethlo.time.ParseConfig
9+
import io.airbyte.cdk.load.data.TemporalFormatters.DATE_TIME_FORMATTER
710
import io.airbyte.cdk.load.data.json.JsonToAirbyteValue
811
import io.airbyte.cdk.load.util.serializeToString
12+
import io.micronaut.context.annotation.Property
13+
import jakarta.inject.Singleton
914
import java.math.BigDecimal
1015
import java.math.BigInteger
16+
import java.text.ParsePosition
1117
import java.time.LocalDate
1218
import java.time.LocalDateTime
1319
import java.time.LocalTime
1420
import java.time.OffsetDateTime
1521
import java.time.OffsetTime
1622
import java.time.ZoneOffset
1723
import java.time.ZonedDateTime
18-
import java.time.format.DateTimeFormatter
24+
import java.time.temporal.TemporalQueries
1925

2026
/**
2127
* Utility class to coerce AirbyteValue to specific types. Does **not** support recursive coercion.
2228
*
2329
* More specifically: This class coerces the output of [JsonToAirbyteValue] to strongly-typed
2430
* [AirbyteValue]. In particular, this class will parse temporal types, and performs some
2531
* common-sense conversions among numeric types, as well as upcasting any value to StringValue.
32+
*
33+
* The [useFastTimestampParsing] flag controls timestamp parsing behavior:
34+
* - When false (default): uses the original try-ZonedDateTime/catch-fallback-to-LocalDateTime
35+
* pattern. Safe but slow due to exception-as-control-flow.
36+
* - When true: uses ethlo/itu for fast ISO-8601 parsing (~25x faster) with a JDK TemporalQueries
37+
* fallback for exotic formats. No exceptions on the hot path.
38+
*
39+
* Destinations opt in via the Micronaut property
40+
* `airbyte.destination.core.coercion.use-fast-timestamp-parsing`.
2641
*/
27-
object AirbyteValueCoercer {
42+
@Singleton
43+
class AirbyteValueCoercer(
44+
@Property(name = "airbyte.destination.core.coercion.use-fast-timestamp-parsing")
45+
private val useFastTimestampParsing: Boolean = false,
46+
) {
2847
fun coerce(
2948
value: AirbyteValue,
3049
type: AirbyteType,
@@ -64,7 +83,7 @@ object AirbyteValueCoercer {
6483
// leave it unchanged.
6584
is UnknownType -> value
6685
}
67-
} catch (e: Exception) {
86+
} catch (_: Exception) {
6887
null
6988
}
7089
}
@@ -119,7 +138,7 @@ object AirbyteValueCoercer {
119138
is DateValue -> value
120139
else ->
121140
requireType<StringValue, DateValue>(value) {
122-
DateValue(LocalDate.parse(it.value, DATE_TIME_FORMATTER))
141+
DateValue(LocalDate.parse(it.value, TemporalFormatters.DATE_TIME_FORMATTER))
123142
}
124143
}
125144

@@ -129,10 +148,25 @@ object AirbyteValueCoercer {
129148
else ->
130149
requireType<StringValue, TimeWithTimezoneValue>(value) {
131150
val ot =
132-
try {
133-
OffsetTime.parse(it.value, TIME_FORMATTER)
134-
} catch (e: Exception) {
135-
LocalTime.parse(it.value, TIME_FORMATTER).atOffset(ZoneOffset.UTC)
151+
if (useFastTimestampParsing) {
152+
// Single parse + temporal query to detect timezone, avoiding the
153+
// try-OffsetTime.parse/catch-fallback-to-LocalTime pattern.
154+
val parsed = TemporalFormatters.TIME_FORMATTER.parse(it.value)
155+
val offset = parsed.query(TemporalQueries.offset())
156+
if (offset != null) {
157+
// Has timezone offset (e.g. "12:00:00+01:00") — use it directly
158+
OffsetTime.from(parsed)
159+
} else {
160+
// No timezone (e.g. "12:00:00") — assume UTC
161+
LocalTime.from(parsed).atOffset(ZoneOffset.UTC)
162+
}
163+
} else {
164+
try {
165+
OffsetTime.parse(it.value, TemporalFormatters.TIME_FORMATTER)
166+
} catch (_: Exception) {
167+
LocalTime.parse(it.value, TemporalFormatters.TIME_FORMATTER)
168+
.atOffset(ZoneOffset.UTC)
169+
}
136170
}
137171
TimeWithTimezoneValue(ot)
138172
}
@@ -143,7 +177,9 @@ object AirbyteValueCoercer {
143177
is TimeWithoutTimezoneValue -> value
144178
else ->
145179
requireType<StringValue, TimeWithoutTimezoneValue>(value) {
146-
TimeWithoutTimezoneValue(LocalTime.parse(it.value, TIME_FORMATTER))
180+
TimeWithoutTimezoneValue(
181+
LocalTime.parse(it.value, TemporalFormatters.TIME_FORMATTER)
182+
)
147183
}
148184
}
149185

@@ -166,13 +202,84 @@ object AirbyteValueCoercer {
166202
}
167203

168204
private fun offsetDateTime(it: StringValue): OffsetDateTime {
169-
val odt =
170-
try {
171-
ZonedDateTime.parse(it.value, DATE_TIME_FORMATTER).toOffsetDateTime()
172-
} catch (e: Exception) {
173-
LocalDateTime.parse(it.value, DATE_TIME_FORMATTER).atOffset(ZoneOffset.UTC)
205+
return if (useFastTimestampParsing) {
206+
offsetDateTimeFast(it.value)
207+
} else {
208+
offsetDateTimeLegacy(it.value)
209+
}
210+
}
211+
212+
/** Legacy path: try ZonedDateTime.parse, catch, fall back to LocalDateTime.parse. */
213+
private fun offsetDateTimeLegacy(s: String): OffsetDateTime {
214+
return try {
215+
ZonedDateTime.parse(s, DATE_TIME_FORMATTER).toOffsetDateTime()
216+
} catch (e: Exception) {
217+
LocalDateTime.parse(s, DATE_TIME_FORMATTER).atOffset(ZoneOffset.UTC)
218+
}
219+
}
220+
221+
/**
222+
* Fast path: uses ethlo/itu for ISO-8601 (~25x faster, no exceptions), with a JDK
223+
* TemporalQueries fallback for non-ISO formats.
224+
*/
225+
private fun offsetDateTimeFast(s: String): OffsetDateTime {
226+
if (looksLikeIso8601(s)) {
227+
// Fast path: ITU handles ISO-8601/RFC-3339 ~25x faster than JDK DateTimeFormatter,
228+
// and determines with/without timezone via position-based parsing (no exceptions).
229+
val pos = ParsePosition(0)
230+
val parsed =
231+
try {
232+
ITU.parseLenient(s, ParseConfig.DEFAULT, pos)
233+
} catch (_: Exception) {
234+
// ITU can throw on some edge cases that pass looksLikeIso8601 (e.g.
235+
// "2021-01-01 01:01:01 +0000" has dashes but a space-separated compact offset).
236+
null
237+
}
238+
// Only use the ITU result if it successfully consumed the entire input string.
239+
// Partial parses (e.g. ITU parsed the date but stopped at a named timezone like
240+
// "GMT+08:00") must fall through to the JDK formatter which handles those.
241+
if (parsed != null && pos.index == s.length) {
242+
return if (parsed.offset.isPresent) {
243+
// Timestamp included timezone info (Z, +05:30, etc.) — use it directly
244+
parsed.toOffsetDatetime()
245+
} else {
246+
// No timezone info — assume UTC
247+
parsed.toLocalDatetime().atOffset(ZoneOffset.UTC)
248+
}
174249
}
175-
return odt
250+
}
251+
// Non-ISO format (slashes, dots, abbreviated months) or ITU couldn't fully parse
252+
return offsetDateTimeJdk(s)
253+
}
254+
255+
/**
256+
* JDK fallback for non-ISO formats. Uses a single parse + temporal query to determine whether
257+
* timezone info is present, avoiding the old try-ZonedDateTime/catch pattern.
258+
*/
259+
private fun offsetDateTimeJdk(s: String): OffsetDateTime {
260+
val parsed = DATE_TIME_FORMATTER.parse(s)
261+
// Check for explicit offset (+05:30, Z) first, then named zone (UTC, PST, GMT+08:00).
262+
// Named zones like "GMT+08:00" are parsed as a ZoneId, not an offset, so we need both
263+
// checks.
264+
val offset = parsed.query(TemporalQueries.offset())
265+
val zone = parsed.query(TemporalQueries.zone())
266+
return if (offset != null || zone != null) {
267+
// Has timezone — construct ZonedDateTime to resolve the zone to an offset
268+
ZonedDateTime.from(parsed).toOffsetDateTime()
269+
} else {
270+
// No timezone — assume UTC
271+
LocalDateTime.from(parsed).atOffset(ZoneOffset.UTC)
272+
}
273+
}
274+
275+
/**
276+
* Quick check for ISO-8601-ish format. ITU expects dashes as date separators (e.g.
277+
* "2024-01-15T..."). Non-ISO formats like "2024/01/15", "2024.01.15", or "2024 Jan 15" are
278+
* routed directly to the JDK fallback.
279+
*/
280+
private fun looksLikeIso8601(s: String): Boolean {
281+
// Minimum: "yyyy-MM-dd" = 10 chars
282+
return s.length >= 10 && s[4] == '-' && s[7] == '-'
176283
}
177284

178285
// In theory, we could e.g. Jsons.readTree((value as StringValue).value).
@@ -195,13 +302,4 @@ object AirbyteValueCoercer {
195302
null
196303
}
197304
}
198-
199-
val DATE_TIME_FORMATTER: DateTimeFormatter =
200-
DateTimeFormatter.ofPattern(
201-
"[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d][[' '][G]][[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X][[' '][G]]]]"
202-
)
203-
val TIME_FORMATTER: DateTimeFormatter =
204-
DateTimeFormatter.ofPattern(
205-
"HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]"
206-
)
207305
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright (c) 2026 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.data
6+
7+
import java.time.format.DateTimeFormatter
8+
9+
/**
10+
* Lenient DateTimeFormatter patterns used across the CDK for parsing temporal strings from sources.
11+
* These support a wide range of formats (ISO-8601, slashes, dots, abbreviated months, eras, etc.)
12+
* to handle the variety of timestamp representations emitted by different source connectors.
13+
*/
14+
object TemporalFormatters {
15+
val DATE_TIME_FORMATTER: DateTimeFormatter =
16+
DateTimeFormatter.ofPattern(
17+
"[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d][[' '][G]][[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X][[' '][G]]]]"
18+
)
19+
val TIME_FORMATTER: DateTimeFormatter =
20+
DateTimeFormatter.ofPattern(
21+
"HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]"
22+
)
23+
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/transform/medium/JsonConverter.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.cdk.load.dataflow.transform.medium
66

77
import io.airbyte.cdk.load.data.AirbyteValue
8+
import io.airbyte.cdk.load.data.AirbyteValueCoercer
89
import io.airbyte.cdk.load.dataflow.config.model.MediumConverterConfig
910
import io.airbyte.cdk.load.dataflow.transform.ValueCoercer
1011
import io.airbyte.cdk.load.dataflow.transform.data.ValidationResultHandler
@@ -15,10 +16,12 @@ class JsonConverter(
1516
private val coercer: ValueCoercer,
1617
private val validationResultHandler: ValidationResultHandler,
1718
private val converterConfig: MediumConverterConfig,
19+
private val airbyteValueCoercer: AirbyteValueCoercer,
1820
) : MediumConverter {
1921
override fun convert(input: ConversionInput): Map<String, AirbyteValue> {
2022
val enriched =
2123
input.msg.asEnrichedDestinationRecordAirbyteValue(
24+
coercer = airbyteValueCoercer,
2225
extractedAtAsTimestampWithTimezone =
2326
converterConfig.extractedAtAsTimestampWithTimezone,
2427
)

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationRecordRaw.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ data class DestinationRecordRaw(
4141
* [TimestampWithTimezoneValue]).
4242
*/
4343
fun asEnrichedDestinationRecordAirbyteValue(
44+
coercer: AirbyteValueCoercer,
4445
extractedAtAsTimestampWithTimezone: Boolean = false,
4546
respectLegacyUnions: Boolean = false,
4647
): EnrichedDestinationRecordAirbyteValue {
@@ -64,7 +65,8 @@ data class DestinationRecordRaw(
6465
name = fieldName,
6566
airbyteMetaField = null,
6667
)
67-
AirbyteValueCoercer.coerce(
68+
coercer
69+
.coerce(
6870
fieldValue.toAirbyteValue(),
6971
fieldType.type,
7072
respectLegacyUnions = respectLegacyUnions,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/Meta.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package io.airbyte.cdk.load.message
77
import com.fasterxml.jackson.databind.JsonNode
88
import io.airbyte.cdk.load.data.AirbyteType
99
import io.airbyte.cdk.load.data.AirbyteValue
10-
import io.airbyte.cdk.load.data.AirbyteValueCoercer.DATE_TIME_FORMATTER
1110
import io.airbyte.cdk.load.data.ArrayType
1211
import io.airbyte.cdk.load.data.FieldType
1312
import io.airbyte.cdk.load.data.IntegerType
@@ -16,6 +15,7 @@ import io.airbyte.cdk.load.data.ObjectType
1615
import io.airbyte.cdk.load.data.ObjectValue
1716
import io.airbyte.cdk.load.data.StringType
1817
import io.airbyte.cdk.load.data.StringValue
18+
import io.airbyte.cdk.load.data.TemporalFormatters.DATE_TIME_FORMATTER
1919
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
2020
import io.airbyte.cdk.load.data.json.toAirbyteValue
2121
import io.airbyte.cdk.load.util.deserializeToNode

0 commit comments

Comments
 (0)