Skip to content

Commit 0834beb

Browse files
devin-ai-integration[bot]bot_apkpedroslopez
authored
fix: implement OAuth token expiry tracking in Bulk CDK OAuthAuthenticator [Hubspot Destination] (#74728)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: bot_apk <apk@cognition.ai> Co-authored-by: Pedro Lopez <pedroslopez@me.com>
1 parent 2a24fc5 commit 0834beb

File tree

4 files changed

+111
-5
lines changed

4 files changed

+111
-5
lines changed

airbyte-cdk/bulk/core/load/changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The Load CDK provides functionality for destination connectors including stream-
99

1010
| Version | Date | Pull Request | Subject |
1111
|---------|------------|--------------|-------------------------------------------------------------------------------------------------|
12+
| 1.0.8 | 2026-04-14 | [#74728](https://github.com/airbytehq/airbyte/pull/74728) | Fix OAuthAuthenticator to track token expiry via `expires_in` and refresh expired tokens. |
1213
| 1.0.7 | 2026-03-27 | | Fix: update Iceberg sort order before schema evolution to prevent ValidationException when deleting columns referenced by the sort order. Handles Dedupe-to-Append mode switches and PK changes. |
1314
| 1.0.6 | 2026-03-12 | [#74715](https://github.com/airbytehq/airbyte/pull/74715) | Fix: drop temp table after successful upsert to prevent duplicate records across syncs. |
1415
| 1.0.5 | 2026-03-10 | [#74723](https://github.com/airbytehq/airbyte/pull/74723) | Fix schema evolution: defer identifier field update when replacing columns to avoid Iceberg conflict. |
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=1.0.7
1+
version=1.0.8

airbyte-cdk/bulk/toolkits/load-http/src/main/kotlin/io/airbyte/cdk/load/http/authentication/OAuthAuthenticator.kt

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import io.airbyte.cdk.load.http.decoder.JsonDecoder
1414
import io.airbyte.cdk.load.http.okhttp.AirbyteOkHttpClient
1515
import io.micronaut.http.HttpHeaders
1616
import java.lang.IllegalStateException
17+
import java.time.Clock
18+
import java.time.Instant
1719
import okhttp3.Interceptor
1820
import okhttp3.OkHttpClient
1921
import okhttp3.Response as OkHttpResponse
@@ -23,18 +25,21 @@ class OAuthAuthenticator(
2325
private val clientId: String,
2426
private val clientSecret: String,
2527
private val refreshToken: String,
26-
private val httpClient: HttpClient = AirbyteOkHttpClient(OkHttpClient.Builder().build())
28+
private val httpClient: HttpClient = AirbyteOkHttpClient(OkHttpClient.Builder().build()),
29+
private val clock: Clock = Clock.systemUTC()
2730
) : Interceptor {
2831
object Constants {
2932
const val CLIENT_ID_FIELD_NAME: String = "client_id"
3033
const val CLIENT_SECRET_FIELD_NAME: String = "client_secret"
3134
const val GRANT_TYPE_FIELD_NAME: String = "grant_type"
3235
const val GRANT_TYPE: String = "refresh_token"
3336
const val REFRESH_TOKEN_FIELD_NAME: String = "refresh_token"
37+
const val EXPIRY_BUFFER_SECONDS: Long = 60
3438
}
3539

3640
private val decoder: JsonDecoder = JsonDecoder()
3741
private var accessToken: String? = null
42+
private var tokenExpiresAt: Instant? = null
3843

3944
override fun intercept(chain: Interceptor.Chain): OkHttpResponse {
4045
if (needToQueryAccessToken()) {
@@ -55,8 +60,8 @@ class OAuthAuthenticator(
5560
}
5661

5762
private fun isTokenExpired(): Boolean {
58-
return false // TODO as we only supports Salesforce today, the token is keep active until
59-
// there is no activity for a while which should not happen in our context
63+
val expiresAt = tokenExpiresAt ?: return false
64+
return Instant.now(clock).isAfter(expiresAt.minusSeconds(Constants.EXPIRY_BUFFER_SECONDS))
6065
}
6166

6267
/**
@@ -92,6 +97,11 @@ class OAuthAuthenticator(
9297
}
9398

9499
private fun refreshAccessToken() {
95-
accessToken = queryForAccessToken().get("access_token").asText()
100+
val tokenResponse = queryForAccessToken()
101+
accessToken = tokenResponse.get("access_token").asText()
102+
val expiresInNode = tokenResponse.get("expires_in")
103+
if (expiresInNode != null && expiresInNode.isNumber) {
104+
tokenExpiresAt = Instant.now(clock).plusSeconds(expiresInNode.asLong())
105+
}
96106
}
97107
}

airbyte-cdk/bulk/toolkits/load-http/src/test/kotlin/io/airbyte/cdk/load/http/authentication/OAuthAuthenticatorTest.kt

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import io.micronaut.http.HttpHeaders
1010
import io.mockk.every
1111
import io.mockk.mockk
1212
import io.mockk.verify
13+
import java.time.Clock
14+
import java.time.Instant
15+
import java.time.ZoneId
1316
import kotlin.test.assertFailsWith
1417
import okhttp3.Interceptor
1518
import okhttp3.Request
@@ -67,6 +70,84 @@ class OAuthAuthenticatorTest {
6770
verify(exactly = 1) { httpClient.send(any()) }
6871
}
6972

73+
@Test
74+
internal fun `test given token with expires_in when token expires then refresh token`() {
75+
val fixedInstant = Instant.parse("2026-01-01T00:00:00Z")
76+
val clock = Clock.fixed(fixedInstant, ZoneId.of("UTC"))
77+
val authWithClock =
78+
OAuthAuthenticator(
79+
A_ENDPOINT,
80+
A_CLIENT_ID,
81+
A_CLIENT_SECRET,
82+
A_REFRESH_TOKEN,
83+
httpClient,
84+
clock
85+
)
86+
87+
val originalRequest: Request = mockk()
88+
val chain: Interceptor.Chain = mockChain(originalRequest)
89+
mockBuilder(originalRequest)
90+
mockCallWithExpiresIn(1800) // 30 minutes
91+
92+
// First call fetches the token
93+
authWithClock.intercept(chain)
94+
verify(exactly = 1) { httpClient.send(any()) }
95+
96+
// Second call within expiry window should not refresh
97+
authWithClock.intercept(chain)
98+
verify(exactly = 1) { httpClient.send(any()) }
99+
}
100+
101+
@Test
102+
internal fun `test given expired token when performing a request then refresh token`() {
103+
val startInstant = Instant.parse("2026-01-01T00:00:00Z")
104+
// Token expires in 1800s (30 min), buffer is 60s, so expired at startInstant + 1740s
105+
val expiredInstant = startInstant.plusSeconds(1800)
106+
val mutableClock = mockk<Clock>()
107+
every { mutableClock.instant() } returns startInstant
108+
every { mutableClock.zone } returns ZoneId.of("UTC")
109+
110+
val authWithClock =
111+
OAuthAuthenticator(
112+
A_ENDPOINT,
113+
A_CLIENT_ID,
114+
A_CLIENT_SECRET,
115+
A_REFRESH_TOKEN,
116+
httpClient,
117+
mutableClock
118+
)
119+
120+
val originalRequest: Request = mockk()
121+
val chain: Interceptor.Chain = mockChain(originalRequest)
122+
mockBuilder(originalRequest)
123+
mockCallWithExpiresIn(1800)
124+
125+
// First call fetches the token
126+
authWithClock.intercept(chain)
127+
verify(exactly = 1) { httpClient.send(any()) }
128+
129+
// Advance time past expiry
130+
every { mutableClock.instant() } returns expiredInstant
131+
132+
// Second call should refresh because token is expired
133+
authWithClock.intercept(chain)
134+
verify(exactly = 2) { httpClient.send(any()) }
135+
}
136+
137+
@Test
138+
internal fun `test given no expires_in in response when performing requests then do not refresh`() {
139+
val originalRequest: Request = mockk()
140+
val chain: Interceptor.Chain = mockChain(originalRequest)
141+
mockBuilder(originalRequest)
142+
mockCall() // no expires_in in response
143+
144+
authenticator.intercept(chain)
145+
authenticator.intercept(chain)
146+
147+
// Without expires_in, token is never considered expired (backward compatible)
148+
verify(exactly = 1) { httpClient.send(any()) }
149+
}
150+
70151
@Test
71152
internal fun `test given status is not 2XX when performing a request then raise`() {
72153
val originalRequest: Request = mockk()
@@ -91,6 +172,20 @@ class OAuthAuthenticatorTest {
91172
every { httpClient.send(any()) } returns (oauthResponse)
92173
}
93174

175+
private fun mockCallWithExpiresIn(expiresIn: Int) {
176+
every { httpClient.send(any()) } answers
177+
{
178+
val oauthResponse: Response = mockk<Response>()
179+
every { oauthResponse.statusCode } returns 200
180+
every { oauthResponse.body } returns
181+
"{\"access_token\":\"${AN_ACCESS_TOKEN}\",\"expires_in\":$expiresIn}".byteInputStream(
182+
Charsets.UTF_8
183+
)
184+
every { oauthResponse.close() } returns Unit
185+
oauthResponse
186+
}
187+
}
188+
94189
private fun mockBuilder(originalRequest: Request): Request.Builder {
95190
val builder: Request.Builder = mockk()
96191
every { builder.addHeader(any(), any()) } returns (builder)

0 commit comments

Comments
 (0)