Skip to content

Commit aa72f25

Browse files
fix(bulk-cdk): drop temp table after successful upsert to prevent duplicate records (#74715)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent a8c22d5 commit aa72f25

4 files changed

Lines changed: 240 additions & 1 deletion

File tree

airbyte-cdk/bulk/core/load/changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The Load CDK provides functionality for destination connectors including stream-
99

1010
| Version | Date | Pull Request | Subject |
1111
|---------|------------|--------------|-------------------------------------------------------------------------------------------------|
12+
| 1.0.6 | 2026-03-12 | [#74715](https://github.com/airbytehq/airbyte/pull/74715) | Fix: drop temp table after successful upsert to prevent duplicate records across syncs. |
1213
| 1.0.5 | 2026-03-10 | [#74723](https://github.com/airbytehq/airbyte/pull/74723) | Fix schema evolution: defer identifier field update when replacing columns to avoid Iceberg conflict. |
1314
| 1.0.4 | 2026-03-05 | [#74328](https://github.com/airbytehq/airbyte/pull/74328) | Fix iceberg dedup: map PK NumberType to StringType instead of DecimalType for identifier field compatibility. |
1415
| 1.0.3 | 2026-03-05 | [#74272](https://github.com/airbytehq/airbyte/pull/74272) | Fix iceberg dedup. |

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/table/directload/DirectLoadTableStreamLoader.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,8 @@ class DirectLoadTableAppendTruncateStreamLoader(
238238
logger.info {
239239
"Overwriting ${tempTableName.toPrettyString()} with ${realTableName.toPrettyString()} for stream ${stream.mappedDescriptor.toPrettyString()}"
240240
}
241+
// overwriteTable consumes the source table (drops/renames it),
242+
// so temp table is already gone after this call.
241243
tableOperationsClient.overwriteTable(
242244
sourceTableName = tempTableName,
243245
targetTableName = realTableName,
@@ -408,5 +410,12 @@ class DirectLoadTableDedupTruncateStreamLoader(
408410
sourceTableName = tempTempTable,
409411
targetTableName = realTableName,
410412
)
413+
414+
// Clean up the original temp table to prevent duplicate records on the next sync.
415+
// Note: overwriteTable above consumed tempTempTable (not tempTableName), so
416+
// tempTableName still exists with all its data. Without this drop, the next
417+
// sync's start() would find a non-empty temp table with matching generation ID
418+
// and reuse it, causing old records to accumulate alongside new ones.
419+
tableOperationsClient.dropTable(tempTableName)
411420
}
412421
}
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright (c) 2026 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.table.directload
6+
7+
import io.airbyte.cdk.load.command.DestinationStream
8+
import io.airbyte.cdk.load.component.TableOperationsClient
9+
import io.airbyte.cdk.load.component.TableSchemaEvolutionClient
10+
import io.airbyte.cdk.load.schema.model.TableName
11+
import io.airbyte.cdk.load.table.ColumnNameMapping
12+
import io.airbyte.cdk.load.table.TempTableNameGenerator
13+
import io.airbyte.cdk.load.write.StreamStateStore
14+
import io.mockk.coEvery
15+
import io.mockk.coVerify
16+
import io.mockk.every
17+
import io.mockk.mockk
18+
import kotlinx.coroutines.test.runTest
19+
import org.junit.jupiter.api.Test
20+
21+
class DirectLoadTableStreamLoaderTest {
22+
23+
private val stream =
24+
mockk<DestinationStream>(relaxed = true) {
25+
every { mappedDescriptor } returns
26+
DestinationStream.Descriptor("test_namespace", "test_stream")
27+
every { minimumGenerationId } returns 1L
28+
}
29+
private val realTableName = TableName("real_namespace", "real_table")
30+
private val tempTableName = TableName("temp_namespace", "temp_table")
31+
private val tempTempTableName = TableName("temp_namespace", "temp_temp_table")
32+
private val columnNameMapping = ColumnNameMapping(emptyMap())
33+
private val schemaEvolutionClient = mockk<TableSchemaEvolutionClient>(relaxed = true)
34+
private val tableOperationsClient = mockk<TableOperationsClient>(relaxed = true)
35+
private val streamStateStore = StreamStateStore<DirectLoadTableExecutionConfig>()
36+
private val tempTableNameGenerator =
37+
mockk<TempTableNameGenerator> {
38+
every { generate(tempTableName) } returns tempTempTableName
39+
}
40+
41+
@Test
42+
fun `AppendTruncateStreamLoader teardown overwrites real table with temp on success`() =
43+
runTest {
44+
val initialStatus =
45+
DirectLoadInitialStatus(
46+
realTable = DirectLoadTableStatus(isEmpty = true),
47+
tempTable = DirectLoadTableStatus(isEmpty = true),
48+
)
49+
50+
val loader =
51+
DirectLoadTableAppendTruncateStreamLoader(
52+
stream = stream,
53+
initialStatus = initialStatus,
54+
realTableName = realTableName,
55+
tempTableName = tempTableName,
56+
columnNameMapping = columnNameMapping,
57+
schemaEvolutionClient = schemaEvolutionClient,
58+
tableOperationsClient = tableOperationsClient,
59+
streamStateStore = streamStateStore,
60+
)
61+
62+
loader.start()
63+
loader.teardown(completedSuccessfully = true)
64+
65+
coVerify(exactly = 1) {
66+
tableOperationsClient.overwriteTable(
67+
sourceTableName = tempTableName,
68+
targetTableName = realTableName,
69+
)
70+
}
71+
// No explicit dropTable needed: overwriteTable consumes the source table
72+
// (drops/renames it), so temp is already gone.
73+
coVerify(exactly = 0) { tableOperationsClient.dropTable(any()) }
74+
}
75+
76+
@Test
77+
fun `AppendTruncateStreamLoader teardown does not drop temp table on failure`() = runTest {
78+
val initialStatus =
79+
DirectLoadInitialStatus(
80+
realTable = DirectLoadTableStatus(isEmpty = true),
81+
tempTable = DirectLoadTableStatus(isEmpty = true),
82+
)
83+
84+
val loader =
85+
DirectLoadTableAppendTruncateStreamLoader(
86+
stream = stream,
87+
initialStatus = initialStatus,
88+
realTableName = realTableName,
89+
tempTableName = tempTableName,
90+
columnNameMapping = columnNameMapping,
91+
schemaEvolutionClient = schemaEvolutionClient,
92+
tableOperationsClient = tableOperationsClient,
93+
streamStateStore = streamStateStore,
94+
)
95+
96+
loader.start()
97+
loader.teardown(completedSuccessfully = false)
98+
99+
coVerify(exactly = 0) { tableOperationsClient.overwriteTable(any(), any()) }
100+
coVerify(exactly = 0) { tableOperationsClient.dropTable(any()) }
101+
}
102+
103+
@Test
104+
fun `DedupTruncateStreamLoader performUpsertWithTemporaryTable drops temp table after overwrite`() =
105+
runTest {
106+
// When temp table already exists with matching generation ID,
107+
// shouldCheckRealTableGeneration=false, so performUpsertWithTemporaryTable is called.
108+
val initialStatus =
109+
DirectLoadInitialStatus(
110+
realTable = DirectLoadTableStatus(isEmpty = false),
111+
tempTable = DirectLoadTableStatus(isEmpty = false),
112+
)
113+
114+
coEvery { tableOperationsClient.getGenerationId(tempTableName) } returns 1L
115+
116+
val loader =
117+
DirectLoadTableDedupTruncateStreamLoader(
118+
stream = stream,
119+
initialStatus = initialStatus,
120+
realTableName = realTableName,
121+
tempTableName = tempTableName,
122+
columnNameMapping = columnNameMapping,
123+
schemaEvolutionClient = schemaEvolutionClient,
124+
tableOperationsClient = tableOperationsClient,
125+
streamStateStore = streamStateStore,
126+
tempTableNameGenerator = tempTableNameGenerator,
127+
)
128+
129+
loader.start()
130+
loader.teardown(completedSuccessfully = true)
131+
132+
// Verify the performUpsertWithTemporaryTable path was taken
133+
coVerify(exactly = 1) {
134+
tableOperationsClient.createTable(
135+
stream,
136+
tempTempTableName,
137+
columnNameMapping,
138+
replace = true
139+
)
140+
}
141+
coVerify(exactly = 1) {
142+
tableOperationsClient.upsertTable(
143+
stream,
144+
columnNameMapping,
145+
sourceTableName = tempTableName,
146+
targetTableName = tempTempTableName,
147+
)
148+
}
149+
coVerify(exactly = 1) {
150+
tableOperationsClient.overwriteTable(
151+
sourceTableName = tempTempTableName,
152+
targetTableName = realTableName,
153+
)
154+
}
155+
// The key assertion: temp table is dropped after the overwrite
156+
coVerify(exactly = 1) { tableOperationsClient.dropTable(tempTableName) }
157+
}
158+
159+
@Test
160+
fun `DedupTruncateStreamLoader performDirectUpsert also drops temp table`() = runTest {
161+
// When no temp table exists initially, shouldCheckRealTableGeneration=true.
162+
// When real table doesn't exist, shouldUpsertDirectly=true.
163+
// This triggers the performDirectUpsert path.
164+
val initialStatus =
165+
DirectLoadInitialStatus(
166+
realTable = null,
167+
tempTable = null,
168+
)
169+
170+
val loader =
171+
DirectLoadTableDedupTruncateStreamLoader(
172+
stream = stream,
173+
initialStatus = initialStatus,
174+
realTableName = realTableName,
175+
tempTableName = tempTableName,
176+
columnNameMapping = columnNameMapping,
177+
schemaEvolutionClient = schemaEvolutionClient,
178+
tableOperationsClient = tableOperationsClient,
179+
streamStateStore = streamStateStore,
180+
tempTableNameGenerator = tempTableNameGenerator,
181+
)
182+
183+
loader.start()
184+
loader.teardown(completedSuccessfully = true)
185+
186+
// Verify the performDirectUpsert path was taken (creates real table, upserts, drops temp)
187+
coVerify(exactly = 1) {
188+
tableOperationsClient.upsertTable(
189+
stream,
190+
columnNameMapping,
191+
sourceTableName = tempTableName,
192+
targetTableName = realTableName,
193+
)
194+
}
195+
coVerify(exactly = 1) { tableOperationsClient.dropTable(tempTableName) }
196+
}
197+
198+
@Test
199+
fun `DedupTruncateStreamLoader teardown does not drop temp table on failure`() = runTest {
200+
val initialStatus =
201+
DirectLoadInitialStatus(
202+
realTable = DirectLoadTableStatus(isEmpty = false),
203+
tempTable = DirectLoadTableStatus(isEmpty = false),
204+
)
205+
206+
coEvery { tableOperationsClient.getGenerationId(tempTableName) } returns 1L
207+
208+
val loader =
209+
DirectLoadTableDedupTruncateStreamLoader(
210+
stream = stream,
211+
initialStatus = initialStatus,
212+
realTableName = realTableName,
213+
tempTableName = tempTableName,
214+
columnNameMapping = columnNameMapping,
215+
schemaEvolutionClient = schemaEvolutionClient,
216+
tableOperationsClient = tableOperationsClient,
217+
streamStateStore = streamStateStore,
218+
tempTableNameGenerator = tempTableNameGenerator,
219+
)
220+
221+
loader.start()
222+
loader.teardown(completedSuccessfully = false)
223+
224+
// On failure, nothing should happen - temp table preserved for retry
225+
coVerify(exactly = 0) { tableOperationsClient.upsertTable(any(), any(), any(), any()) }
226+
coVerify(exactly = 0) { tableOperationsClient.overwriteTable(any(), any()) }
227+
coVerify(exactly = 0) { tableOperationsClient.dropTable(any()) }
228+
}
229+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=1.0.5
1+
version=1.0.6

0 commit comments

Comments
 (0)