Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 115 additions & 16 deletions trapdata/antenna/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Antenna API client for fetching jobs and posting results."""

import json
import socket

import requests
Expand All @@ -14,6 +15,12 @@
from trapdata.api.utils import get_http_session
from trapdata.common.logs import logger

# Default cap, in bytes, on the body of a single result POST (25 MiB). Used when
# a caller does not pass an explicit limit (e.g.
# settings.antenna_result_post_max_bytes).
# Chosen to stay well under the reverse-proxy request-body limit (commonly 100 MB)
# while still allowing several wide-taxonomy detections per request.
DEFAULT_RESULT_POST_MAX_BYTES = 25 * 1024 * 1024


def get_full_service_name(service_name: str) -> str:
"""Build full service name with hostname.
Expand Down Expand Up @@ -71,41 +78,133 @@ def get_jobs(
return []


def _result_json_size(result_json: dict) -> int:
"""Approximate the serialized byte size of one result entry.

Uses a compact JSON encoding (no extra whitespace) so the estimate tracks
what ``requests`` actually sends. The few bytes of array/comma framing that
join entries in the final payload are ignored; they are negligible next to
the per-result content for wide-taxonomy classifiers.
"""
return len(json.dumps(result_json, separators=(",", ":")).encode("utf-8"))


def chunk_results_by_size(
    results_json: list[dict],
    max_bytes: int,
) -> list[list[dict]]:
    """Greedily pack already-serialized result dicts into byte-bounded chunks.

    Each returned chunk, once wrapped in the ``{"results": [...]}`` envelope, is
    intended to stay at or below ``max_bytes``. A single result that exceeds
    the budget on its own is emitted as its own chunk (it cannot be split
    further without changing the per-image API contract), and a warning is
    logged so the oversize case is visible.

    Framing is sized for the way ``requests`` serializes a ``json=`` payload,
    i.e. ``json.dumps`` with default separators: the empty envelope is
    ``{"results": []}`` and consecutive entries are joined by ``", "``.

    Args:
        results_json: Result entries already converted to JSON-compatible dicts.
        max_bytes: Target maximum size in bytes for one POST body.

    Returns:
        A list of chunks, where each chunk is a list of result dicts. Returns an
        empty list when given no results. Input order is preserved.
    """
    if not results_json:
        return []
    if max_bytes <= 0:
        # Defensive: a non-positive cap cannot bound anything meaningfully.
        # Fall back to posting everything in a single chunk.
        return [list(results_json)]

    # Constant framing cost of the envelope and of the separator between
    # entries, matching json.dumps default separators.
    envelope_overhead = len(b'{"results": []}')
    joiner_size = len(b", ")
    # Keep the budget positive even when max_bytes is below the envelope size.
    budget = max(1, max_bytes - envelope_overhead)

    chunks: list[list[dict]] = []
    current: list[dict] = []
    current_size = 0

    for result_json in results_json:
        size = _result_json_size(result_json)
        if size > budget:
            logger.warning(
                f"Single result entry is {size} bytes, larger than the "
                f"{max_bytes}-byte POST limit; sending it in its own request. "
                "It may still be rejected by the server or proxy."
            )
        # The joiner is only paid when this entry follows another one.
        added_size = size + (joiner_size if current else 0)
        if current and current_size + added_size > budget:
            # Entry does not fit: close the current chunk and start a new one,
            # where the entry is first and therefore pays no joiner.
            chunks.append(current)
            current = []
            current_size = 0
            added_size = size
        current.append(result_json)
        current_size += added_size

    if current:
        chunks.append(current)
    return chunks


def post_batch_results(
    base_url: str,
    auth_token: str,
    job_id: int,
    results: list[AntennaTaskResult],
    max_bytes: int = DEFAULT_RESULT_POST_MAX_BYTES,
) -> bool:
    """
    Post batch results back to the API, splitting large batches across requests.

    The results for one processed batch are serialized once and packed into one
    or more POST bodies sized against ``max_bytes``. This prevents a single
    dense, wide-taxonomy batch from producing a request body large enough to be
    rejected by the reverse proxy (HTTP 413).

    A failure on one chunk does not stop the remaining chunks from being
    posted; the failure is logged and reflected in the return value.

    Args:
        base_url: Antenna API base URL (e.g., "http://localhost:8000/api/v2")
        auth_token: API authentication token
        job_id: Job ID
        results: List of AntennaTaskResult objects
        max_bytes: Maximum size in bytes of a single POST body.

    Returns:
        True only if every chunk was posted successfully, False otherwise.
        An empty ``results`` list returns True without opening a session.
    """
    if not results:
        return True

    url = f"{base_url.rstrip('/')}/jobs/{job_id}/result/"

    # Serialize each result once, then group into byte-bounded chunks so we never
    # pay the serialization cost twice.
    results_json = [
        AntennaTaskResults(results=[r]).model_dump(mode="json")["results"][0]
        for r in results
    ]
    chunks = chunk_results_by_size(results_json, max_bytes)

    all_ok = True
    with get_http_session(auth_token) as session:
        for chunk_idx, chunk in enumerate(chunks):
            payload = {"results": chunk}
            try:
                response = session.post(url, json=payload, timeout=60)
                response.raise_for_status()
                result = AntennaResultPostResponse.model_validate(response.json())
                logger.debug(
                    f"Posted chunk {chunk_idx + 1}/{len(chunks)} "
                    f"({len(chunk)} results) to job {job_id}: "
                    f"{result.results_queued} queued"
                )
            except (requests.RequestException, ValueError) as e:
                # ValueError covers a malformed JSON body and a response that
                # fails schema validation (pydantic's ValidationError is a
                # ValueError subclass). Without it, such an error would escape
                # the loop and the remaining chunks would never be posted.
                logger.error(
                    f"Failed to post result chunk {chunk_idx + 1}/{len(chunks)} "
                    f"to {url}: {e}"
                )
                all_ok = False
    return all_ok
Comment on lines +190 to +207

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Catch broader exception types to handle response validation errors.

The current except clause only catches requests.RequestException (line 201), which won't catch Pydantic validation errors from AntennaResultPostResponse.model_validate() (line 195) or JSON decode errors from response.json() (line 195). If the server returns malformed JSON or a valid JSON response that doesn't match the expected schema, these exceptions would propagate uncaught, preventing remaining chunks from being posted and causing the entire batch to fail.

🛡️ Proposed fix to catch all exceptions
-            except requests.RequestException as e:
+            except Exception as e:
                 logger.error(
                     f"Failed to post result chunk {chunk_idx + 1}/{len(chunks)} "
                     f"to {url}: {e}"
                 )
                 all_ok = False
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for chunk_idx, chunk in enumerate(chunks):
payload = {"results": chunk}
try:
response = session.post(url, json=payload, timeout=60)
response.raise_for_status()
result = AntennaResultPostResponse.model_validate(response.json())
logger.debug(
f"Posted chunk {chunk_idx + 1}/{len(chunks)} "
f"({len(chunk)} results) to job {job_id}: "
f"{result.results_queued} queued"
)
except requests.RequestException as e:
logger.error(
f"Failed to post result chunk {chunk_idx + 1}/{len(chunks)} "
f"to {url}: {e}"
)
all_ok = False
return all_ok
for chunk_idx, chunk in enumerate(chunks):
payload = {"results": chunk}
try:
response = session.post(url, json=payload, timeout=60)
response.raise_for_status()
result = AntennaResultPostResponse.model_validate(response.json())
logger.debug(
f"Posted chunk {chunk_idx + 1}/{len(chunks)} "
f"({len(chunk)} results) to job {job_id}: "
f"{result.results_queued} queued"
)
except Exception as e:
logger.error(
f"Failed to post result chunk {chunk_idx + 1}/{len(chunks)} "
f"to {url}: {e}"
)
all_ok = False
return all_ok
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@trapdata/antenna/client.py` around lines 190 - 207, The loop that posts
chunks uses response.json() and AntennaResultPostResponse.model_validate(), but
the except currently only catches requests.RequestException so JSON decoding or
Pydantic validation errors will leak and abort processing; update the exception
handling around the session.post/response parsing/model_validate block (the code
that calls session.post, response.json(), and
AntennaResultPostResponse.model_validate) to also catch json.JSONDecodeError and
pydantic.ValidationError (or ValueError if pydantic isn't imported) in the same
except (or use a broad Exception as a last resort), log the error via
logger.error including chunk_idx/job_id/url, set all_ok = False, and continue to
the next chunk so remaining chunks are still posted.



def get_user_projects(base_url: str, auth_token: str) -> list[dict]:
Expand Down
12 changes: 10 additions & 2 deletions trapdata/antenna/result_posting.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from dataclasses import dataclass

from trapdata.antenna.client import post_batch_results
from trapdata.antenna.client import DEFAULT_RESULT_POST_MAX_BYTES, post_batch_results
from trapdata.common.logs import logger


Expand Down Expand Up @@ -69,9 +69,11 @@ def __init__(
self,
max_pending: int = 5,
future_timeout: float = 30.0,
max_post_bytes: int = DEFAULT_RESULT_POST_MAX_BYTES,
):
self.max_pending = max_pending
self.future_timeout = future_timeout # Timeout for individual future waits
self.max_post_bytes = max_post_bytes # Per-POST body size cap (bytes)
self.executor = ThreadPoolExecutor(
max_workers=2, thread_name_prefix="result_poster"
)
Expand Down Expand Up @@ -167,7 +169,13 @@ def _post_with_timing(
True if successful, False otherwise
"""
try:
success = post_batch_results(base_url, auth_token, job_id, results)
success = post_batch_results(
base_url,
auth_token,
job_id,
results,
max_bytes=self.max_post_bytes,
)
elapsed_time = time.time() - start_time

with self._metrics_lock:
Expand Down
Loading
Loading