Skip to content

Commit 5233c10

Browse files
authored
fix(bulk-cdk): catalog validation failure results in stream failure (#76326)
1 parent 32dbfcc commit 5233c10

6 files changed

Lines changed: 114 additions & 67 deletions

File tree

airbyte-cdk/bulk/core/extract/changelog.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@ The Extract CDK provides functionality for source connectors including schema di
77
<details>
88
<summary>Expand to review</summary>
99

10-
| Version | Date | Pull Request | Subject |
11-
|---------|------------|-----------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|
12-
| 1.1.2 | 2026-03-31 | [74124](https://github.com/airbytehq/airbyte/pull/74124) | Change the default implementation of JDBC incremental partition to non splittable |
13-
| 1.1.1 | 2026-04-02 | [76054](https://github.com/airbytehq/airbyte/pull/76054) | Clarified CDC termination condition. Ensure querySingleValue returns one result or throws. |
14-
| 1.1.0 | 2026-03-30 | [75636](https://github.com/airbytehq/airbyte/pull/75636) | CDC positions are partially ordered. Added optional CDC startup hook. Field is now EmittedField. |
15-
| 1.0.2 | 2026-02-24 | [74007](https://github.com/airbytehq/airbyte/pull/74007) | Fix infinite loop when cursor-based incremental sync encounters an empty table with prior state by caching NULL cursor upper bound values (toolkit). |
16-
| 1.0.1 | 2026-02-24 | | Bump bulk-cdk-core-base to 1.0.1 to pick up CVE fixes (CVE-2021-47621, CVE-2022-36944). |
17-
| 1.0.0 | 2026-02-02 | [#72376](https://github.com/airbytehq/airbyte/pull/72376) | Initial independent release of bulk-cdk-core-extract. Separate versioning for extract package begins. |
10+
| Version | Date | Pull Request | Subject |
11+
|---------|------------|------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
12+
| 1.1.3 | 2026-04-14 | [76320](https://github.com/airbytehq/airbyte/pull/76320) | Catalog validation failures are captured as stream-level sync failures. |
13+
| 1.1.2 | 2026-03-31 | [74124](https://github.com/airbytehq/airbyte/pull/74124) | Change the default implementation of JDBC incremental partition to non splittable |
14+
| 1.1.1 | 2026-04-02 | [76054](https://github.com/airbytehq/airbyte/pull/76054) | Clarified CDC termination condition. Ensure querySingleValue returns one result or throws. |
15+
| 1.1.0 | 2026-03-30 | [75636](https://github.com/airbytehq/airbyte/pull/75636) | CDC positions are partially ordered. Added optional CDC startup hook. Field is now EmittedField. |
16+
| 1.0.2 | 2026-02-24 | [74007](https://github.com/airbytehq/airbyte/pull/74007) | Fix infinite loop when cursor-based incremental sync encounters an empty table with prior state by caching NULL cursor upper bound values (toolkit). |
17+
| 1.0.1 | 2026-02-24 | | Bump bulk-cdk-core-base to 1.0.1 to pick up CVE fixes (CVE-2021-47621, CVE-2022-36944). |
18+
| 1.0.0 | 2026-02-02 | [#72376](https://github.com/airbytehq/airbyte/pull/72376) | Initial independent release of bulk-cdk-core-extract. Separate versioning for extract package begins. |
1819

1920
</details>
2021

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/output/CatalogValidationFailureHandler.kt

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
package io.airbyte.cdk.output
33

44
import io.airbyte.cdk.StreamIdentifier
5+
import io.airbyte.cdk.asProtocolStreamDescriptor
56
import io.airbyte.cdk.data.AirbyteSchemaType
7+
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
68
import io.github.oshai.kotlinlogging.KotlinLogging
79
import io.micronaut.context.annotation.DefaultImplementation
810
import jakarta.inject.Singleton
@@ -21,81 +23,86 @@ interface CatalogValidationFailureHandler : Consumer<CatalogValidationFailure>
2123
/** Union type for all validation failures. */
2224
sealed interface CatalogValidationFailure {
2325
val streamID: StreamIdentifier
26+
val message: String
27+
28+
fun asErrorTrace(): AirbyteErrorTraceMessage? =
29+
AirbyteErrorTraceMessage()
30+
.withStreamDescriptor(streamID.asProtocolStreamDescriptor())
31+
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
32+
.withMessage(message)
2433
}
2534

2635
data class StreamNotFound(
2736
override val streamID: StreamIdentifier,
28-
) : CatalogValidationFailure
37+
) : CatalogValidationFailure {
38+
override val message = "Stream '$streamID' not found or not accessible in source."
39+
}
2940

3041
data class MultipleStreamsFound(
3142
override val streamID: StreamIdentifier,
32-
) : CatalogValidationFailure
43+
) : CatalogValidationFailure {
44+
override val message = "Multiple matching streams found for '$streamID' in source."
45+
}
3346

3447
data class StreamHasNoFields(
3548
override val streamID: StreamIdentifier,
36-
) : CatalogValidationFailure
49+
) : CatalogValidationFailure {
50+
override val message = "Stream '$streamID' has no accessible data fields."
51+
}
3752

3853
data class FieldNotFound(
3954
override val streamID: StreamIdentifier,
4055
val fieldName: String,
41-
) : CatalogValidationFailure
56+
) : CatalogValidationFailure {
57+
override val message = "Field '$fieldName' not found in stream '$streamID'."
58+
}
4259

4360
data class FieldTypeMismatch(
4461
override val streamID: StreamIdentifier,
4562
val fieldName: String,
4663
val expected: AirbyteSchemaType,
4764
val actual: AirbyteSchemaType,
48-
) : CatalogValidationFailure
65+
) : CatalogValidationFailure {
66+
override val message =
67+
"Field '$fieldName' in stream '$streamID' has type $actual in source but catalog expects $expected."
68+
}
4969

5070
data class InvalidPrimaryKey(
5171
override val streamID: StreamIdentifier,
5272
val primaryKey: List<String>,
53-
) : CatalogValidationFailure
73+
) : CatalogValidationFailure {
74+
override val message = "Primary key $primaryKey not found in stream '$streamID'."
75+
}
5476

5577
data class InvalidCursor(
5678
override val streamID: StreamIdentifier,
5779
val cursor: String,
58-
) : CatalogValidationFailure
80+
) : CatalogValidationFailure {
81+
override val message = "Cursor '$cursor' not found in stream '$streamID'."
82+
}
5983

6084
data class InvalidIncrementalSyncMode(
6185
override val streamID: StreamIdentifier,
62-
) : CatalogValidationFailure
86+
) : CatalogValidationFailure {
87+
override val message =
88+
"Stream '$streamID' has no cursor configured for incremental sync; falling back to full refresh."
89+
}
6390

6491
data class ResetStream(
6592
override val streamID: StreamIdentifier,
66-
) : CatalogValidationFailure
93+
) : CatalogValidationFailure {
94+
override val message = "Resetting stream '$streamID'."
95+
override fun asErrorTrace(): AirbyteErrorTraceMessage? = null
96+
}
6797

6898
private val log = KotlinLogging.logger {}
6999

70100
@Singleton
71-
private class LoggingCatalogValidationFailureHandler : CatalogValidationFailureHandler {
101+
private class LoggingCatalogValidationFailureHandler(
102+
val outputConsumer: OutputConsumer,
103+
) : CatalogValidationFailureHandler {
72104
override fun accept(f: CatalogValidationFailure) {
73-
when (f) {
74-
is FieldNotFound ->
75-
log.warn { "In stream ${f.prettyName()}: field '${f.fieldName}' not found." }
76-
is FieldTypeMismatch ->
77-
log.warn {
78-
"In stream ${f.prettyName()}: " +
79-
"field '${f.fieldName}' is ${f.actual} but catalog expects ${f.expected}."
80-
}
81-
is StreamHasNoFields -> log.warn { "In stream ${f.prettyName()}: no data fields found" }
82-
is InvalidCursor ->
83-
log.warn { "In stream ${f.prettyName()}: invalid cursor '${f.cursor}'." }
84-
is InvalidPrimaryKey ->
85-
log.warn { "In stream ${f.prettyName()}: invalid primary key '${f.primaryKey}'." }
86-
is InvalidIncrementalSyncMode ->
87-
log.warn { "In stream ${f.prettyName()}: incremental sync not possible." }
88-
is MultipleStreamsFound ->
89-
log.warn { "Multiple matching streams found for ${f.prettyName()}." }
90-
is ResetStream -> log.warn { "Resetting stream ${f.prettyName()}." }
91-
is StreamNotFound -> log.warn { "No matching stream found for name ${f.prettyName()}." }
92-
}
105+
log.warn { f.message }
106+
f.asErrorTrace()?.let { outputConsumer.accept(it) }
93107
}
94-
95-
private fun CatalogValidationFailure.prettyName(): String =
96-
if (streamID.namespace == null) {
97-
"'${streamID.name}' in unspecified namespace"
98-
} else {
99-
"'${streamID.name}' in namespace '${streamID.namespace}'"
100-
}
101108
}

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManagerFactory.kt

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ import io.airbyte.cdk.output.OutputConsumer
3434
import io.airbyte.cdk.output.StreamHasNoFields
3535
import io.airbyte.cdk.output.StreamNotFound
3636
import io.airbyte.cdk.output.sockets.DATA_CHANNEL_PROPERTY_PREFIX
37-
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage
3837
import io.airbyte.protocol.models.v0.AirbyteStream
38+
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
39+
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
3940
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
4041
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
4142
import io.airbyte.protocol.models.v0.SyncMode
@@ -61,10 +62,36 @@ class StateManagerFactory(
6162
configuredCatalog: ConfiguredAirbyteCatalog,
6263
inputState: InputState,
6364
): StateManager {
65+
val droppedStreamIDs = mutableListOf<StreamIdentifier>()
6466
val allStreams: List<Stream> =
6567
metadataQuerierFactory.session(config).use { mq ->
66-
configuredCatalog.streams.mapNotNull { toStream(mq, it) }
68+
configuredCatalog.streams.mapNotNull { configuredStream ->
69+
val streamID = StreamIdentifier.from(configuredStream.stream)
70+
toStream(mq, configuredStream)
71+
?: run {
72+
droppedStreamIDs += streamID
73+
outputConsumer.accept(
74+
AirbyteStreamStatusTraceMessage()
75+
.withStreamDescriptor(streamID.asProtocolStreamDescriptor())
76+
.withStatus(AirbyteStreamStatus.STARTED),
77+
)
78+
outputConsumer.accept(
79+
AirbyteStreamStatusTraceMessage()
80+
.withStreamDescriptor(streamID.asProtocolStreamDescriptor())
81+
.withStatus(AirbyteStreamStatus.INCOMPLETE),
82+
)
83+
null
84+
}
85+
}
6786
}
87+
if (config.global && droppedStreamIDs.isNotEmpty()) {
88+
throw ConfigErrorException(
89+
"CDC sync aborted: streams ${droppedStreamIDs.map { it.toString() }} could not be " +
90+
"read due to catalog validation errors. Advancing the global CDC position " +
91+
"would cause permanent data loss for these streams. Please resolve the " +
92+
"errors above and retry.",
93+
)
94+
}
6895
return if (config.global) {
6996
when (inputState) {
7097
is StreamInputState ->
@@ -168,27 +195,14 @@ class StateManagerFactory(
168195
val streamID: StreamIdentifier = StreamIdentifier.from(configuredStream.stream)
169196
val name: String = streamID.name
170197
val namespace: String? = streamID.namespace
171-
val streamLabel: String = streamID.toString()
172198
when (metadataQuerier.streamNames(namespace).filter { it.name == name }.size) {
173199
0 -> {
174200
handler.accept(StreamNotFound(streamID))
175-
outputConsumer.accept(
176-
AirbyteErrorTraceMessage()
177-
.withStreamDescriptor(streamID.asProtocolStreamDescriptor())
178-
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
179-
.withMessage("Stream '$streamLabel' not found or not accessible in source.")
180-
)
181201
return null
182202
}
183203
1 -> Unit
184204
else -> {
185205
handler.accept(MultipleStreamsFound(streamID))
186-
outputConsumer.accept(
187-
AirbyteErrorTraceMessage()
188-
.withStreamDescriptor(streamID.asProtocolStreamDescriptor())
189-
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
190-
.withMessage("Multiple streams '$streamLabel' found in source.")
191-
)
192206
return null
193207
}
194208
}
@@ -232,12 +246,6 @@ class StateManagerFactory(
232246
}
233247
if (streamFields.isEmpty()) {
234248
handler.accept(StreamHasNoFields(streamID))
235-
outputConsumer.accept(
236-
AirbyteErrorTraceMessage()
237-
.withStreamDescriptor(streamID.asProtocolStreamDescriptor())
238-
.withFailureType(AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
239-
.withMessage("Stream '$streamLabel' has no accessible fields.")
240-
)
241249
return null
242250
}
243251

airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/output/DelegatingCatalogValidationFailureHandler.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ class DelegatingCatalogValidationFailureHandler(
2121
outputConsumer.accept(
2222
AirbyteLogMessage().withLevel(AirbyteLogMessage.Level.WARN).withMessage(f.toString()),
2323
)
24+
f.asErrorTrace()?.let { outputConsumer.accept(it) }
2425
}
2526
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=1.1.2
1+
version=1.1.3

airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/resources/h2source/expected-messages-stream-bad-catalog.json

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,35 @@
2020
"failure_type": "config_error"
2121
}
2222
}
23+
},
24+
{
25+
"type": "TRACE",
26+
"trace": {
27+
"type": "STREAM_STATUS",
28+
"emitted_at": 3.1336416e12,
29+
"stream_status": {
30+
"stream_descriptor": {
31+
"name": "FOO",
32+
"namespace": "PUBLIC"
33+
},
34+
"status": "STARTED",
35+
"reasons": []
36+
}
37+
}
38+
},
39+
{
40+
"type": "TRACE",
41+
"trace": {
42+
"type": "STREAM_STATUS",
43+
"emitted_at": 3.1336416e12,
44+
"stream_status": {
45+
"stream_descriptor": {
46+
"name": "FOO",
47+
"namespace": "PUBLIC"
48+
},
49+
"status": "INCOMPLETE",
50+
"reasons": []
51+
}
52+
}
2353
}
2454
]

0 commit comments

Comments
 (0)