Skip to content

Commit d27e01b

Browse files
authored
feat(bulk-cdk): changes for source-postgres rewrite (#75636)
1 parent 93d12db commit d27e01b

File tree

82 files changed

+1128
-586
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+1128
-586
lines changed
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
dependencies {
2-
api "io.airbyte.bulk-cdk:bulk-cdk-core-base:1.0.1"
2+
api "io.airbyte.bulk-cdk:bulk-cdk-core-base:1.0.2"
33
implementation 'org.apache.commons:commons-lang3:3.17.0'
44
implementation 'hu.webarticum:tree-printer:3.2.1'
55

6+
// TODO: Use a published version. Use a variable to share the version with the api dependency.
67
testFixturesApi testFixtures(project(':airbyte-cdk:bulk:core:bulk-cdk-core-base'))
78
}

airbyte-cdk/bulk/core/extract/changelog.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ The Extract CDK provides functionality for source connectors including schema di
77
<details>
88
<summary>Expand to review</summary>
99

10-
| Version | Date | Pull Request | Subject |
11-
|---------|------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
12-
| 1.0.2 | 2026-02-24 | [74007](https://github.com/airbytehq/airbyte/pull/74007) | Fix infinite loop when cursor-based incremental sync encounters an empty table with prior state by caching NULL cursor upper bound values (toolkit). |
13-
| 1.0.1 | 2026-02-24 | | Bump bulk-cdk-core-base to 1.0.1 to pick up CVE fixes (CVE-2021-47621, CVE-2022-36944). |
14-
| 1.0.0 | 2026-02-02 | [#72376](https://github.com/airbytehq/airbyte/pull/72376) | Initial independent release of bulk-cdk-core-extract. Separate versioning for extract package begins. |
10+
| Version | Date | Pull Request | Subject |
11+
|---------|------------|-----------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|
12+
| 1.1.0 | 2026-03-30 | [75636](https://github.com/airbytehq/airbyte/pull/75636) | CDC positions are partially ordered. Added optional CDC startup hook. Field is now EmittedField. |
13+
| 1.0.2 | 2026-02-24 | [74007](https://github.com/airbytehq/airbyte/pull/74007) | Fix infinite loop when cursor-based incremental sync encounters an empty table with prior state by caching NULL cursor upper bound values (toolkit). |
14+
| 1.0.1 | 2026-02-24 | | Bump bulk-cdk-core-base to 1.0.1 to pick up CVE fixes (CVE-2021-47621, CVE-2022-36944). |
15+
| 1.0.0 | 2026-02-02 | [#72376](https://github.com/airbytehq/airbyte/pull/72376) | Initial independent release of bulk-cdk-core-extract. Separate versioning for extract package begins. |
1516

1617
</details>
1718

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoverOperation.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class DiscoverOperation(
2828
listOf<String?>(null) + metadataQuerier.streamNamespaces()
2929
for (namespace in namespaces) {
3030
for (streamID in metadataQuerier.streamNames(namespace)) {
31-
val fields: List<Field> = metadataQuerier.fields(streamID)
31+
val fields: List<EmittedField> = metadataQuerier.fields(streamID)
3232
if (fields.isEmpty()) {
3333
log.info {
3434
"Ignoring stream '${streamID.name}' in '${namespace ?: ""}' because no fields were discovered."

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/DiscoveredStream.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ import io.airbyte.cdk.StreamIdentifier
88

99
data class DiscoveredStream(
1010
val id: StreamIdentifier,
11-
val columns: List<Field>,
11+
val columns: List<EmittedField>,
1212
val primaryKeyColumnIDs: List<List<String>>,
1313
)

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/Field.kt

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,16 @@ import io.airbyte.cdk.data.OffsetDateTimeCodec
1313
import java.time.OffsetDateTime
1414

1515
/** Internal equivalent of a [io.airbyte.protocol.models.Field]. */
16-
sealed interface FieldOrMetaField {
16+
sealed interface DataOrMetaField {
1717
val id: String
1818
val type: FieldType
1919
}
2020

21+
@Deprecated(
22+
message = "Use `DataOrMetaField` directly instead.",
23+
replaceWith = ReplaceWith("DataOrMetaField")
24+
)
25+
typealias FieldOrMetaField = DataOrMetaField
2126
/**
2227
* Root of our own type hierarchy for Airbyte record fields.
2328
*
@@ -40,20 +45,32 @@ interface LosslessFieldType : FieldType {
4045
val jsonDecoder: JsonDecoder<*>
4146
}
4247

48+
interface DataField : DataOrMetaField
49+
50+
@Deprecated(
51+
message = "Use `EmittedField` directly instead.",
52+
replaceWith = ReplaceWith("EmittedField")
53+
)
54+
typealias Field = EmittedField
4355
/**
4456
* Internal equivalent of [io.airbyte.protocol.models.Field] for values which come from the source
4557
* itself, instead of being generated by the connector during its operation.
4658
*/
47-
data class Field(
59+
data class EmittedField(
60+
override val id: String,
61+
override val type: FieldType,
62+
) : DataField
63+
64+
data class NonEmittedField(
4865
override val id: String,
4966
override val type: FieldType,
50-
) : FieldOrMetaField
67+
) : DataField
5168

5269
/**
5370
* Internal equivalent of [io.airbyte.protocol.models.Field] for values which are generated by the
5471
* connector itself during its operation, instead of coming from the source.
5572
*/
56-
interface MetaField : FieldOrMetaField {
73+
interface MetaField : DataOrMetaField {
5774
companion object {
5875
const val META_PREFIX = "_ab_"
5976

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetaFieldDecorator.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import java.time.OffsetDateTime
1515
interface MetaFieldDecorator {
1616

1717
/** [MetaField] to use as a global cursor, if applicable. */
18-
val globalCursor: FieldOrMetaField?
18+
val globalCursor: DataOrMetaField?
1919

2020
/**
2121
* All [MetaField]s to be found in [Global] stream records.

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/MetadataQuerier.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ interface MetadataQuerier : AutoCloseable {
1414
fun streamNames(streamNamespace: String?): List<StreamIdentifier>
1515

1616
/** Returns all available fields in the given stream. */
17-
fun fields(streamID: StreamIdentifier): List<Field>
17+
fun fields(streamID: StreamIdentifier): List<EmittedField>
1818

1919
/** Returns the primary key for the given stream, if it exists; empty list otherwise. */
2020
fun primaryKey(streamID: StreamIdentifier): List<List<String>>

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/output/NativeRecord.kt

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ import io.airbyte.cdk.data.ArrayEncoder
1010
import io.airbyte.cdk.data.BigDecimalIntegerCodec
1111
import io.airbyte.cdk.data.ByteCodec
1212
import io.airbyte.cdk.data.CdcOffsetDateTimeCodec
13+
import io.airbyte.cdk.data.JsonCodec
1314
import io.airbyte.cdk.data.JsonEncoder
1415
import io.airbyte.cdk.data.NullCodec
1516
import io.airbyte.cdk.data.OffsetDateTimeCodec
1617
import io.airbyte.cdk.data.UrlCodec
17-
import io.airbyte.cdk.discover.FieldOrMetaField
18+
import io.airbyte.cdk.discover.DataOrMetaField
1819
import io.airbyte.cdk.protocol.AirbyteValueProtobufEncoder
1920
import io.airbyte.cdk.util.Jsons
2021
import io.airbyte.protocol.protobuf.AirbyteRecordMessage
@@ -43,6 +44,10 @@ fun NativeRecordPayload.toJson(parentNode: ObjectNode = Jsons.objectNode()): Obj
4344
return parentNode
4445
}
4546

47+
interface ProtobufAwareCustomConnectorJsonCodec<T> : JsonCodec<T> {
48+
fun valueForProtobufEncoding(v: T): Any?
49+
}
50+
4651
/**
4752
* Transforms a field value into a protobuf-compatible representation. Handles special conversions
4853
* for types that need preprocessing before protobuf encoding, such as ByteBuffer -> Base64 String,
@@ -57,13 +62,17 @@ fun <R> valueForProtobufEncoding(fve: FieldValueEncoder<R>): Any? {
5762
is CdcOffsetDateTimeCodec ->
5863
(value as OffsetDateTime).format(OffsetDateTimeCodec.formatter)
5964
is ArrayEncoder<*> -> fve.encode().toString()
65+
is ProtobufAwareCustomConnectorJsonCodec<*> ->
66+
(fve.jsonEncoder as ProtobufAwareCustomConnectorJsonCodec).valueForProtobufEncoding(
67+
value
68+
)
6069
else -> value
6170
}
6271
}
6372
}
6473

6574
fun NativeRecordPayload.toProtobuf(
66-
schema: Set<FieldOrMetaField>,
75+
schema: Set<DataOrMetaField>,
6776
recordMessageBuilder: AirbyteRecordMessageProtobuf.Builder,
6877
valueBuilder: AirbyteRecordMessage.AirbyteValueProtobuf.Builder
6978
): AirbyteRecordMessageProtobuf.Builder {

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/output/OutputMessageRouter.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ package io.airbyte.cdk.output
66

77
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
88
import io.airbyte.cdk.StreamIdentifier
9-
import io.airbyte.cdk.discover.Field
9+
import io.airbyte.cdk.discover.EmittedField
1010
import io.airbyte.cdk.output.sockets.NativeRecordPayload
1111
import io.airbyte.cdk.output.sockets.SocketJsonOutputConsumer
1212
import io.airbyte.cdk.output.sockets.SocketProtobufOutputConsumer
@@ -40,7 +40,7 @@ class OutputMessageRouter(
4040
Map<StreamIdentifier, FeedBootstrap<*>.ProtoEfficientStreamRecordConsumer>
4141
private lateinit var simpleEfficientStreamConsumers: Map<StreamIdentifier, StreamRecordConsumer>
4242
var recordAcceptors:
43-
Map<StreamIdentifier, (NativeRecordPayload, Map<Field, FieldValueChange>?) -> Unit>
43+
Map<StreamIdentifier, (NativeRecordPayload, Map<EmittedField, FieldValueChange>?) -> Unit>
4444

4545
init {
4646
when (recordsDataChannelMedium) {
@@ -64,7 +64,7 @@ class OutputMessageRouter(
6464
it.key to
6565
{
6666
record: NativeRecordPayload,
67-
changes: Map<Field, FieldValueChange>? ->
67+
changes: Map<EmittedField, FieldValueChange>? ->
6868
it.value.accept(record, changes)
6969
}
7070
}
@@ -90,7 +90,7 @@ class OutputMessageRouter(
9090
it.key to
9191
{
9292
record: NativeRecordPayload,
93-
changes: Map<Field, FieldValueChange>? ->
93+
changes: Map<EmittedField, FieldValueChange>? ->
9494
it.value.accept(record, changes)
9595
}
9696
}
@@ -106,7 +106,7 @@ class OutputMessageRouter(
106106
it.key to
107107
{
108108
record: NativeRecordPayload,
109-
changes: Map<Field, FieldValueChange>? ->
109+
changes: Map<EmittedField, FieldValueChange>? ->
110110
it.value.accept(record, changes)
111111
}
112112
}

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/Feed.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
package io.airbyte.cdk.read
33

44
import io.airbyte.cdk.StreamIdentifier
5-
import io.airbyte.cdk.discover.Field
6-
import io.airbyte.cdk.discover.FieldOrMetaField
5+
import io.airbyte.cdk.discover.DataOrMetaField
6+
import io.airbyte.cdk.discover.EmittedField
77

88
/**
99
* [Feed] identifies part of the data consumed during a READ operation.
@@ -30,10 +30,10 @@ data class Global(
3030
*/
3131
data class Stream(
3232
val id: StreamIdentifier,
33-
val schema: Set<FieldOrMetaField>,
33+
val schema: Set<DataOrMetaField>,
3434
val configuredSyncMode: ConfiguredSyncMode,
35-
val configuredPrimaryKey: List<Field>?,
36-
val configuredCursor: FieldOrMetaField?,
35+
val configuredPrimaryKey: List<EmittedField>?,
36+
val configuredCursor: DataOrMetaField?,
3737
) : Feed {
3838
val name: String
3939
get() = id.name
@@ -44,8 +44,8 @@ data class Stream(
4444
override val label: String
4545
get() = id.toString()
4646

47-
val fields: List<Field>
48-
get() = schema.filterIsInstance<Field>()
47+
val fields: List<EmittedField>
48+
get() = schema.filterIsInstance<EmittedField>()
4949
}
5050

5151
/** List of [Stream]s this [Feed] emits records for. */

0 commit comments

Comments
 (0)