Skip to content

Commit d9c9e7d

Browse files
feat(source-s3): add opt-in skip_full_check_for_parquet to avoid OOM during CHECK (#76162)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 84b1266 commit d9c9e7d

7 files changed

Lines changed: 298 additions & 4 deletions

File tree

airbyte-integrations/connectors/source-s3/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: file
1111
connectorType: source
1212
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
13-
dockerImageTag: 4.15.2
13+
dockerImageTag: 4.15.3
1414
dockerRepository: airbyte/source-s3
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
1616
externalDocumentationUrls:

airbyte-integrations/connectors/source-s3/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "4.15.2"
6+
version = "4.15.3"
77
name = "source-s3"
88
description = "Source implementation for S3."
99
authors = [ "Airbyte <contact@airbyte.io>",]
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
from __future__ import annotations
5+
6+
import logging
7+
import traceback
8+
from typing import Optional, Tuple
9+
10+
from airbyte_cdk import AirbyteTracedException
11+
from airbyte_cdk.sources import Source
12+
from airbyte_cdk.sources.file_based.availability_strategy import (
13+
DefaultFileBasedAvailabilityStrategy,
14+
)
15+
from airbyte_cdk.sources.file_based.exceptions import (
16+
CheckAvailabilityError,
17+
FileBasedSourceError,
18+
)
19+
from airbyte_cdk.sources.file_based.file_types.parquet_parser import ParquetParser
20+
from source_s3.v4.config import S3FileBasedStreamConfig
21+
22+
23+
class SourceS3AvailabilityStrategy(DefaultFileBasedAvailabilityStrategy):
    """
    Custom availability strategy that optionally skips the full parse check for Parquet
    streams to avoid OOM errors on large files. The skip is gated behind the per-stream
    ``skip_full_check_for_parquet`` flag (default False). Non-Parquet streams and streams
    without the flag always use the default check path.
    """

    def check_availability_and_parsability(
        self,
        stream,
        logger: logging.Logger,
        source: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Check stream availability, returning an ``(is_available, error_message)`` tuple.

        For Parquet streams with ``skip_full_check_for_parquet`` enabled, this validates
        the parser config, lists files, and opens one file to verify accessibility — but
        skips the full record parse to avoid loading entire row groups into memory. All
        other streams delegate to the default (full) check.

        Raises:
            AirbyteTracedException: re-raised unchanged so platform error handling applies.
            CheckAvailabilityError: when opening the sampled file fails unexpectedly.
        """
        parser = stream.get_parser()

        skip_flag = isinstance(stream.config, S3FileBasedStreamConfig) and stream.config.skip_full_check_for_parquet
        if not (isinstance(parser, ParquetParser) and skip_flag):
            # Default path: non-Parquet streams, or streams without the opt-in flag.
            return super().check_availability_and_parsability(stream, logger, source)

        config_check_result, config_check_error_message = parser.check_config(stream.config)
        if config_check_result is False:
            return False, config_check_error_message

        # List files in a separate try-block so that `file` is always bound before the
        # open-file error handler below references `file.uri`. (With a single try-block,
        # a plain Exception raised during listing hit `except Exception` and crashed
        # with NameError on the unbound `file`, masking the real error.)
        try:
            file = self._check_list_files(stream)
        except AirbyteTracedException:
            raise
        except CheckAvailabilityError:
            return False, "".join(traceback.format_exc())

        # Open (and immediately close) the sampled file to confirm accessibility
        # without parsing any records.
        try:
            handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
            handle.close()
        except AirbyteTracedException:
            raise
        except CheckAvailabilityError:
            return False, "".join(traceback.format_exc())
        except Exception as exc:
            raise CheckAvailabilityError(FileBasedSourceError.ERROR_READING_FILE, stream=stream.name, file=file.uri) from exc

        return True, None

airbyte-integrations/connectors/source-s3/source_s3/v4/config.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,28 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5-
from typing import Any, Dict, Optional
5+
from typing import Any, Dict, List, Optional
66

77
import dpath.util
88
from pydantic.v1 import AnyUrl, Field, root_validator
99
from pydantic.v1.error_wrappers import ValidationError
1010

1111
from airbyte_cdk import is_cloud_environment
1212
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec, DeliverRawFiles, DeliverRecords
13+
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
14+
15+
16+
class S3FileBasedStreamConfig(FileBasedStreamConfig):
    """S3-specific stream config that adds a flag to skip the full parse check for Parquet files."""

    # Opt-in, per-stream flag consumed by SourceS3AvailabilityStrategy; defaults to
    # False so existing configs keep the full CHECK behavior.
    skip_full_check_for_parquet: bool = Field(
        default=False,
        title="Skip Full Check for Parquet",
        description=(
            "When enabled, the CHECK operation for Parquet streams will verify file accessibility "
            "but skip the full record-parse step. This avoids out-of-memory errors on large Parquet files."
        ),
    )
1327

1428

1529
class Config(AbstractFileBasedSpec):
@@ -24,6 +38,9 @@ def documentation_url(cls) -> AnyUrl:
2438

2539
bucket: str = Field(title="Bucket", description="Name of the S3 bucket where the file(s) exist.", order=0)
2640

41+
# Use the extended stream config type but keep parent field metadata via schema patching.
42+
streams: List[S3FileBasedStreamConfig]
43+
2744
aws_access_key_id: Optional[str] = Field(
2845
title="AWS Access Key ID",
2946
default=None,
@@ -99,6 +116,15 @@ def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
99116
"""
100117
schema = super().schema(*args, **kwargs)
101118

119+
# Keep parent streams metadata to avoid drift, then inject the S3-specific flag.
120+
parent_schema = AbstractFileBasedSpec.schema()
121+
schema["properties"]["streams"] = parent_schema["properties"]["streams"]
122+
123+
s3_stream_schema = S3FileBasedStreamConfig.schema(*args, **kwargs)
124+
skip_prop = s3_stream_schema["properties"]["skip_full_check_for_parquet"]
125+
stream_item_props = schema["properties"]["streams"]["items"]["properties"]
126+
stream_item_props["skip_full_check_for_parquet"] = skip_prop
127+
102128
# Hide API processing option until https://github.com/airbytehq/airbyte-platform-internal/issues/10354 is fixed
103129
processing_options = dpath.util.get(schema, "properties/streams/items/properties/format/oneOf/4/properties/processing/oneOf")
104130
dpath.util.set(schema, "properties/streams/items/properties/format/oneOf/4/properties/processing/oneOf", processing_options[:1])

airbyte-integrations/connectors/source-s3/source_s3/v4/source.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from airbyte_cdk.sources.file_based.file_based_source import DEFAULT_CONCURRENCY, FileBasedSource
3131
from source_s3.source import SourceS3Spec
3232
from source_s3.utils import airbyte_message_to_json
33+
from source_s3.v4.availability_strategy import SourceS3AvailabilityStrategy
3334
from source_s3.v4.config import Config
3435
from source_s3.v4.cursor import Cursor
3536
from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer
@@ -213,9 +214,12 @@ def create(
213214
# the Airbyte logs, which is not ideal. So we'll just exit with an error code instead.
214215
sys.exit(1)
215216

217+
stream_reader = SourceS3StreamReader()
218+
216219
return cls(
217220
# These are the defaults for the source. No need for a caller to change them:
218-
stream_reader=SourceS3StreamReader(),
221+
stream_reader=stream_reader,
222+
availability_strategy=SourceS3AvailabilityStrategy(stream_reader),
219223
spec_class=Config,
220224
cursor_cls=Cursor,
221225
# This is needed early. (We also will provide it again later.)
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from __future__ import annotations
6+
7+
import logging
8+
from unittest.mock import MagicMock, patch
9+
10+
import pytest
11+
from source_s3.v4.availability_strategy import SourceS3AvailabilityStrategy
12+
from source_s3.v4.config import S3FileBasedStreamConfig
13+
14+
from airbyte_cdk import AirbyteTracedException
15+
from airbyte_cdk.sources.file_based.exceptions import CheckAvailabilityError, FileBasedSourceError
16+
from airbyte_cdk.sources.file_based.file_types.parquet_parser import ParquetParser
17+
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
18+
19+
20+
logger = logging.getLogger("test")
21+
22+
23+
# ---------------------------------------------------------------------------
24+
# Helpers
25+
# ---------------------------------------------------------------------------
26+
27+
28+
def _make_stream(skip_full_check_for_parquet: bool = False, parser=None, files=None):
    """Build a mock stream with the minimal surface used by the strategy."""
    mock_stream = MagicMock()
    mock_stream.name = "test_stream"

    # Config mock carries only the flag the strategy inspects.
    config_mock = MagicMock(spec=S3FileBasedStreamConfig)
    config_mock.skip_full_check_for_parquet = skip_full_check_for_parquet
    mock_stream.config = config_mock

    # Default to a well-behaved Parquet parser unless the caller supplies one.
    if parser is None:
        parser = MagicMock(spec=ParquetParser)
        parser.file_read_mode = "rb"
        parser.check_config.return_value = (True, None)
    mock_stream.get_parser.return_value = parser

    if files is not None:
        mock_stream.get_files.return_value = iter(files)

    return mock_stream
45+
46+
47+
def _make_remote_file(uri: str = "s3://bucket/data.parquet") -> RemoteFile:
    """Return a RemoteFile-shaped mock exposing only the `uri` attribute."""
    remote = MagicMock(spec=RemoteFile, uri=uri)
    return remote
49+
50+
51+
def _make_strategy():
    """Build the strategy under test around a mocked stream reader."""
    reader_mock = MagicMock()
    strategy = SourceS3AvailabilityStrategy(reader_mock)
    return strategy
54+
55+
56+
# ---------------------------------------------------------------------------
57+
# check_availability_and_parsability – delegation tests
58+
# ---------------------------------------------------------------------------
59+
60+
61+
@pytest.mark.parametrize(
    "skip_full_check_for_parquet,parser_cls",
    [
        pytest.param(False, ParquetParser, id="flag-off-parquet-parser"),
        pytest.param(True, None, id="flag-on-non-parquet-parser"),
        pytest.param(False, None, id="flag-off-non-parquet-parser"),
    ],
)
def test_delegates_to_super_when_skip_not_applicable(skip_full_check_for_parquet, parser_cls):
    """When skip_full_check_for_parquet is False or parser is not ParquetParser, super() is called."""
    if parser_cls is ParquetParser:
        parser = MagicMock(spec=ParquetParser)
        parser.check_config.return_value = (True, None)
    else:
        parser = MagicMock()  # not a ParquetParser instance
        parser.check_config.return_value = (True, None)

    stream = _make_stream(skip_full_check_for_parquet=skip_full_check_for_parquet, parser=parser)
    strategy = _make_strategy()

    # Patch the parent class's method (__bases__[0] is DefaultFileBasedAvailabilityStrategy)
    # so we can observe that the strategy delegates instead of running its skip path.
    with patch.object(
        SourceS3AvailabilityStrategy.__bases__[0],
        "check_availability_and_parsability",
        return_value=(True, None),
    ) as super_mock:
        result = strategy.check_availability_and_parsability(stream, logger, None)

    assert super_mock.called
    assert result == (True, None)
90+
91+
92+
# ---------------------------------------------------------------------------
93+
# Parquet skip-check path tests (flag=True)
94+
# ---------------------------------------------------------------------------
95+
96+
97+
def test_parquet_skips_full_parse_and_opens_file():
    """When skip_full_check_for_parquet is True and parser is ParquetParser, the strategy skips _check_parse_record and only opens the file."""
    parser = MagicMock(spec=ParquetParser)
    parser.file_read_mode = "rb"
    parser.check_config.return_value = (True, None)

    file = _make_remote_file()
    stream = _make_stream(skip_full_check_for_parquet=True, parser=parser, files=[file])
    handle_mock = MagicMock()
    stream.stream_reader.open_file.return_value = handle_mock

    strategy = _make_strategy()

    with patch.object(strategy, "_check_list_files", return_value=file) as list_mock:
        result = strategy.check_availability_and_parsability(stream, logger, None)

    # The skip path must list files, open exactly one file with the parser's
    # read mode, close the handle, and report success — with no record parsing.
    list_mock.assert_called_once_with(stream)
    stream.stream_reader.open_file.assert_called_once_with(file, "rb", None, logger)
    handle_mock.close.assert_called_once()
    assert result == (True, None)
117+
118+
119+
def test_parquet_returns_false_when_config_check_fails():
    """When parser.check_config returns False for a parquet stream with skip enabled, availability returns False."""
    parser = MagicMock(spec=ParquetParser)
    parser.check_config.return_value = (False, "bad config")

    stream = _make_stream(skip_full_check_for_parquet=True, parser=parser)
    strategy = _make_strategy()

    result = strategy.check_availability_and_parsability(stream, logger, None)

    # The config-check error message is surfaced verbatim to the caller.
    assert result == (False, "bad config")
130+
131+
132+
def test_parquet_returns_false_on_check_availability_error():
    """When _check_list_files raises CheckAvailabilityError, availability returns False."""
    parser = MagicMock(spec=ParquetParser)
    parser.check_config.return_value = (True, None)

    stream = _make_stream(skip_full_check_for_parquet=True, parser=parser)
    strategy = _make_strategy()

    # Simulate an empty stream: the strategy should convert the error into a
    # (False, traceback-text) result instead of raising.
    with patch.object(
        strategy, "_check_list_files", side_effect=CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream="test_stream")
    ):
        available, msg = strategy.check_availability_and_parsability(stream, logger, None)

    assert available is False
    assert msg is not None
147+
148+
149+
def test_parquet_reraises_airbyte_traced_exception():
    """AirbyteTracedException propagates out of the parquet skip path."""
    parser = MagicMock(spec=ParquetParser)
    parser.check_config.return_value = (True, None)

    stream = _make_stream(skip_full_check_for_parquet=True, parser=parser)
    strategy = _make_strategy()

    # Traced exceptions must not be swallowed or converted — the platform
    # relies on them for user-facing error reporting.
    exc = AirbyteTracedException(message="traced")
    with patch.object(strategy, "_check_list_files", side_effect=exc):
        with pytest.raises(AirbyteTracedException):
            strategy.check_availability_and_parsability(stream, logger, None)
161+
162+
163+
def test_parquet_wraps_unexpected_exception_in_check_availability_error():
    """Unexpected exceptions from open_file are wrapped in CheckAvailabilityError."""
    parser = MagicMock(spec=ParquetParser)
    parser.file_read_mode = "rb"
    parser.check_config.return_value = (True, None)

    file = _make_remote_file()
    stream = _make_stream(skip_full_check_for_parquet=True, parser=parser)
    stream.stream_reader.open_file.side_effect = RuntimeError("unexpected failure")

    strategy = _make_strategy()

    with patch.object(strategy, "_check_list_files", return_value=file):
        with pytest.raises(CheckAvailabilityError) as exc_info:
            strategy.check_availability_and_parsability(stream, logger, None)

    # The original failure must be preserved as the chained cause (`raise ... from exc`).
    assert isinstance(exc_info.value.__cause__, RuntimeError)
180+
181+
182+
# ---------------------------------------------------------------------------
183+
# S3FileBasedStreamConfig – skip_full_check_for_parquet field
184+
# ---------------------------------------------------------------------------
185+
186+
187+
def test_s3_stream_config_skip_full_check_for_parquet_defaults_false():
    """The skip_full_check_for_parquet field defaults to False."""
    # Minimal valid stream config; the flag is intentionally omitted.
    cfg = S3FileBasedStreamConfig(name="test", format={"filetype": "parquet"}, globs=["**/*.parquet"], validation_policy="Emit Record")
    assert cfg.skip_full_check_for_parquet is False
191+
192+
193+
def test_s3_stream_config_skip_full_check_for_parquet_set_true():
    """The skip_full_check_for_parquet field can be set to True."""
    cfg = S3FileBasedStreamConfig(
        name="test",
        format={"filetype": "parquet"},
        globs=["**/*.parquet"],
        validation_policy="Emit Record",
        skip_full_check_for_parquet=True,
    )
    assert cfg.skip_full_check_for_parquet is True

docs/integrations/sources/s3.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ This connector utilizes the open source [Unstructured](https://unstructured-io.g
356356

357357
| Version | Date | Pull Request | Subject |
358358
|:------------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
359+
| 4.15.3 | 2026-04-08 | [76162](https://github.com/airbytehq/airbyte/pull/76162) | Add opt-in `skip_full_check_for_parquet` stream option to skip the full parse check for Parquet files during CHECK, avoiding OOM on large files |
359360
| 4.15.2 | 2025-11-11 | [69268](https://github.com/airbytehq/airbyte/pull/69268) | Update dependencies |
360361
| 4.15.1 | 2025-11-04 | [68844](https://github.com/airbytehq/airbyte/pull/68844) | Update dependencies |
361362
| 4.15.0 | 2025-10-29 | [68640](https://github.com/airbytehq/airbyte/pull/68640) | Update dependencies |

0 commit comments

Comments
 (0)