Commit 7db08ee

devin-ai-integration[bot], bot_apk, sophiecuiy, and claude authored
feat(source-slack): add configurable toggle to skip replies for zero-reply messages (#75905)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: bot_apk <apk@cognition.ai>
Co-authored-by: Sophie Cui <sophie.cui@airbyte.io>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1b7bbbb commit 7db08ee

5 files changed (+212, -12 lines)
airbyte-integrations/connectors/source-slack/components.py

Lines changed: 0 additions & 1 deletion
@@ -18,7 +18,6 @@
     InterpolatedString,
 )
 from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
-from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter, SubstreamPartitionRouter
 from airbyte_cdk.sources.declarative.requesters import HttpRequester
 from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
     InterpolatedRequestOptionsProvider,

airbyte-integrations/connectors/source-slack/manifest.yaml

Lines changed: 16 additions & 1 deletion
@@ -284,6 +284,16 @@ definitions:
       schema:
         $ref: "#/schemas/channel_messages"
 
+  channel_messages_with_replies_stream:
+    $ref: "#/definitions/channel_messages_stream"
+    retriever:
+      $ref: "#/definitions/channel_messages_stream/retriever"
+      record_selector:
+        $ref: "#/definitions/channel_messages_stream/retriever/record_selector"
+        record_filter:
+          type: RecordFilter
+          condition: "{{ (record.reply_count or 0) > 0 if config.get('threads_ignore_no_replies', false) else true }}"
+
   threads_stream:
     $ref: "#/definitions/stream_base"
     $parameters:
@@ -328,7 +338,7 @@ definitions:
         parent_stream_configs:
           - type: ParentStreamConfig
             stream:
-              $ref: "#/definitions/channel_messages_stream"
+              $ref: "#/definitions/channel_messages_with_replies_stream"
             # One complication is that threads can be updated at Any time in the future. Therefore, if we wanted to comprehensively sync data
             # i.e: get every single response in a thread, we'd have to read every message in the slack instance every time we ran a sync,
             # because otherwise there is no way to guarantee that a thread deep in the past didn't receive a new message.
@@ -432,6 +442,11 @@ spec:
         examples:
           - channel_one
           - channel_two
+      threads_ignore_no_replies:
+        type: boolean
+        default: false
+        title: Ignore messages with no replies in threads stream
+        description: When enabled, the threads stream will skip messages that have no replies (reply_count is 0, null, or absent), reducing the number of API calls. Disabled by default to make Threads stream contain unthreaded messages in its records.
       credentials:
         title: Authentication mechanism
         description: Choose how to authenticate into Slack
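
Note on the new `record_filter`: the condition is a Jinja expression evaluated once per parent message. As a rough illustration only, the same decision can be written in plain Python. The function name `keep_parent_message` is hypothetical and is not part of the connector or the Airbyte CDK; the sample messages mirror the fixtures used in the unit tests of this commit.

```python
from typing import Any, Mapping


def keep_parent_message(record: Mapping[str, Any], config: Mapping[str, Any]) -> bool:
    """Hypothetical Python mirror of the manifest's RecordFilter condition."""
    # Toggle off (the default): every message stays a parent for the threads stream.
    if not config.get("threads_ignore_no_replies", False):
        return True
    # Toggle on: treat a missing or null reply_count as 0 and keep only real threads.
    return (record.get("reply_count") or 0) > 0


messages = [
    {"ts": 1577866844, "reply_count": 3},
    {"ts": 1577877406, "reply_count": 0},
    {"ts": 1577888888, "reply_count": None},
    {"ts": 1577866844},  # reply_count key absent
]
kept_enabled = [m for m in messages if keep_parent_message(m, {"threads_ignore_no_replies": True})]
kept_default = [m for m in messages if keep_parent_message(m, {})]
```

With the toggle on, only the message with `reply_count` of 3 survives; with the toggle unset, all four messages pass through, which matches what the new unit tests assert about partition counts.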

airbyte-integrations/connectors/source-slack/metadata.yaml

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd
-  dockerImageTag: 3.1.14
+  dockerImageTag: 3.1.15
   dockerRepository: airbyte/source-slack
   documentationUrl: https://docs.airbyte.com/integrations/sources/slack
   externalDocumentationUrls:

airbyte-integrations/connectors/source-slack/unit_tests/test_streams.py

Lines changed: 182 additions & 7 deletions
@@ -27,7 +27,7 @@ def authenticator(token_config):
         (
             "2020-01-01T00:00:00Z",
             "2020-01-02T00:00:00Z",
-            [{"ts": 1577866844}, {"ts": 1577877406}],
+            [{"ts": 1577866844, "reply_count": 1}, {"ts": 1577877406, "reply_count": 1}],
             {},
             [
                 {
@@ -95,12 +95,12 @@ def test_get_updated_state(requests_mock, authenticator, token_config, current_s
     requests_mock.register_uri(
         "GET",
         "https://slack.com/api/conversations.history?limit=1000&channel=airbyte-for-beginners",
-        [{"json": {"messages": [{"ts": 1507866847}]}}, {"json": {"messages": []}}],
+        [{"json": {"messages": [{"ts": 1507866847, "reply_count": 1}]}}, {"json": {"messages": []}}],
     )
     requests_mock.register_uri(
         "GET",
         "https://slack.com/api/conversations.history?limit=1000&channel=good-reads",
-        [{"json": {"messages": [{"ts": 1507866847}]}}, {"json": {"messages": []}}],
+        [{"json": {"messages": [{"ts": 1507866847, "reply_count": 1}]}}, {"json": {"messages": []}}],
     )
     requests_mock.register_uri(
         "GET",
@@ -142,12 +142,12 @@ def test_threads_parse_response(requests_mock, authenticator, token_config):
     requests_mock.register_uri(
         "GET",
         "https://slack.com/api/conversations.history?limit=1000&channel=airbyte-for-beginners",
-        [{"json": {"messages": [{"ts": 1507866847}]}}, {"json": {"messages": []}}],
+        [{"json": {"messages": [{"ts": 1507866847, "reply_count": 1}]}}, {"json": {"messages": []}}],
     )
     requests_mock.register_uri(
         "GET",
         "https://slack.com/api/conversations.history?limit=1000&channel=good-reads",
-        [{"json": {"messages": [{"ts": 1507866847}]}}, {"json": {"messages": []}}],
+        [{"json": {"messages": [{"ts": 1507866847, "reply_count": 1}]}}, {"json": {"messages": []}}],
     )
 
     requests_mock.register_uri(
@@ -237,12 +237,12 @@ def test_backoff(requests_mock, token_config, authenticator, headers, expected_r
     requests_mock.register_uri(
         "GET",
         "https://slack.com/api/conversations.history?limit=1000&channel=airbyte-for-beginners",
-        [{"json": {"messages": [{"ts": 1507866847}]}}, {"json": {"messages": []}}],
+        [{"json": {"messages": [{"ts": 1507866847, "reply_count": 1}]}}, {"json": {"messages": []}}],
     )
     requests_mock.register_uri(
         "GET",
         "https://slack.com/api/conversations.history?limit=1000&channel=good-reads",
-        [{"json": {"messages": [{"ts": 1507866847}]}}, {"json": {"messages": []}}],
+        [{"json": {"messages": [{"ts": 1507866847, "reply_count": 1}]}}, {"json": {"messages": []}}],
     )
 
     catalog = ConfiguredAirbyteCatalogSerializer.load(
@@ -263,6 +263,181 @@ def test_backoff(requests_mock, token_config, authenticator, headers, expected_r
     assert len(output.records) == expected_result
 
 
+def test_threads_stream_skips_messages_without_replies_when_enabled(requests_mock, token_config):
+    """
+    Verify that when threads_ignore_no_replies=True, the threads stream only creates
+    partitions for parent messages with reply_count > 0.
+    Messages with reply_count=0 or missing reply_count should be filtered out.
+    """
+    token_config["channel_filter"] = []
+    token_config["threads_ignore_no_replies"] = True
+
+    # Channel 1: one message with replies, one with reply_count=0, one with reply_count=None
+    requests_mock.register_uri(
+        "GET",
+        "https://slack.com/api/conversations.history?limit=1000&channel=airbyte-for-beginners",
+        [
+            {
+                "json": {
+                    "messages": [
+                        {"ts": 1577866844, "reply_count": 3},
+                        {"ts": 1577877406, "reply_count": 0},
+                        {"ts": 1577888888, "reply_count": None},
+                    ]
+                }
+            },
+            {"json": {"messages": []}},
+        ],
+    )
+    # Channel 2: one message with missing reply_count key entirely (should be filtered)
+    requests_mock.register_uri(
+        "GET",
+        "https://slack.com/api/conversations.history?limit=1000&channel=good-reads",
+        [
+            {"json": {"messages": [{"ts": 1577866844}]}},
+            {"json": {"messages": []}},
+        ],
+    )
+
+    stream = get_stream_by_name("threads", token_config)
+    slices = list(map(lambda partition: partition.to_slice(), stream.generate_partitions()))
+
+    # Only the message with reply_count=3 should produce a partition
+    # reply_count=0, reply_count=None, and missing reply_count should all be filtered out
+    assert len(slices) == 1
+    assert slices[0]["float_ts"] == 1577866844
+    assert slices[0]["parent_slice"]["channel"] == "airbyte-for-beginners"
+
+
+def test_threads_stream_includes_all_messages_by_default(requests_mock, token_config):
+    """
+    Verify that when threads_ignore_no_replies is not set (default=False),
+    all messages are passed through as partitions, preserving current behavior.
+    This includes messages with reply_count=None (null from API).
+    """
+    token_config["channel_filter"] = []
+    # Do NOT set threads_ignore_no_replies; it should default to False
+
+    # Channel 1: messages with various reply_count values including None
+    requests_mock.register_uri(
+        "GET",
+        "https://slack.com/api/conversations.history?limit=1000&channel=airbyte-for-beginners",
+        [
+            {
+                "json": {
+                    "messages": [
+                        {"ts": 1577866844, "reply_count": 3},
+                        {"ts": 1577877406, "reply_count": 0},
+                        {"ts": 1577888888, "reply_count": None},
+                    ]
+                }
+            },
+            {"json": {"messages": []}},
+        ],
+    )
+    # Channel 2: one message with missing reply_count key
+    requests_mock.register_uri(
+        "GET",
+        "https://slack.com/api/conversations.history?limit=1000&channel=good-reads",
+        [
+            {"json": {"messages": [{"ts": 1577866844}]}},
+            {"json": {"messages": []}},
+        ],
+    )
+
+    stream = get_stream_by_name("threads", token_config)
+    slices = list(map(lambda partition: partition.to_slice(), stream.generate_partitions()))
+
+    # All 4 messages should produce partitions (no filtering when disabled)
+    assert len(slices) == 4
+
+
+def test_threads_stream_no_replies_api_calls_skipped_when_enabled(requests_mock, token_config):
+    """
+    End-to-end test: when threads_ignore_no_replies=True, verify that conversations.replies
+    is only called for messages with reply_count > 0 (not for reply_count=0, None, or absent).
+    Uses requests_mock.request_history to confirm API call reduction.
+    """
+    token_config["channel_filter"] = []
+    token_config["threads_ignore_no_replies"] = True
+
+    # Channel 1: one message with replies (reply_count=3), one without (reply_count=0)
+    requests_mock.register_uri(
+        "GET",
+        "https://slack.com/api/conversations.history?limit=1000&channel=airbyte-for-beginners",
+        [
+            {
+                "json": {
+                    "messages": [
+                        {"ts": "1577866844.000000", "reply_count": 3},
+                        {"ts": "1577877406.000000", "reply_count": 0},
+                    ]
+                }
+            },
+            {"json": {"messages": []}},
+        ],
+    )
+    # Channel 2: one message with reply_count=None (should be filtered)
+    requests_mock.register_uri(
+        "GET",
+        "https://slack.com/api/conversations.history?limit=1000&channel=good-reads",
+        [
+            {"json": {"messages": [{"ts": "1577866844.000000", "reply_count": None}]}},
+            {"json": {"messages": []}},
+        ],
+    )
+    # Only the message with reply_count=3 should trigger a conversations.replies call
+    requests_mock.register_uri(
+        "GET",
+        "https://slack.com/api/conversations.replies?channel=airbyte-for-beginners&limit=1000&ts=1577866844.000000",
+        json={
+            "messages": [
+                {
+                    "type": "message",
+                    "ts": "1577866844.000000",
+                    "thread_ts": "1577866844.000000",
+                    "reply_count": 3,
+                    "text": "parent",
+                },
+                {
+                    "type": "message",
+                    "ts": "1577866845.000000",
+                    "thread_ts": "1577866844.000000",
+                    "text": "reply",
+                },
+            ]
+        },
+    )
+
+    catalog = ConfiguredAirbyteCatalogSerializer.load(
+        {
+            "streams": [
+                {
+                    "stream": {
+                        "name": "threads",
+                        "json_schema": {},
+                        "supported_sync_modes": ["full_refresh", "incremental"],
+                    },
+                    "sync_mode": "incremental",
+                    "destination_sync_mode": "append",
+                }
+            ]
+        }
+    )
+    state = StateBuilder().with_stream_state("threads", {}).build()
+    source_slack = get_source(token_config, "threads", state)
+    output = read(source_slack, config=token_config, catalog=catalog, state=state)
+
+    # Verify records were returned from the one valid thread
+    assert len(output.records) == 2
+
+    # Verify conversations.replies was called exactly once (only for reply_count=3 message)
+    replies_calls = [req for req in requests_mock.request_history if "conversations.replies" in req.url]
+    assert len(replies_calls) == 1
+    assert "channel=airbyte-for-beginners" in replies_calls[0].url
+    assert "ts=1577866844.000000" in replies_calls[0].url
+
+
 def test_channels_stream_with_autojoin(token_config, requests_mock) -> None:
     """
     The test uses the `conversations_list` fixture(autouse=true) as API mocker.

docs/integrations/sources/slack.md

Lines changed: 13 additions & 2 deletions
@@ -85,7 +85,10 @@ If you are using an API key to authenticate to Slack, a refresh token is not req
 <FieldAnchor field="include_private_channels">
 9. (Optional) **Include_private_channels** Toggle on to sync data from private channels. You will need to manually add the bot to private channels even if `join_channels` is toggled on.
 </FieldAnchor>
-10. Click **Set up source**. You must add the App created in Step 1 to the channels with the data that you want to sync.
+<FieldAnchor field="threads_ignore_no_replies">
+10. (Optional) **Ignore messages with no replies in threads stream** Toggle on to skip messages with no replies (`reply_count=0`) in the Threads stream. This reduces unnecessary `conversations.replies` API calls and can significantly speed up syncs for workspaces with many messages. Disabled by default to make the Threads stream contain unthreaded messages in its records.
+</FieldAnchor>
+11. Click **Set up source**. You must add the App created in Step 1 to the channels with the data that you want to sync.
 <!-- /env:cloud -->
 
 <!-- env:oss -->
@@ -101,7 +104,8 @@ If you are using an API key to authenticate to Slack, a refresh token is not req
 7. **Threads Lookback window (Days)**. This corresponds to the number of days in the past from which you want to sync data.
 8. (Optional) **Channel filter** the list of channel names (without leading '#' char) that limits the channels from which you'd like to sync. If no channels are specified, Airbyte will replicate data from all channels.
 9. (Optional) **Include_private_channels** Toggle on to sync data from private channels. You will need to manually add the bot to private channels even if `join_channels` is toggled on.
-10. Click **Set up source**. You must add the App created in Step 1 to the channels with the data that you want to sync.
+10. (Optional) **Ignore messages with no replies in threads stream** Toggle on to skip messages with no replies (`reply_count=0`) in the Threads stream. This reduces unnecessary `conversations.replies` API calls and can significantly speed up syncs for workspaces with many messages. Disabled by default to make the Threads stream contain unthreaded messages in its records.
+11. Click **Set up source**. You must add the App created in Step 1 to the channels with the data that you want to sync.
 <!-- /env:oss -->
 
 <HideInUI>
@@ -170,6 +174,12 @@ These two streams are effectively limited to **one request per minute**. Conside
 
 - Check out common troubleshooting issues for the Slack source connector on our Airbyte Forum [here](https://github.com/airbytehq/airbyte/discussions).
 
+#### Threads stream performance
+
+If your Threads stream syncs are slow, consider enabling the **Ignore messages with no replies in threads stream** (`threads_ignore_no_replies`) option. By default, the Threads stream calls the `conversations.replies` API for every message, including those with no replies. In many workspaces, the majority of messages have no replies, so these API calls are wasted and consume rate-limit budget.
+
+- **Set to `true`** when you want to optimize sync performance and only need thread replies for messages that actually have threaded conversations. This can reduce API calls by up to 89% depending on your workspace.
+- **Keep as `false` (default)** when you need the Threads stream to include records for all messages, including unthreaded ones. This preserves the current behavior where every message appears in the Threads stream output.
 
 </details>
 
@@ -183,6 +193,7 @@ These two streams are effectively limited to **one request per minute**. Conside
 
 | Version | Date | Pull Request | Subject |
 |:-----------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 3.1.15 | 2026-03-31 | [75905](https://github.com/airbytehq/airbyte/pull/75905) | Add configurable option to skip conversations.replies API calls for messages with no replies, reducing unnecessary API usage |
 | 3.1.14 | 2026-03-27 | [75197](https://github.com/airbytehq/airbyte/pull/75197) | Add declarative OAuth with `oauth_connector_input_specification` and granular scopes |
 | 3.1.13 | 2026-03-24 | [75329](https://github.com/airbytehq/airbyte/pull/75329) | Update dependencies |
 | 3.1.12 | 2026-03-10 | [74598](https://github.com/airbytehq/airbyte/pull/74598) | Update dependencies |
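
Note on the "Threads stream performance" text: the saving depends entirely on what share of messages have replies. A minimal sketch for estimating it on a sample of messages follows. The helper `estimate_replies_calls` is hypothetical, the sample data is made up (modeled on the unit-test fixtures), and the sketch assumes exactly one `conversations.replies` request per parent message, ignoring pagination and retries.

```python
from typing import Any, Iterable, Mapping


def estimate_replies_calls(messages: Iterable[Mapping[str, Any]], ignore_no_replies: bool) -> int:
    """Count parent messages that would each trigger one conversations.replies request."""
    # Assumption: one request per parent message; pagination and retries are ignored.
    return sum(
        1
        for message in messages
        if not ignore_no_replies or (message.get("reply_count") or 0) > 0
    )


# Made-up sample: one real thread, three messages without replies.
sample = [{"reply_count": 3}, {"reply_count": 0}, {"reply_count": None}, {}]
calls_default = estimate_replies_calls(sample, ignore_no_replies=False)
calls_enabled = estimate_replies_calls(sample, ignore_no_replies=True)
reduction = 1 - calls_enabled / calls_default
```

For this sample the estimate drops from four requests to one. A real workspace will differ, so the figure quoted in the docs should be read as an upper bound rather than an expected value.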
