-
Notifications
You must be signed in to change notification settings - Fork 507
REST: Add retry and timeout configuration for REST catalog #3418
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
9d85042
6fb87ff
afb1f51
47a5382
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
| from __future__ import annotations | ||
|
|
||
| from collections import deque | ||
| from collections.abc import Mapping | ||
| from enum import Enum | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
|
|
@@ -25,9 +26,11 @@ | |
| from urllib.parse import quote, unquote | ||
|
|
||
| from pydantic import ConfigDict, Field, TypeAdapter, field_validator | ||
| from requests import HTTPError, Session | ||
| from requests import HTTPError, PreparedRequest, Response, Session | ||
| from requests.adapters import HTTPAdapter | ||
| from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt | ||
| from typing_extensions import override | ||
| from urllib3.util.retry import Retry | ||
|
|
||
| from pyiceberg import __version__ | ||
| from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary | ||
|
|
@@ -255,6 +258,14 @@ class ScanPlanningMode(Enum): | |
| SIGV4_SERVICE = "rest.signing-name" | ||
| SIGV4_MAX_RETRIES = "rest.sigv4.max-retries" | ||
| SIGV4_MAX_RETRIES_DEFAULT = 10 | ||
| CONNECTION = "connection" | ||
| CONNECTION_TIMEOUT = "timeout" | ||
| CONNECTION_RETRIES = "retries" | ||
| CONNECTION_BACKOFF_FACTOR = "backoff-factor" | ||
| # Hard-coded internally so users cannot misconfigure the retry policy | ||
| # (e.g. setting raise_on_status=False would swallow 4xx errors silently). | ||
| _CONNECTION_RETRY_STATUS_FORCELIST = (429, 500, 502, 503, 504) | ||
| _CONNECTION_RETRY_ALLOWED_METHODS = frozenset({"GET", "HEAD", "OPTIONS"}) | ||
| EMPTY_BODY_SHA256: str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" | ||
| OAUTH2_SERVER_URI = "oauth2-server-uri" | ||
| SNAPSHOT_LOADING_MODE = "snapshot-loading-mode" | ||
|
|
@@ -392,6 +403,89 @@ class ListViewsResponse(IcebergBaseModel): | |
| _PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse) | ||
|
|
||
|
|
||
| class _RetryTimeoutHTTPAdapter(HTTPAdapter): | ||
| """HTTPAdapter that applies a default per-request timeout. | ||
|
|
||
| requests does not provide a way to set a default timeout on a Session; | ||
| without this adapter, every call would have to thread `timeout=` through. | ||
| The adapter applies `self._timeout` whenever a per-call timeout is not set. | ||
| """ | ||
|
|
||
| def __init__(self, timeout: float | None = None, max_retries: Retry | int | None = None) -> None: | ||
| self._timeout = timeout | ||
| if max_retries is not None: | ||
| super().__init__(max_retries=max_retries) | ||
| else: | ||
| super().__init__() | ||
|
|
||
| def send( | ||
| self, | ||
| request: PreparedRequest, | ||
| stream: bool = False, | ||
| timeout: None | float | tuple[float, float] | tuple[float, None] = None, | ||
| verify: bool | str = True, | ||
| cert: None | bytes | str | tuple[bytes | str, bytes | str] = None, | ||
| proxies: Mapping[str, str] | None = None, | ||
| ) -> Response: | ||
| if timeout is None: | ||
| timeout = self._timeout | ||
| return super().send(request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies) | ||
|
|
||
|
|
||
| def _create_connection_adapter(properties: Properties) -> _RetryTimeoutHTTPAdapter | None: | ||
| """Build a connection adapter from the optional `connection.*` properties. | ||
|
|
||
| Returns None when no `connection` block is supplied, leaving the default | ||
| Session behavior unchanged. Raises ValueError on invalid input. | ||
| """ | ||
| connection_config = properties.get(CONNECTION) | ||
| if not connection_config: | ||
| return None | ||
| if not isinstance(connection_config, dict): | ||
| raise ValueError(f"`{CONNECTION}` must be a mapping, got: {type(connection_config).__name__}") | ||
|
|
||
| timeout: float | None = None | ||
| if (raw_timeout := connection_config.get(CONNECTION_TIMEOUT)) is not None: | ||
| try: | ||
| timeout = float(raw_timeout) | ||
| except (TypeError, ValueError) as e: | ||
| raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a number, got: {raw_timeout!r}") from e | ||
| if timeout <= 0: | ||
| raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a positive number, got: {timeout}") | ||
|
|
||
| retries: int | None = None | ||
| if (raw_retries := connection_config.get(CONNECTION_RETRIES)) is not None: | ||
| try: | ||
| retries = int(raw_retries) | ||
| except (TypeError, ValueError) as e: | ||
| raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRIES}` must be an integer, got: {raw_retries!r}") from e | ||
| if retries < 0: | ||
| raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRIES}` must be non-negative, got: {retries}") | ||
|
|
||
| backoff_factor: float | None = None | ||
| if (raw_backoff := connection_config.get(CONNECTION_BACKOFF_FACTOR)) is not None: | ||
| try: | ||
| backoff_factor = float(raw_backoff) | ||
| except (TypeError, ValueError) as e: | ||
| raise ValueError(f"`{CONNECTION}.{CONNECTION_BACKOFF_FACTOR}` must be a number, got: {raw_backoff!r}") from e | ||
| if backoff_factor < 0: | ||
| raise ValueError(f"`{CONNECTION}.{CONNECTION_BACKOFF_FACTOR}` must be non-negative, got: {backoff_factor}") | ||
|
|
||
| max_retries: Retry | None = None | ||
| if retries is not None or backoff_factor is not None: | ||
| max_retries = Retry( | ||
| total=retries if retries is not None else 0, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can set retries initially to 0 to avoid this statement.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good call — initialized |
||
| backoff_factor=backoff_factor if backoff_factor is not None else 0, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same fix — |
||
| status_forcelist=list(_CONNECTION_RETRY_STATUS_FORCELIST), | ||
| allowed_methods=_CONNECTION_RETRY_ALLOWED_METHODS, | ||
| ) | ||
|
|
||
| if timeout is None and max_retries is None: | ||
| return None | ||
|
|
||
| return _RetryTimeoutHTTPAdapter(timeout=timeout, max_retries=max_retries) | ||
|
|
||
|
|
||
| class RestCatalog(Catalog): | ||
| uri: str | ||
| _session: Session | ||
|
|
@@ -418,6 +512,12 @@ def _create_session(self) -> Session: | |
| """Create a request session with provided catalog configuration.""" | ||
| session = Session() | ||
|
|
||
| # Mount the retry/timeout adapter when `connection.*` properties are set. | ||
| # SigV4's adapter mounted below at `self.uri` is a longer prefix and still wins for that host. | ||
| if (connection_adapter := _create_connection_adapter(self.properties)) is not None: | ||
| session.mount("http://", connection_adapter) | ||
| session.mount("https://", connection_adapter) | ||
|
|
||
| # Set HTTP headers | ||
| self._config_headers(session) | ||
|
|
||
|
|
@@ -763,8 +863,6 @@ def _init_sigv4(self, session: Session) -> None: | |
| import boto3 | ||
| from botocore.auth import SigV4Auth | ||
| from botocore.awsrequest import AWSRequest | ||
| from requests import PreparedRequest | ||
| from requests.adapters import HTTPAdapter | ||
|
|
||
| class SigV4Adapter(HTTPAdapter): | ||
| def __init__(self, **properties: str): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,10 @@ | |
| import pyiceberg | ||
| from pyiceberg.catalog import PropertiesUpdateSummary, load_catalog | ||
| from pyiceberg.catalog.rest import ( | ||
| CONNECTION, | ||
| CONNECTION_BACKOFF_FACTOR, | ||
| CONNECTION_RETRIES, | ||
| CONNECTION_TIMEOUT, | ||
| DEFAULT_ENDPOINTS, | ||
| EMPTY_BODY_SHA256, | ||
| OAUTH2_SERVER_URI, | ||
|
|
@@ -43,6 +47,7 @@ | |
| HttpMethod, | ||
| RestCatalog, | ||
| ScanPlanningMode, | ||
| _RetryTimeoutHTTPAdapter, | ||
| ) | ||
| from pyiceberg.exceptions import ( | ||
| AuthorizationExpiredError, | ||
|
|
@@ -2019,6 +2024,131 @@ def test_request_session_with_ssl_client_cert() -> None: | |
| assert "Could not find the TLS certificate file, invalid path: path_to_client_cert" in str(e.value) | ||
|
|
||
|
|
||
| def test_session_without_connection_config_uses_default_adapter(rest_mock: Mocker) -> None: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we get a test where we set the retry logic and then see the retries occur? We should be able to simulate this with mock HTTP calls and then see that X number of HTTP calls were made afterwards.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
| catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) | ||
| for adapter in catalog._session.adapters.values(): | ||
| assert not isinstance(adapter, _RetryTimeoutHTTPAdapter) | ||
|
|
||
|
|
||
| def test_session_with_connection_timeout_and_retries(rest_mock: Mocker) -> None: | ||
| catalog_properties = { | ||
| "uri": TEST_URI, | ||
| "token": TEST_TOKEN, | ||
| CONNECTION: { | ||
| CONNECTION_TIMEOUT: 60, | ||
| CONNECTION_RETRIES: 5, | ||
| CONNECTION_BACKOFF_FACTOR: 1.0, | ||
| }, | ||
| } | ||
| catalog = RestCatalog("rest", **catalog_properties) # type: ignore | ||
|
|
||
| https_adapter = catalog._session.adapters["https://"] | ||
| http_adapter = catalog._session.adapters["http://"] | ||
| assert isinstance(https_adapter, _RetryTimeoutHTTPAdapter) | ||
| assert https_adapter is http_adapter | ||
| assert https_adapter._timeout == 60.0 | ||
| assert https_adapter.max_retries.total == 5 | ||
| assert https_adapter.max_retries.backoff_factor == 1.0 | ||
| # Internal retry policy: transient codes and idempotent methods only. | ||
| assert https_adapter.max_retries.status_forcelist == [429, 500, 502, 503, 504] | ||
| allowed_methods = https_adapter.max_retries.allowed_methods or frozenset() | ||
| assert set(allowed_methods) == {"GET", "HEAD", "OPTIONS"} | ||
|
|
||
|
|
||
| def test_session_with_connection_timeout_only(rest_mock: Mocker) -> None: | ||
| catalog_properties = { | ||
| "uri": TEST_URI, | ||
| "token": TEST_TOKEN, | ||
| CONNECTION: {CONNECTION_TIMEOUT: "30"}, | ||
| } | ||
| catalog = RestCatalog("rest", **catalog_properties) # type: ignore | ||
| adapter = catalog._session.adapters["https://"] | ||
| assert isinstance(adapter, _RetryTimeoutHTTPAdapter) | ||
| assert adapter._timeout == 30.0 | ||
| # No retry options set, so no Retry object is configured. | ||
| assert adapter.max_retries.total == 0 | ||
|
|
||
|
|
||
| def test_session_retries_on_transient_5xx_then_succeeds() -> None: | ||
| """Three real 503 responses followed by a 200; the catalog should make all four attempts. | ||
|
|
||
| `requests_mock` would replace our HTTPAdapter, bypassing the retry logic we want to exercise, | ||
| so this test stands up an actual `http.server` on a loopback port instead. | ||
| """ | ||
| import json | ||
| import threading | ||
| from http.server import BaseHTTPRequestHandler, HTTPServer | ||
|
|
||
| state = {"namespace_calls": 0} | ||
| config_body = json.dumps( | ||
| {"defaults": {}, "overrides": {}, "endpoints": [str(endpoint) for endpoint in TEST_SUPPORTED_ENDPOINTS]} | ||
| ).encode() | ||
|
|
||
| class _Handler(BaseHTTPRequestHandler): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add the server setup into a different method? That way, we can easily see what this is actually testing + less about the test setup.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in 47a5382 — extracted the handler + threading setup into a |
||
| def do_GET(self) -> None: | ||
| if self.path.endswith("/v1/config"): | ||
| self._respond(200, config_body) | ||
| elif self.path.endswith("/v1/namespaces"): | ||
| state["namespace_calls"] += 1 | ||
| if state["namespace_calls"] <= 3: | ||
| self._respond(503, b"") | ||
| else: | ||
| self._respond(200, json.dumps({"namespaces": [["foo"]]}).encode()) | ||
| else: | ||
| self._respond(404, b"") | ||
|
|
||
| def _respond(self, status: int, body: bytes) -> None: | ||
| self.send_response(status) | ||
| self.send_header("Content-Type", "application/json") | ||
| self.send_header("Content-Length", str(len(body))) | ||
| self.end_headers() | ||
| if body: | ||
| self.wfile.write(body) | ||
|
|
||
| def log_message(self, format: str, *args: Any) -> None: # silence default access logs | ||
| pass | ||
|
|
||
| server = HTTPServer(("127.0.0.1", 0), _Handler) | ||
| port = server.server_address[1] | ||
| server_thread = threading.Thread(target=server.serve_forever, daemon=True) | ||
| server_thread.start() | ||
| try: | ||
| catalog = RestCatalog( | ||
| "rest", | ||
| **{ # type: ignore | ||
| "uri": f"http://127.0.0.1:{port}/", | ||
| "token": TEST_TOKEN, | ||
| # backoff-factor=0 keeps the test fast; retries=3 covers three 503s + the eventual 200. | ||
| CONNECTION: {CONNECTION_RETRIES: 3, CONNECTION_BACKOFF_FACTOR: 0}, | ||
| }, | ||
| ) | ||
| assert catalog.list_namespaces() == [("foo",)] | ||
| assert state["namespace_calls"] == 4 | ||
| finally: | ||
| server.shutdown() | ||
| server.server_close() | ||
|
|
||
|
|
||
| def test_session_with_invalid_connection_timeout_raises(rest_mock: Mocker) -> None: | ||
| catalog_properties = { | ||
| "uri": TEST_URI, | ||
| "token": TEST_TOKEN, | ||
| CONNECTION: {CONNECTION_TIMEOUT: -1}, | ||
| } | ||
| with pytest.raises(ValueError, match="`connection.timeout` must be a positive number"): | ||
| RestCatalog("rest", **catalog_properties) # type: ignore | ||
|
|
||
|
|
||
| def test_session_with_invalid_connection_retries_raises(rest_mock: Mocker) -> None: | ||
| catalog_properties = { | ||
| "uri": TEST_URI, | ||
| "token": TEST_TOKEN, | ||
| CONNECTION: {CONNECTION_RETRIES: -1}, | ||
| } | ||
| with pytest.raises(ValueError, match="`connection.retries` must be non-negative"): | ||
| RestCatalog("rest", **catalog_properties) # type: ignore | ||
|
|
||
|
|
||
| def test_rest_catalog_with_basic_auth_type(rest_mock: Mocker) -> None: | ||
| # Given | ||
| rest_mock.get( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd get rid of this sentence.
If you override
connection.timeout,connections.retriesshould still take its default value. This sentence makes it sound like you have to specify the default values on the other places or else you'll have undefined behavior.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dropped that sentence in 47a5382.