Skip to content

Commit 0b2e280

Browse files
authored
fix(bulk cdk): cdc termination condition (#76054)
1 parent 7db08ee commit 0b2e280

File tree

4 files changed

+35
-22
lines changed

4 files changed

+35
-22
lines changed

airbyte-cdk/bulk/core/extract/changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The Extract CDK provides functionality for source connectors including schema di
99

1010
| Version | Date | Pull Request | Subject |
1111
|---------|------------|-----------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|
12+
| 1.1.1 | 2026-04-02 | [76054](https://github.com/airbytehq/airbyte/pull/76054) | Clarified CDC termination condition. Ensure querySingleValue returns one result or throws. |
1213
| 1.1.0 | 2026-03-30 | [75636](https://github.com/airbytehq/airbyte/pull/75636) | CDC positions are partially ordered. Added optional CDC startup hook. Field is now EmittedField. |
1314
| 1.0.2 | 2026-02-24 | [74007](https://github.com/airbytehq/airbyte/pull/74007) | Fix infinite loop when cursor-based incremental sync encounters an empty table with prior state by caching NULL cursor upper bound values (toolkit). |
1415
| 1.0.1 | 2026-02-24 | | Bump bulk-cdk-core-base to 1.0.1 to pick up CVE fixes (CVE-2021-47621, CVE-2022-36944). |
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=1.1.0
1+
version=1.1.1

airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReader.kt

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -435,20 +435,21 @@ class CdcPartitionReader<T : PartiallyOrdered<T>>(
435435
}
436436
}
437437

438-
if (upperBound.isGreater(currentPosition)) {
439-
return null
440-
}
441-
// Close because the current event is past the sync upper bound.
442-
return when (eventType) {
443-
EventType.TOMBSTONE,
444-
EventType.HEARTBEAT -> CloseReason.HEARTBEAT_OR_TOMBSTONE_REACHED_TARGET_POSITION
445-
EventType.KEY_JSON_INVALID,
446-
EventType.VALUE_JSON_INVALID,
447-
EventType.RECORD_EMITTED,
448-
EventType.RECORD_DISCARDED_BY_DESERIALIZE,
449-
EventType.RECORD_DISCARDED_BY_STREAM_ID ->
450-
CloseReason.RECORD_REACHED_TARGET_POSITION
438+
if (currentPosition.isGreaterOrEqual(upperBound)) {
439+
// Close because the current event is at or past the sync upper bound.
440+
return when (eventType) {
441+
EventType.TOMBSTONE,
442+
EventType.HEARTBEAT ->
443+
CloseReason.HEARTBEAT_OR_TOMBSTONE_REACHED_TARGET_POSITION
444+
EventType.KEY_JSON_INVALID,
445+
EventType.VALUE_JSON_INVALID,
446+
EventType.RECORD_EMITTED,
447+
EventType.RECORD_DISCARDED_BY_DESERIALIZE,
448+
EventType.RECORD_DISCARDED_BY_STREAM_ID ->
449+
CloseReason.RECORD_REACHED_TARGET_POSITION
450+
}
451451
}
452+
return null // Keep processing.
452453
}
453454

454455
private fun position(sourceRecord: SourceRecord?): T? {

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/SelectQuerier.kt

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,13 @@ class JdbcSelectQuerier(
204204
}
205205
}
206206

207+
private val throwNoResultsIllegalState = {
208+
throw IllegalStateException("Query unexpectedly produced no results")
209+
}
210+
private val throwMultipleResultsIllegalState = {
211+
throw IllegalStateException("Query unexpectedly produced multiple results")
212+
}
213+
207214
/**
208215
* Convenience function for executing a query that is expected to return exactly one row and
209216
* extracting a single value from that row.
@@ -222,19 +229,23 @@ fun <T> querySingleValue(
222229
query: String,
223230
bindParameters: ((PreparedStatement) -> Unit)? = null,
224231
withResultSet: (ResultSet) -> T,
225-
noResultsCase: () -> Unit = {
226-
throw IllegalStateException("Query unexpectedly produced no results: [$query]")
227-
},
228-
multipleResultsCase: () -> Unit = {
229-
throw IllegalStateException("Query unexpectedly produced more than one result: [$query]")
230-
},
232+
noResultsCase: () -> Unit = throwNoResultsIllegalState,
233+
multipleResultsCase: () -> Unit = throwMultipleResultsIllegalState,
231234
): T {
232235
jdbcConnectionFactory.get().use { connection ->
233236
connection.prepareStatement(query).use { stmt ->
234237
bindParameters?.invoke(stmt)
235238
stmt.executeQuery().use { rs ->
236-
if (!rs.next()) noResultsCase()
237-
if (!rs.isLast) multipleResultsCase()
239+
if (!rs.next()) {
240+
noResultsCase()
241+
// if non-throwing noResultsCase was supplied, we still need to throw
242+
throwNoResultsIllegalState()
243+
}
244+
if (!rs.isLast) {
245+
multipleResultsCase()
246+
// if non-throwing multipleResultsCase was supplied, we still need to throw
247+
throwMultipleResultsIllegalState()
248+
}
238249
return withResultSet(rs)
239250
}
240251
}

0 commit comments

Comments
 (0)