Skip to content

Commit 8292bb2

Browse files
fix(extract CDK): User defined cursor Incremental partition (#74124)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
1 parent b22321b commit 8292bb2

File tree

7 files changed

+179
-145
lines changed

7 files changed

+179
-145
lines changed

airbyte-cdk/bulk/core/extract/changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The Extract CDK provides functionality for source connectors including schema di
99

1010
| Version | Date | Pull Request | Subject |
1111
|---------|------------|-----------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|
12+
| 1.1.2 | 2026-03-31 | [74124](https://github.com/airbytehq/airbyte/pull/74124) | Change the default implementation of the JDBC cursor incremental partition to non-splittable. |
1213
| 1.1.1 | 2026-04-02 | [76054](https://github.com/airbytehq/airbyte/pull/76054) | Clarified CDC termination condition. Ensure querySingleValue returns one result or throws. |
1314
| 1.1.0 | 2026-03-30 | [75636](https://github.com/airbytehq/airbyte/pull/75636) | CDC positions are partially ordered. Added optional CDC startup hook. Field is now EmittedField. |
1415
| 1.0.2 | 2026-02-24 | [74007](https://github.com/airbytehq/airbyte/pull/74007) | Fix infinite loop when cursor-based incremental sync encounters an empty table with prior state by caching NULL cursor upper bound values (toolkit). |
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=1.1.1
1+
version=1.1.2

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/DefaultJdbcPartition.kt

Lines changed: 111 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,65 @@ import io.airbyte.cdk.discover.EmittedField
1010
import io.airbyte.cdk.output.sockets.toJson
1111
import io.airbyte.cdk.util.Jsons
1212

13+
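/**
 * Builds a WHERE clause restricting [checkpointColumns], compared in lexicographic order, to the
 * range between [lowerBound] and [upperBound]. The upper bound is inclusive on its last column; the
 * lower bound is strict unless [isLowerBoundIncluded] is set, in which case the comparison on the
 * last checkpoint column is inclusive. Returns [NoWhere] when neither bound yields a condition.
 */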
14+
fun buildWhereClause(
15+
checkpointColumns: List<EmittedField>,
16+
lowerBound: List<JsonNode>?,
17+
upperBound: List<JsonNode>?,
18+
isLowerBoundIncluded: Boolean,
19+
): WhereNode {
20+
val zippedLowerBound: List<Pair<EmittedField, JsonNode>> =
21+
lowerBound?.let { checkpointColumns.zip(it) } ?: listOf()
22+
val lowerBoundDisj: List<WhereClauseNode> =
23+
zippedLowerBound.mapIndexed { idx: Int, (gtCol: EmittedField, gtValue: JsonNode) ->
24+
val lastLeaf: WhereClauseLeafNode =
25+
if (isLowerBoundIncluded && idx == checkpointColumns.size - 1) {
26+
GreaterOrEqual(gtCol, gtValue)
27+
} else {
28+
Greater(gtCol, gtValue)
29+
}
30+
And(
31+
zippedLowerBound.take(idx).map { (eqCol: EmittedField, eqValue: JsonNode) ->
32+
Equal(eqCol, eqValue)
33+
} + listOf(lastLeaf),
34+
)
35+
}
36+
val zippedUpperBound: List<Pair<EmittedField, JsonNode>> =
37+
upperBound?.let { checkpointColumns.zip(it) } ?: listOf()
38+
val upperBoundDisj: List<WhereClauseNode> =
39+
zippedUpperBound.mapIndexed { idx: Int, (leqCol: EmittedField, leqValue: JsonNode) ->
40+
val lastLeaf: WhereClauseLeafNode =
41+
if (idx < zippedUpperBound.size - 1) {
42+
Lesser(leqCol, leqValue)
43+
} else {
44+
LesserOrEqual(leqCol, leqValue)
45+
}
46+
And(
47+
zippedUpperBound.take(idx).map { (eqCol: EmittedField, eqValue: JsonNode) ->
48+
Equal(eqCol, eqValue)
49+
} + listOf(lastLeaf),
50+
)
51+
}
52+
// Don't create WHERE clauses when there are no bounds
53+
if (lowerBoundDisj.isEmpty() && upperBoundDisj.isEmpty()) {
54+
return NoWhere
55+
}
56+
57+
// Build WHERE clause components only for non-empty bounds
58+
val clauses = mutableListOf<WhereClauseNode>()
59+
if (lowerBoundDisj.isNotEmpty()) {
60+
clauses.add(Or(lowerBoundDisj))
61+
}
62+
if (upperBoundDisj.isNotEmpty()) {
63+
clauses.add(Or(upperBoundDisj))
64+
}
65+
66+
return when (clauses.size) {
67+
1 -> Where(clauses.first())
68+
else -> Where(And(clauses))
69+
}
70+
}
71+
1372
/** Base class for default implementations of [JdbcPartition]. */
1473
sealed class DefaultJdbcPartition(
1574
val selectQueryGenerator: SelectQueryGenerator,
@@ -28,7 +87,7 @@ sealed class DefaultJdbcUnsplittablePartition(
2887
override val nonResumableQuery: SelectQuery
2988
get() = selectQueryGenerator.generate(nonResumableQuerySpec.optimize())
3089

31-
val nonResumableQuerySpec = SelectQuerySpec(SelectColumns(stream.fields), from)
90+
open val nonResumableQuerySpec = SelectQuerySpec(SelectColumns(stream.fields), from)
3291

3392
override fun samplingQuery(sampleRateInvPow2: Int): SelectQuery {
3493
val sampleSize: Int = streamState.sharedState.maxSampleSize
@@ -117,59 +176,7 @@ sealed class DefaultJdbcSplittablePartition(
117176
}
118177

119178
val where: WhereNode
120-
get() {
121-
val zippedLowerBound: List<Pair<EmittedField, JsonNode>> =
122-
lowerBound?.let { checkpointColumns.zip(it) } ?: listOf()
123-
val lowerBoundDisj: List<WhereClauseNode> =
124-
zippedLowerBound.mapIndexed { idx: Int, (gtCol: EmittedField, gtValue: JsonNode) ->
125-
val lastLeaf: WhereClauseLeafNode =
126-
if (isLowerBoundIncluded && idx == checkpointColumns.size - 1) {
127-
GreaterOrEqual(gtCol, gtValue)
128-
} else {
129-
Greater(gtCol, gtValue)
130-
}
131-
And(
132-
zippedLowerBound.take(idx).map { (eqCol: EmittedField, eqValue: JsonNode) ->
133-
Equal(eqCol, eqValue)
134-
} + listOf(lastLeaf),
135-
)
136-
}
137-
val zippedUpperBound: List<Pair<EmittedField, JsonNode>> =
138-
upperBound?.let { checkpointColumns.zip(it) } ?: listOf()
139-
val upperBoundDisj: List<WhereClauseNode> =
140-
zippedUpperBound.mapIndexed { idx: Int, (leqCol: EmittedField, leqValue: JsonNode)
141-
->
142-
val lastLeaf: WhereClauseLeafNode =
143-
if (idx < zippedUpperBound.size - 1) {
144-
Lesser(leqCol, leqValue)
145-
} else {
146-
LesserOrEqual(leqCol, leqValue)
147-
}
148-
And(
149-
zippedUpperBound.take(idx).map { (eqCol: EmittedField, eqValue: JsonNode) ->
150-
Equal(eqCol, eqValue)
151-
} + listOf(lastLeaf),
152-
)
153-
}
154-
// Don't create WHERE clauses when there are no bounds
155-
if (lowerBoundDisj.isEmpty() && upperBoundDisj.isEmpty()) {
156-
return NoWhere
157-
}
158-
159-
// Build WHERE clause components only for non-empty bounds
160-
val clauses = mutableListOf<WhereClauseNode>()
161-
if (lowerBoundDisj.isNotEmpty()) {
162-
clauses.add(Or(lowerBoundDisj))
163-
}
164-
if (upperBoundDisj.isNotEmpty()) {
165-
clauses.add(Or(upperBoundDisj))
166-
}
167-
168-
return when (clauses.size) {
169-
1 -> Where(clauses.first())
170-
else -> Where(And(clauses))
171-
}
172-
}
179+
get() = buildWhereClause(checkpointColumns, lowerBound, upperBound, isLowerBoundIncluded)
173180

174181
open val isLowerBoundIncluded: Boolean = false
175182
}
@@ -272,6 +279,56 @@ class DefaultJdbcSplittableSnapshotWithCursorPartition(
272279
)
273280
}
274281

282+
class DefaultUnsplittableJdbcCursorIncrementalPartition(
283+
selectQueryGenerator: SelectQueryGenerator,
284+
streamState: DefaultJdbcStreamState,
285+
val cursor: EmittedField,
286+
val cursorLowerBound: JsonNode,
287+
val isLowerBoundIncluded: Boolean,
288+
val explicitCursorUpperBound: JsonNode?,
289+
) :
290+
JdbcCursorPartition<DefaultJdbcStreamState>,
291+
DefaultJdbcUnsplittablePartition(selectQueryGenerator, streamState) {
292+
293+
val cursorUpperBound: JsonNode
294+
get() = explicitCursorUpperBound ?: streamState.cursorUpperBound ?: Jsons.nullNode()
295+
296+
override val cursorUpperBoundQuery: SelectQuery
297+
get() = selectQueryGenerator.generate(cursorUpperBoundQuerySpec.optimize())
298+
299+
val cursorUpperBoundQuerySpec = SelectQuerySpec(SelectColumnMaxValue(cursor), from)
300+
301+
val lowerBound: List<JsonNode> = listOf(cursorLowerBound)
302+
val upperBound: List<JsonNode>
303+
get() = listOf(cursorUpperBound)
304+
305+
override val completeState: OpaqueStateValue
306+
get() =
307+
DefaultJdbcStreamStateValue.cursorIncrementalCheckpoint(
308+
cursor,
309+
cursorCheckpoint = cursorUpperBound,
310+
)
311+
312+
override val nonResumableQuerySpec: SelectQuerySpec
313+
get() = SelectQuerySpec(SelectColumns(stream.fields), from, where)
314+
315+
val checkpointColumns: List<EmittedField> = listOf(cursor)
316+
val where: WhereNode
317+
get() = buildWhereClause(checkpointColumns, lowerBound, upperBound, isLowerBoundIncluded)
318+
319+
override fun samplingQuery(sampleRateInvPow2: Int): SelectQuery {
320+
val sampleSize: Int = streamState.sharedState.maxSampleSize
321+
val querySpec =
322+
SelectQuerySpec(
323+
SelectColumns(stream.fields + checkpointColumns),
324+
FromSample(stream.name, stream.namespace, sampleRateInvPow2, sampleSize, where),
325+
NoWhere, // WHERE is already in FromSample, don't duplicate in outer query
326+
OrderBy(checkpointColumns),
327+
)
328+
return selectQueryGenerator.generate(querySpec.optimize())
329+
}
330+
}
331+
275332
/**
276333
* Default implementation of a [JdbcPartition] for a cursor incremental partition. These are always
277334
* splittable.

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactory.kt

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,13 @@ class DefaultJdbcPartitionFactory(
118118
null
119119
} else {
120120
// Incremental ongoing.
121-
DefaultJdbcCursorIncrementalPartition(
121+
DefaultUnsplittableJdbcCursorIncrementalPartition(
122122
selectQueryGenerator,
123123
streamState,
124124
cursor,
125125
cursorLowerBound = cursorCheckpoint,
126126
isLowerBoundIncluded = true,
127-
cursorUpperBound = streamState.cursorUpperBound,
127+
explicitCursorUpperBound = streamState.cursorUpperBound,
128128
)
129129
}
130130
}
@@ -222,10 +222,7 @@ class DefaultJdbcPartitionFactory(
222222
unsplitPartition.split(splitPartitionBoundaries)
223223
is DefaultJdbcSplittableSnapshotWithCursorPartition ->
224224
unsplitPartition.split(splitPartitionBoundaries)
225-
is DefaultJdbcCursorIncrementalPartition ->
226-
unsplitPartition.split(splitPartitionBoundaries)
227-
is DefaultJdbcUnsplittableSnapshotPartition -> listOf(unsplitPartition)
228-
is DefaultJdbcUnsplittableSnapshotWithCursorPartition -> listOf(unsplitPartition)
225+
else -> listOf(unsplitPartition)
229226
}
230227
}
231228

@@ -266,22 +263,4 @@ class DefaultJdbcPartitionFactory(
266263
)
267264
}
268265
}
269-
270-
private fun DefaultJdbcCursorIncrementalPartition.split(
271-
splitPointValues: List<DefaultJdbcStreamStateValue>
272-
): List<DefaultJdbcCursorIncrementalPartition> {
273-
val inners: List<JsonNode> = splitPointValues.mapNotNull { it.cursorPair(stream)?.second }
274-
val lbs: List<JsonNode> = listOf(cursorLowerBound) + inners
275-
val ubs: List<JsonNode> = inners + listOf(cursorUpperBound)
276-
return lbs.zip(ubs).mapIndexed { idx: Int, (lowerBound, upperBound) ->
277-
DefaultJdbcCursorIncrementalPartition(
278-
selectQueryGenerator,
279-
streamState,
280-
cursor,
281-
lowerBound,
282-
isLowerBoundIncluded = idx == 0,
283-
upperBound,
284-
)
285-
}
286-
}
287266
}

airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactoryTest.kt

Lines changed: 13 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -386,8 +386,8 @@ class DefaultJdbcPartitionFactoryTest {
386386
// Check snapshot termination criteria and transition to cursor-based incremental
387387
val finalResult = factory.create(stream.bootstrap(opaqueStateValue(cursor = cursorValue)))
388388
factory.assertFailures()
389-
Assertions.assertTrue(finalResult is DefaultJdbcCursorIncrementalPartition)
390-
val finalPartition = finalResult as DefaultJdbcCursorIncrementalPartition
389+
Assertions.assertTrue(finalResult is DefaultUnsplittableJdbcCursorIncrementalPartition)
390+
val finalPartition = finalResult as DefaultUnsplittableJdbcCursorIncrementalPartition
391391
sanityCheck(stream, factory, finalPartition)
392392
Assertions.assertEquals(ts, finalPartition.cursor)
393393
Assertions.assertEquals(LocalDateCodec.encode(cursorValue), finalPartition.cursorLowerBound)
@@ -399,8 +399,8 @@ class DefaultJdbcPartitionFactoryTest {
399399
val factory = sharedState().factory()
400400
val result = factory.create(stream.bootstrap(opaqueStateValue(cursor = cursorValue)))
401401
factory.assertFailures()
402-
Assertions.assertTrue(result is DefaultJdbcCursorIncrementalPartition)
403-
val partition = result as DefaultJdbcCursorIncrementalPartition
402+
Assertions.assertTrue(result is DefaultUnsplittableJdbcCursorIncrementalPartition)
403+
val partition = result as DefaultUnsplittableJdbcCursorIncrementalPartition
404404
val cursorUpperBound = cursorValue.plusMonths(1)
405405
partition.streamState.cursorUpperBound = LocalDateCodec.encode(cursorUpperBound)
406406
// Check partition properties
@@ -428,22 +428,6 @@ class DefaultJdbcPartitionFactoryTest {
428428
),
429429
)
430430
)
431-
partition
432-
.resumableQuery(limit = 10L)
433-
.assertQueryEquals(
434-
SelectQuerySpec(
435-
SelectColumns(id, ts, msg),
436-
From(stream.name, stream.namespace),
437-
Where(
438-
And(
439-
GreaterOrEqual(ts, LocalDateCodec.encode(cursorValue)),
440-
LesserOrEqual(ts, LocalDateCodec.encode(cursorUpperBound))
441-
),
442-
),
443-
OrderBy(ts),
444-
Limit(10L)
445-
)
446-
)
447431
partition
448432
.samplingQuery(sampleRateInvPow2 = 8)
449433
.assertQueryEquals(
@@ -467,15 +451,12 @@ class DefaultJdbcPartitionFactoryTest {
467451
)
468452
// Check state generation
469453
partition.completeState.assertJsonEquals(opaqueStateValue(cursor = cursorUpperBound))
470-
partition
471-
.incompleteState(record(cursor = cursorValue.plusDays(1)))
472-
.assertJsonEquals(opaqueStateValue(cursor = cursorValue.plusDays(1)))
473454
// Check that subsequent non-terminal partition includes the lower bound
474455
val nextResult =
475456
factory.create(stream.bootstrap(opaqueStateValue(cursor = cursorValue.plusDays(1))))
476457
factory.assertFailures()
477-
Assertions.assertTrue(nextResult is DefaultJdbcCursorIncrementalPartition)
478-
val nextPartition = nextResult as DefaultJdbcCursorIncrementalPartition
458+
Assertions.assertTrue(nextResult is DefaultUnsplittableJdbcCursorIncrementalPartition)
459+
val nextPartition = nextResult as DefaultUnsplittableJdbcCursorIncrementalPartition
479460
sanityCheck(stream, factory, nextPartition)
480461
Assertions.assertTrue(nextPartition.isLowerBoundIncluded)
481462
// Check termination criteria
@@ -491,23 +472,13 @@ class DefaultJdbcPartitionFactoryTest {
491472
partition,
492473
listOf(opaqueStateValue(cursor = boundary1), opaqueStateValue(cursor = boundary2)),
493474
)
494-
val splits: List<DefaultJdbcCursorIncrementalPartition> =
495-
rawSplits.filterIsInstance<DefaultJdbcCursorIncrementalPartition>()
496-
Assertions.assertIterableEquals(rawSplits, splits)
497-
splits.forEach {
498-
sanityCheck(stream, factory, it)
499-
Assertions.assertEquals(ts, it.cursor)
500-
}
501-
Assertions.assertEquals(3, splits.size)
502-
Assertions.assertEquals(LocalDateCodec.encode(cursorValue), splits[0].cursorLowerBound)
503-
Assertions.assertTrue(splits[0].isLowerBoundIncluded)
504-
Assertions.assertEquals(LocalDateCodec.encode(boundary1), splits[0].cursorUpperBound)
505-
Assertions.assertEquals(LocalDateCodec.encode(boundary1), splits[1].cursorLowerBound)
506-
Assertions.assertFalse(splits[1].isLowerBoundIncluded)
507-
Assertions.assertEquals(LocalDateCodec.encode(boundary2), splits[1].cursorUpperBound)
508-
Assertions.assertEquals(LocalDateCodec.encode(boundary2), splits[2].cursorLowerBound)
509-
Assertions.assertFalse(splits[2].isLowerBoundIncluded)
510-
Assertions.assertEquals(LocalDateCodec.encode(cursorUpperBound), splits[2].cursorUpperBound)
475+
Assertions.assertEquals(1, rawSplits.size)
476+
val split = rawSplits[0] as DefaultUnsplittableJdbcCursorIncrementalPartition
477+
sanityCheck(stream, factory, split)
478+
Assertions.assertEquals(ts, split.cursor)
479+
Assertions.assertEquals(LocalDateCodec.encode(cursorValue), split.cursorLowerBound)
480+
Assertions.assertTrue(split.isLowerBoundIncluded)
481+
Assertions.assertEquals(LocalDateCodec.encode(cursorUpperBound), split.cursorUpperBound)
511482
}
512483

513484
fun sanityCheck(

0 commit comments

Comments
 (0)