Skip to content

Commit 8608fd2

Browse files
fix(source-mssql): Sample using state (#74729)
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
1 parent abce445 commit 8608fd2

16 files changed

Lines changed: 138 additions & 123 deletions

airbyte-integrations/connectors/source-mssql/build.gradle

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,10 @@ airbyteBulkConnector {
1515

1616
dependencies {
1717
implementation 'com.microsoft.sqlserver:mssql-jdbc:12.10.1.jre11'
18-
implementation 'com.azure:azure-identity:1.15.3'
1918
implementation 'io.debezium:debezium-embedded:3.3.0.Final'
2019
implementation 'io.debezium:debezium-connector-sqlserver:3.3.0.Final'
21-
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'
22-
api 'org.apache.commons:commons-lang3:3.18.0'
2320
implementation 'org.apache.commons:commons-lang3:3.18.0'
2421

25-
testImplementation 'org.awaitility:awaitility:4.2.0'
26-
testImplementation 'org.hamcrest:hamcrest-all:1.3'
2722
testFixturesImplementation 'org.testcontainers:mssqlserver:1.19.0'
28-
testImplementation 'org.testcontainers:mssqlserver:1.19.0'
29-
testImplementation 'com.zaxxer:HikariCP:5.1.0'
3023
testImplementation("io.mockk:mockk:1.12.0")
31-
api 'com.google.guava:guava:33.4.0-jre'
3224
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
testExecutionConcurrency=1
22
JunitMethodExecutionTimeout=5m
3-
cdkVersion=0.2.4
3+
cdkVersion=1.1.3

airbyte-integrations/connectors/source-mssql/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 4.3.5
12+
dockerImageTag: 4.3.6
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

airbyte-integrations/connectors/source-mssql/src/main/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerCdcInitialSnapshotStateValue.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ package io.airbyte.integrations.source.mssql
77
import com.fasterxml.jackson.annotation.JsonProperty
88
import com.fasterxml.jackson.databind.JsonNode
99
import io.airbyte.cdk.command.OpaqueStateValue
10-
import io.airbyte.cdk.discover.Field
10+
import io.airbyte.cdk.discover.EmittedField
1111
import io.airbyte.cdk.read.Stream
1212
import io.airbyte.cdk.util.Jsons
1313

@@ -34,7 +34,7 @@ data class MsSqlServerCdcInitialSnapshotStateValue(
3434

3535
/** Value representing the progress of an ongoing snapshot. */
3636
fun snapshotCheckpoint(
37-
primaryKey: List<Field>,
37+
primaryKey: List<EmittedField>,
3838
primaryKeyCheckpoint: List<JsonNode>,
3939
): OpaqueStateValue {
4040
val primaryKeyField = primaryKey.first()

airbyte-integrations/connectors/source-mssql/src/main/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerCursorCutoffTimeProvider.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ package io.airbyte.integrations.source.mssql
66

77
import com.fasterxml.jackson.databind.JsonNode
88
import io.airbyte.cdk.data.LeafAirbyteSchemaType
9-
import io.airbyte.cdk.discover.Field
9+
import io.airbyte.cdk.discover.EmittedField
1010
import io.airbyte.cdk.util.Jsons
1111
import io.github.oshai.kotlinlogging.KotlinLogging
1212
import java.time.Instant
@@ -37,7 +37,7 @@ object MsSqlServerCursorCutoffTimeProvider {
3737
* @param nowInstant The current instant (for testing)
3838
* @return The cutoff time as JsonNode, or null if not applicable
3939
*/
40-
fun getCutoffTime(cursorField: Field, nowInstant: Instant = Instant.now()): JsonNode? {
40+
fun getCutoffTime(cursorField: EmittedField, nowInstant: Instant = Instant.now()): JsonNode? {
4141

4242
return when (cursorField.type.airbyteSchemaType) {
4343
is LeafAirbyteSchemaType -> {
@@ -102,7 +102,7 @@ object MsSqlServerCursorCutoffTimeProvider {
102102
}
103103

104104
/** Checks if a cursor field type supports the "Exclude Today's Data" feature. */
105-
fun isTemporalType(cursorField: Field): Boolean {
105+
fun isTemporalType(cursorField: EmittedField): Boolean {
106106
val schemaType = cursorField.type.airbyteSchemaType
107107
return schemaType is LeafAirbyteSchemaType &&
108108
schemaType in

airbyte-integrations/connectors/source-mssql/src/main/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerDebeziumOperations.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import io.airbyte.cdk.read.cdc.DebeziumSchemaHistory
3333
import io.airbyte.cdk.read.cdc.DebeziumWarmStartState
3434
import io.airbyte.cdk.read.cdc.DeserializedRecord
3535
import io.airbyte.cdk.read.cdc.InvalidDebeziumWarmStartState
36+
import io.airbyte.cdk.read.cdc.PartiallyOrdered
3637
import io.airbyte.cdk.read.cdc.ResetDebeziumWarmStartState
3738
import io.airbyte.cdk.read.cdc.ValidDebeziumWarmStartState
3839
import io.airbyte.cdk.ssh.TunnelSession
@@ -57,7 +58,7 @@ import kotlin.collections.plus
5758
import org.apache.kafka.connect.source.SourceRecord
5859
import org.apache.mina.util.Base64
5960

60-
data class MsSqlServerCdcPosition(val lsn: String) : Comparable<MsSqlServerCdcPosition> {
61+
data class MsSqlServerCdcPosition(val lsn: String) : PartiallyOrdered<MsSqlServerCdcPosition> {
6162
override fun compareTo(other: MsSqlServerCdcPosition): Int {
6263
return lsn.compareTo(other.lsn)
6364
}

airbyte-integrations/connectors/source-mssql/src/main/kotlin/io/airbyte/integrations/source/mssql/MsSqlServerJdbcPartition.kt

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ package io.airbyte.integrations.source.mssql
66

77
import com.fasterxml.jackson.databind.JsonNode
88
import com.fasterxml.jackson.databind.node.BinaryNode
9-
import com.fasterxml.jackson.databind.node.ObjectNode
109
import io.airbyte.cdk.command.OpaqueStateValue
1110
import io.airbyte.cdk.data.LeafAirbyteSchemaType
1211
import io.airbyte.cdk.data.OffsetDateTimeCodec
13-
import io.airbyte.cdk.discover.Field
12+
import io.airbyte.cdk.discover.EmittedField
13+
import io.airbyte.cdk.output.sockets.toJson
1414
import io.airbyte.cdk.read.And
1515
import io.airbyte.cdk.read.DefaultJdbcStreamState
1616
import io.airbyte.cdk.read.Equal
@@ -29,6 +29,7 @@ import io.airbyte.cdk.read.Or
2929
import io.airbyte.cdk.read.OrderBy
3030
import io.airbyte.cdk.read.SelectColumnMaxValue
3131
import io.airbyte.cdk.read.SelectColumns
32+
import io.airbyte.cdk.read.SelectQuerier
3233
import io.airbyte.cdk.read.SelectQuery
3334
import io.airbyte.cdk.read.SelectQueryGenerator
3435
import io.airbyte.cdk.read.SelectQuerySpec
@@ -74,7 +75,7 @@ private fun getEffectiveCursorCheckpoint(
7475
* Converts a state value string to a JsonNode based on the field type. This function handles type
7576
* conversions and date formatting for state checkpoints.
7677
*/
77-
fun stateValueToJsonNode(field: Field, stateValue: String?): JsonNode {
78+
fun stateValueToJsonNode(field: EmittedField, stateValue: String?): JsonNode {
7879
when (field.type.airbyteSchemaType) {
7980
is LeafAirbyteSchemaType ->
8081
return when (field.type.airbyteSchemaType as LeafAirbyteSchemaType) {
@@ -185,7 +186,7 @@ class MsSqlServerJdbcNonResumableSnapshotPartition(
185186
class MsSqlServerJdbcNonResumableSnapshotWithCursorPartition(
186187
selectQueryGenerator: SelectQueryGenerator,
187188
streamState: DefaultJdbcStreamState,
188-
val cursor: Field,
189+
val cursor: EmittedField,
189190
val cursorCutoffTime: JsonNode? = null,
190191
) :
191192
MsSqlServerJdbcPartition(selectQueryGenerator, streamState),
@@ -232,7 +233,7 @@ class MsSqlServerJdbcNonResumableSnapshotWithCursorPartition(
232233
sealed class MsSqlServerJdbcResumablePartition(
233234
selectQueryGenerator: SelectQueryGenerator,
234235
streamState: DefaultJdbcStreamState,
235-
val checkpointColumns: List<Field>,
236+
val checkpointColumns: List<EmittedField>,
236237
) :
237238
MsSqlServerJdbcPartition(selectQueryGenerator, streamState),
238239
JdbcSplittablePartition<DefaultJdbcStreamState> {
@@ -262,7 +263,7 @@ sealed class MsSqlServerJdbcResumablePartition(
262263
val querySpec =
263264
SelectQuerySpec(
264265
SelectColumns(stream.fields + checkpointColumns),
265-
FromSample(stream.name, stream.namespace, sampleRateInvPow2, sampleSize),
266+
FromSample(stream.name, stream.namespace, sampleRateInvPow2, sampleSize, where),
266267
NoWhere,
267268
OrderBy(checkpointColumns),
268269
Limit(sampleSize.toLong())
@@ -272,34 +273,35 @@ sealed class MsSqlServerJdbcResumablePartition(
272273

273274
val where: Where
274275
get() {
275-
val zippedLowerBound: List<Pair<Field, JsonNode>> =
276+
val zippedLowerBound: List<Pair<EmittedField, JsonNode>> =
276277
lowerBound?.let { checkpointColumns.zip(it) } ?: listOf()
277278
val lowerBoundDisj: List<WhereClauseNode> =
278-
zippedLowerBound.mapIndexed { idx: Int, (gtCol: Field, gtValue: JsonNode) ->
279+
zippedLowerBound.mapIndexed { idx: Int, (gtCol: EmittedField, gtValue: JsonNode) ->
279280
val lastLeaf: WhereClauseLeafNode =
280281
if (isLowerBoundIncluded && idx == checkpointColumns.size - 1) {
281282
GreaterOrEqual(gtCol, gtValue)
282283
} else {
283284
Greater(gtCol, gtValue)
284285
}
285286
And(
286-
zippedLowerBound.take(idx).map { (eqCol: Field, eqValue: JsonNode) ->
287+
zippedLowerBound.take(idx).map { (eqCol: EmittedField, eqValue: JsonNode) ->
287288
Equal(eqCol, eqValue)
288289
} + listOf(lastLeaf),
289290
)
290291
}
291-
val zippedUpperBound: List<Pair<Field, JsonNode>> =
292+
val zippedUpperBound: List<Pair<EmittedField, JsonNode>> =
292293
upperBound?.let { checkpointColumns.zip(it) } ?: listOf()
293294
val upperBoundDisj: List<WhereClauseNode> =
294-
zippedUpperBound.mapIndexed { idx: Int, (leqCol: Field, leqValue: JsonNode) ->
295+
zippedUpperBound.mapIndexed { idx: Int, (leqCol: EmittedField, leqValue: JsonNode)
296+
->
295297
val lastLeaf: WhereClauseLeafNode =
296298
if (idx < zippedUpperBound.size - 1) {
297299
Lesser(leqCol, leqValue)
298300
} else {
299301
LesserOrEqual(leqCol, leqValue)
300302
}
301303
And(
302-
zippedUpperBound.take(idx).map { (eqCol: Field, eqValue: JsonNode) ->
304+
zippedUpperBound.take(idx).map { (eqCol: EmittedField, eqValue: JsonNode) ->
303305
Equal(eqCol, eqValue)
304306
} + listOf(lastLeaf),
305307
)
@@ -323,7 +325,7 @@ sealed class MsSqlServerJdbcResumablePartition(
323325
class MsSqlServerJdbcRfrSnapshotPartition(
324326
selectQueryGenerator: SelectQueryGenerator,
325327
streamState: DefaultJdbcStreamState,
326-
primaryKey: List<Field>,
328+
primaryKey: List<EmittedField>,
327329
override val lowerBound: List<JsonNode>?,
328330
override val upperBound: List<JsonNode>?,
329331
) : MsSqlServerJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) {
@@ -341,18 +343,19 @@ class MsSqlServerJdbcRfrSnapshotPartition(
341343
)
342344
}
343345

344-
override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue =
346+
override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
345347
MsSqlServerJdbcStreamStateValue.snapshotCheckpoint(
346348
primaryKey = checkpointColumns,
347-
primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() },
349+
primaryKeyCheckpoint =
350+
checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
348351
)
349352
}
350353

351354
/** RFR for CDC. */
352355
class MsSqlServerJdbcCdcRfrSnapshotPartition(
353356
selectQueryGenerator: SelectQueryGenerator,
354357
streamState: DefaultJdbcStreamState,
355-
primaryKey: List<Field>,
358+
primaryKey: List<EmittedField>,
356359
override val lowerBound: List<JsonNode>?,
357360
override val upperBound: List<JsonNode>?,
358361
) : MsSqlServerJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) {
@@ -367,10 +370,11 @@ class MsSqlServerJdbcCdcRfrSnapshotPartition(
367370
)
368371
}
369372

370-
override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue =
373+
override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
371374
MsSqlServerCdcInitialSnapshotStateValue.snapshotCheckpoint(
372375
primaryKey = checkpointColumns,
373-
primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() },
376+
primaryKeyCheckpoint =
377+
checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
374378
)
375379
}
376380

@@ -381,25 +385,26 @@ class MsSqlServerJdbcCdcRfrSnapshotPartition(
381385
class MsSqlServerJdbcCdcSnapshotPartition(
382386
selectQueryGenerator: SelectQueryGenerator,
383387
streamState: DefaultJdbcStreamState,
384-
primaryKey: List<Field>,
388+
primaryKey: List<EmittedField>,
385389
override val lowerBound: List<JsonNode>?,
386390
) : MsSqlServerJdbcResumablePartition(selectQueryGenerator, streamState, primaryKey) {
387391
override val upperBound: List<JsonNode>? = null
388392
override val completeState: OpaqueStateValue
389393
get() = MsSqlServerCdcInitialSnapshotStateValue.getSnapshotCompletedState(stream)
390394

391-
override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue =
395+
override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
392396
MsSqlServerCdcInitialSnapshotStateValue.snapshotCheckpoint(
393397
primaryKey = checkpointColumns,
394-
primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() },
398+
primaryKeyCheckpoint =
399+
checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
395400
)
396401
}
397402

398403
sealed class MsSqlServerJdbcCursorPartition(
399404
selectQueryGenerator: SelectQueryGenerator,
400405
streamState: DefaultJdbcStreamState,
401-
checkpointColumns: List<Field>,
402-
val cursor: Field,
406+
checkpointColumns: List<EmittedField>,
407+
val cursor: EmittedField,
403408
private val explicitCursorUpperBound: JsonNode?,
404409
val cursorCutoffTime: JsonNode? = null,
405410
) :
@@ -433,7 +438,7 @@ sealed class MsSqlServerJdbcCursorPartition(
433438
val querySpec =
434439
SelectQuerySpec(
435440
SelectColumns(stream.fields + checkpointColumns),
436-
from,
441+
FromSample(stream.name, stream.namespace, sampleRateInvPow2, sampleSize, where),
437442
NoWhere,
438443
OrderBy(checkpointColumns),
439444
Limit(sampleSize.toLong())
@@ -454,9 +459,9 @@ sealed class MsSqlServerJdbcCursorPartition(
454459
class MsSqlServerJdbcSnapshotWithCursorPartition(
455460
selectQueryGenerator: SelectQueryGenerator,
456461
streamState: DefaultJdbcStreamState,
457-
primaryKey: List<Field>,
462+
primaryKey: List<EmittedField>,
458463
override val lowerBound: List<JsonNode>?,
459-
cursor: Field,
464+
cursor: EmittedField,
460465
cursorUpperBound: JsonNode?,
461466
cursorCutoffTime: JsonNode? = null,
462467
) :
@@ -478,21 +483,22 @@ class MsSqlServerJdbcSnapshotWithCursorPartition(
478483
getEffectiveCursorCheckpoint(cursorCutoffTime, cursorUpperBound, Jsons.nullNode()),
479484
)
480485

481-
override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue =
486+
override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
482487
MsSqlServerJdbcStreamStateValue.snapshotWithCursorCheckpoint(
483488
primaryKey = checkpointColumns,
484-
primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() },
489+
primaryKeyCheckpoint =
490+
checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
485491
cursor,
486492
)
487493
}
488494

489495
class MsSqlServerJdbcSplittableSnapshotWithCursorPartition(
490496
selectQueryGenerator: SelectQueryGenerator,
491497
streamState: DefaultJdbcStreamState,
492-
primaryKey: List<Field>,
498+
primaryKey: List<EmittedField>,
493499
override val lowerBound: List<JsonNode>?,
494500
override val upperBound: List<JsonNode>?,
495-
cursor: Field,
501+
cursor: EmittedField,
496502
cursorUpperBound: JsonNode?,
497503
cursorCutoffTime: JsonNode? = null,
498504
) :
@@ -524,10 +530,11 @@ class MsSqlServerJdbcSplittableSnapshotWithCursorPartition(
524530
)
525531
}
526532

527-
override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue =
533+
override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
528534
MsSqlServerJdbcStreamStateValue.snapshotWithCursorCheckpoint(
529535
primaryKey = checkpointColumns,
530-
primaryKeyCheckpoint = checkpointColumns.map { lastRecord[it.id] ?: Jsons.nullNode() },
536+
primaryKeyCheckpoint =
537+
checkpointColumns.map { lastRecord.data.toJson()[it.id] ?: Jsons.nullNode() },
531538
cursor,
532539
)
533540
}
@@ -539,7 +546,7 @@ class MsSqlServerJdbcSplittableSnapshotWithCursorPartition(
539546
class MsSqlServerJdbcCursorIncrementalPartition(
540547
selectQueryGenerator: SelectQueryGenerator,
541548
streamState: DefaultJdbcStreamState,
542-
cursor: Field,
549+
cursor: EmittedField,
543550
val cursorLowerBound: JsonNode,
544551
override val isLowerBoundIncluded: Boolean,
545552
cursorUpperBound: JsonNode?,
@@ -564,10 +571,10 @@ class MsSqlServerJdbcCursorIncrementalPartition(
564571
getEffectiveCursorCheckpoint(cursorCutoffTime, cursorUpperBound, cursorLowerBound),
565572
)
566573

567-
override fun incompleteState(lastRecord: ObjectNode): OpaqueStateValue =
574+
override fun incompleteState(lastRecord: SelectQuerier.ResultRow): OpaqueStateValue =
568575
MsSqlServerJdbcStreamStateValue.cursorIncrementalCheckpoint(
569576
cursor,
570-
cursorCheckpoint = lastRecord[cursor.id] ?: Jsons.nullNode(),
577+
cursorCheckpoint = lastRecord.data.toJson()[cursor.id] ?: Jsons.nullNode(),
571578
)
572579
}
573580

0 commit comments

Comments
 (0)