# Default maximum size (bytes) of a single result POST body. Used when a caller
# does not pass an explicit limit (e.g. settings.antenna_result_post_max_bytes).
# Chosen to stay well under the reverse-proxy request-body limit (commonly 100 MB)
# while still allowing several wide-taxonomy detections per request.
DEFAULT_RESULT_POST_MAX_BYTES = 25 * 1024 * 1024

# Framing bytes of the POST body, measured with the default ``json.dumps``
# separators (", " and ": "). ``requests`` encodes a ``json=`` payload with those
# defaults, so the envelope is ``{"results": []}`` and entries are joined by ", ".
_RESULTS_ENVELOPE_BYTES = len(json.dumps({"results": []}).encode("utf-8"))
_RESULTS_SEPARATOR_BYTES = len(b", ")


def _result_json_size(result_json: dict) -> int:
    """Return the encoded byte size of one result entry inside a POST body.

    The entry is measured with the default ``json.dumps`` settings because that
    is the encoding ``requests`` applies to a ``json=`` payload. Measuring with
    compact separators would under-count one byte per array element and per
    key, which adds up to several percent for wide classification arrays.

    Args:
        result_json: One result entry as a JSON-compatible dict.

    Returns:
        Number of UTF-8 bytes the entry occupies once serialized.
    """
    return len(json.dumps(result_json).encode("utf-8"))


def chunk_results_by_size(
    results_json: list[dict],
    max_bytes: int,
) -> list[list[dict]]:
    """Greedily pack JSON-compatible result dicts into byte-bounded chunks.

    Each returned chunk, once wrapped in the ``{"results": [...]}`` envelope and
    encoded with the default ``json.dumps`` separators, stays at or below
    ``max_bytes``. The only exception is a single entry that is larger than the
    limit on its own: it cannot be split without changing the per-image API
    contract, so it is emitted as its own chunk and a warning is logged.

    Input order is preserved and no entry is dropped or duplicated.

    Args:
        results_json: Result entries already converted to JSON-compatible dicts.
        max_bytes: Target maximum size in bytes for one POST body.

    Returns:
        A list of chunks, where each chunk is a list of result dicts. Returns an
        empty list when given no results.
    """
    if not results_json:
        return []
    if max_bytes <= 0:
        # A non-positive cap leaves no usable budget; treat it as "no limit"
        # and keep everything in a single chunk instead of one POST per entry.
        return [list(results_json)]

    # Bytes left for entries (and their separators) after the envelope.
    budget = max(1, max_bytes - _RESULTS_ENVELOPE_BYTES)

    chunks: list[list[dict]] = []
    current: list[dict] = []
    current_size = 0

    for result_json in results_json:
        size = _result_json_size(result_json)
        if size > budget:
            logger.warning(
                f"Single result entry is {size} bytes, larger than the "
                f"{max_bytes}-byte POST limit; sending it in its own request. "
                "It may still be rejected by the server or proxy."
            )
        # Every entry after the first in a chunk is preceded by ", ".
        added_size = size + (_RESULTS_SEPARATOR_BYTES if current else 0)
        if current and current_size + added_size > budget:
            # Adding this entry would overflow: close the chunk and start anew.
            chunks.append(current)
            current = []
            current_size = 0
            added_size = size
        current.append(result_json)
        current_size += added_size

    if current:
        chunks.append(current)
    return chunks


def post_batch_results(
    base_url: str,
    auth_token: str,
    job_id: int,
    results: list[AntennaTaskResult],
    max_bytes: int = DEFAULT_RESULT_POST_MAX_BYTES,
) -> bool:
    """
    Post batch results back to the API, splitting large batches across requests.

    The results for one processed batch are serialized once and packed into one
    or more POST bodies that each stay at or below ``max_bytes``. This prevents a
    single dense, wide-taxonomy batch from producing a request body large enough
    to be rejected by the reverse proxy (HTTP 413).

    Every chunk is attempted even if an earlier one fails, so one rejected
    request does not discard the rest of the batch. A False return therefore
    means "at least one chunk failed"; other chunks may already have been
    accepted by the server.

    Args:
        base_url: Antenna API base URL (e.g., "http://localhost:8000/api/v2")
        auth_token: API authentication token
        job_id: Job ID
        results: List of AntennaTaskResult objects
        max_bytes: Maximum size in bytes of a single POST body.

    Returns:
        True only if every chunk was posted successfully (or there was nothing
        to post), False otherwise.
    """
    if not results:
        return True

    url = f"{base_url.rstrip('/')}/jobs/{job_id}/result/"

    # Dump the whole batch through one envelope model so each entry has exactly
    # the shape an unsplit POST would carry, then regroup the entries by size.
    results_json = AntennaTaskResults(results=results).model_dump(mode="json")[
        "results"
    ]
    chunks = chunk_results_by_size(results_json, max_bytes)
    total_chunks = len(chunks)

    all_ok = True
    with get_http_session(auth_token) as session:
        for chunk_number, chunk in enumerate(chunks, start=1):
            try:
                response = session.post(url, json={"results": chunk}, timeout=60)
                response.raise_for_status()
                result = AntennaResultPostResponse.model_validate(response.json())
            except (requests.RequestException, ValueError) as e:
                # ValueError covers a malformed response body (JSON decode and
                # pydantic validation errors derive from it); without it such a
                # response would abort the loop and skip the remaining chunks.
                logger.error(
                    f"Failed to post result chunk {chunk_number}/{total_chunks} "
                    f"to {url}: {e}"
                )
                all_ok = False
                continue
            logger.debug(
                f"Posted chunk {chunk_number}/{total_chunks} "
                f"({len(chunk)} results) to job {job_id}: "
                f"{result.results_queued} queued"
            )
    return all_ok
cd76737..c287fa3 100644 --- a/trapdata/antenna/result_posting.py +++ b/trapdata/antenna/result_posting.py @@ -24,7 +24,7 @@ from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait from dataclasses import dataclass -from trapdata.antenna.client import post_batch_results +from trapdata.antenna.client import DEFAULT_RESULT_POST_MAX_BYTES, post_batch_results from trapdata.common.logs import logger @@ -69,9 +69,11 @@ def __init__( self, max_pending: int = 5, future_timeout: float = 30.0, + max_post_bytes: int = DEFAULT_RESULT_POST_MAX_BYTES, ): self.max_pending = max_pending self.future_timeout = future_timeout # Timeout for individual future waits + self.max_post_bytes = max_post_bytes # Per-POST body size cap (bytes) self.executor = ThreadPoolExecutor( max_workers=2, thread_name_prefix="result_poster" ) @@ -167,7 +169,13 @@ def _post_with_timing( True if successful, False otherwise """ try: - success = post_batch_results(base_url, auth_token, job_id, results) + success = post_batch_results( + base_url, + auth_token, + job_id, + results, + max_bytes=self.max_post_bytes, + ) elapsed_time = time.time() - start_time with self._metrics_lock: diff --git a/trapdata/antenna/tests/test_result_chunking.py b/trapdata/antenna/tests/test_result_chunking.py new file mode 100644 index 0000000..a80b8a9 --- /dev/null +++ b/trapdata/antenna/tests/test_result_chunking.py @@ -0,0 +1,195 @@ +"""Tests for byte-bounded chunking of result POSTs to the Antenna API. + +These tests reproduce the production failure mode where a single result POST for +a wide-taxonomy pipeline (e.g. the global moths model, ~29k classes) grew to +100+ MB and was rejected by the reverse proxy with HTTP 413. The fix splits the +results for one processed batch across multiple POSTs so each request body stays +under a configurable size cap. 
def _make_detection(image_id: str, num_classes: int) -> DetectionResponse:
    """Build a DetectionResponse carrying full-width classification arrays.

    This mirrors what the global moths classifier emits: one classification with
    labels/scores/logits arrays each of length ``num_classes``.

    Args:
        image_id: Source image identifier stored on the detection.
        num_classes: Length of each of the labels/scores/logits arrays.

    Returns:
        A detection with a fixed bounding box and a single wide classification.
    """
    labels = [f"Genus_species_{i:06d}" for i in range(num_classes)]
    scores = [1.0 / num_classes] * num_classes
    logits = [float(i) for i in range(num_classes)]
    return DetectionResponse(
        source_image_id=image_id,
        bbox=BoundingBox(x1=0, y1=0, x2=100, y2=100),
        algorithm=AlgorithmReference(name="Object Detector", key="detector"),
        timestamp=datetime.datetime(2024, 1, 1, 0, 0, 0),
        classifications=[
            ClassificationResponse(
                classification=labels[0],
                labels=labels,
                scores=scores,
                logits=logits,
                algorithm=AlgorithmReference(
                    name="Global Species Classifier", key="global_moths_2024"
                ),
                timestamp=datetime.datetime(2024, 1, 1, 0, 0, 0),
            )
        ],
    )


def _make_result(
    image_id: str, num_classes: int, detections_per_image: int = 1
) -> AntennaTaskResult:
    """Build one AntennaTaskResult for a single image with N wide detections.

    Args:
        image_id: Identifier used for the source image, its URL and the reply
            subject.
        num_classes: Width of each detection's classification arrays.
        detections_per_image: Number of identical wide detections to attach.

    Returns:
        A task result wrapping one pipeline response for ``image_id``.
    """
    detections = [
        _make_detection(image_id, num_classes) for _ in range(detections_per_image)
    ]
    return AntennaTaskResult(
        reply_subject=f"reply.{image_id}",
        result=PipelineResultsResponse(
            pipeline="global_moths_2024",
            source_images=[
                SourceImageResponse(id=image_id, url=f"http://x/{image_id}")
            ],
            detections=detections,
            total_time=1.0,
        ),
    )


def _serialized_body_size(results: list[AntennaTaskResult]) -> int:
    """Size in bytes of the JSON body produced for an unsplit list of results.

    The payload is encoded with the default ``json.dumps`` settings and measured
    as UTF-8 bytes.
    """
    payload = AntennaTaskResults(results=results).model_dump(mode="json")
    return len(json.dumps(payload).encode("utf-8"))


class TestChunkResultsBySize(TestCase):
    """Unit tests for the greedy byte-bounded packer."""

    def test_empty_results_yields_no_chunks(self):
        self.assertEqual(chunk_results_by_size([], max_bytes=1000), [])

    def test_each_chunk_stays_under_cap(self):
        # ~29k-class detections make each result roughly 1.5-2 MB, so a 5 MB cap
        # can only hold a few results per chunk and the batch must be split.
        num_classes = 29_000
        results = [_make_result(f"img{i}", num_classes) for i in range(8)]
        results_json = AntennaTaskResults(results=results).model_dump(mode="json")[
            "results"
        ]

        max_bytes = 5 * 1024 * 1024
        chunks = chunk_results_by_size(results_json, max_bytes=max_bytes)

        self.assertGreater(len(chunks), 1, "expected the batch to be split")
        for chunk in chunks:
            # Measure the enveloped body with the default json.dumps encoding.
            body_size = len(json.dumps({"results": chunk}).encode("utf-8"))
            self.assertLessEqual(
                body_size,
                max_bytes,
                f"chunk body {body_size} exceeded cap {max_bytes}",
            )

    def test_no_results_dropped(self):
        num_classes = 1_000
        results = [_make_result(f"img{i}", num_classes) for i in range(10)]
        results_json = AntennaTaskResults(results=results).model_dump(mode="json")[
            "results"
        ]
        chunks = chunk_results_by_size(results_json, max_bytes=500_000)
        total = sum(len(c) for c in chunks)
        self.assertEqual(total, len(results))

    def test_oversize_single_result_gets_own_chunk(self):
        # One result larger than the cap cannot be split below one image; it must
        # still be emitted (in its own chunk) rather than dropped.
        num_classes = 29_000
        results = [_make_result("big", num_classes, detections_per_image=4)]
        results_json = AntennaTaskResults(results=results).model_dump(mode="json")[
            "results"
        ]
        chunks = chunk_results_by_size(results_json, max_bytes=1 * 1024 * 1024)
        self.assertEqual(len(chunks), 1)
        self.assertEqual(len(chunks[0]), 1)


class TestPostBatchResultsChunking(TestCase):
    """post_batch_results must split a large batch across multiple POST bodies."""

    def test_large_batch_split_into_multiple_under_cap_posts(self):
        num_classes = 29_000
        # 8 images, each carrying a single wide detection; together they are
        # well over the 4 MB cap used below.
        results = [_make_result(f"img{i}", num_classes) for i in range(8)]

        max_bytes = 4 * 1024 * 1024
        posted_bodies: list[int] = []
        posted_counts: list[int] = []

        fake_response = MagicMock()
        fake_response.raise_for_status.return_value = None
        fake_response.json.return_value = {
            "status": "accepted",
            "job_id": 1,
            "results_queued": 0,
        }

        def fake_post(url, **kwargs):
            # Read the payload from kwargs so the ``json`` module stays usable
            # here instead of being shadowed by a ``json=`` parameter.
            payload = kwargs["json"]
            posted_bodies.append(len(json.dumps(payload).encode("utf-8")))
            posted_counts.append(len(payload["results"]))
            return fake_response

        fake_session = MagicMock()
        fake_session.post.side_effect = fake_post
        fake_session.__enter__.return_value = fake_session
        fake_session.__exit__.return_value = False

        with patch(
            "trapdata.antenna.client.get_http_session", return_value=fake_session
        ):
            ok = post_batch_results(
                base_url="http://x/api/v2",
                auth_token="t",
                job_id=1,
                results=results,
                max_bytes=max_bytes,
            )

        self.assertTrue(ok)
        self.assertGreater(len(posted_bodies), 1, "expected multiple POSTs")
        for size in posted_bodies:
            self.assertLessEqual(size, max_bytes, f"POST body {size} exceeded cap")
        # Splitting must not drop or duplicate any result.
        self.assertEqual(sum(posted_counts), len(results))

    def test_unsplit_baseline_would_exceed_cap(self):
        # Sanity check that the un-chunked body really is over the cap, so the
        # test above is exercising a real split, not a no-op.
        num_classes = 29_000
        results = [_make_result(f"img{i}", num_classes) for i in range(8)]
        body_size = _serialized_body_size(results)
        self.assertGreater(body_size, 4 * 1024 * 1024)

    def test_empty_results_is_noop_success(self):
        with patch("trapdata.antenna.client.get_http_session") as get_session:
            ok = post_batch_results(
                base_url="http://x/api/v2",
                auth_token="t",
                job_id=1,
                results=[],
                max_bytes=1000,
            )
        self.assertTrue(ok)
        get_session.assert_not_called()
+ antenna_result_post_max_bytes: int = 25 * 1024 * 1024 @pydantic.field_validator("image_base_path", "user_data_path") def validate_path(cls, v): @@ -170,6 +178,15 @@ class Config: "kivy_type": "numeric", "kivy_section": "antenna", }, + "antenna_result_post_max_bytes": { + "title": "Antenna Result POST Max Bytes", + "description": ( + "Maximum size in bytes of a single result POST body; results " + "for a batch are split across multiple POSTs to stay under it" + ), + "kivy_type": "numeric", + "kivy_section": "antenna", + }, "antenna_service_name": { "title": "Antenna Service Name", "description": "Name for the processing service registration (hostname will be added automatically)",