Split oversized result uploads so wide-taxonomy batches don't get rejected#149
Split oversized result uploads so wide-taxonomy batches don't get rejected#149mihow wants to merge 1 commit into
Conversation
…y limits Wide-taxonomy pipelines (e.g. the global moths model with ~29k classes) emit roughly 2 MB per detection because each classification carries full-length labels, scores, and logits arrays. A single processed batch of two dozen images with several detections each therefore serialized to 110-140 MB, which reverse proxies rejected with HTTP 413 even after the body limit was raised to 512 MB. The results for one batch were already scoped to the current batch only (no accumulation across batches), so the size came purely from payload width times detection count. This change splits the results for a batch across multiple POST requests, each kept at or below a configurable byte cap, so no single request body exceeds the proxy limit. - Add chunk_results_by_size() and make post_batch_results() serialize each result once, greedily pack them into byte-bounded chunks, and POST each chunk; it now returns True only if every chunk succeeds. A single result that exceeds the cap on its own is sent alone (and logged) rather than dropped. - Add AMI_ANTENNA_RESULT_POST_MAX_BYTES setting (default 25 MB) and thread it through ResultPoster to post_batch_results. - Add tests asserting each POST body stays under the cap, no results are dropped, and the unsplit baseline would have exceeded the cap. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
📝 WalkthroughWalkthroughThis PR implements byte-bounded chunking of Antenna API result POST bodies to prevent HTTP 413 (payload too large) errors when result batches exceed reverse-proxy limits. Results are now split into sequential POSTs, each constrained by a configurable per-POST byte cap (default 25 MiB). ChangesAntenna API Result Chunking
Sequence DiagramsequenceDiagram
participant Worker
participant ResultPoster
participant post_batch_results
participant chunk_results_by_size
participant HTTP
Worker->>ResultPoster: __init__(max_post_bytes=settings.antenna_result_post_max_bytes)
Note over ResultPoster: stores self.max_post_bytes
Worker->>ResultPoster: post_batch(results)
ResultPoster->>post_batch_results: post_batch_results(..., max_bytes=self.max_post_bytes)
post_batch_results->>post_batch_results: serialize to JSON dicts
post_batch_results->>chunk_results_by_size: JSON dicts, max_bytes
chunk_results_by_size-->>post_batch_results: list of chunks
loop for each chunk
post_batch_results->>HTTP: POST {results: chunk}
HTTP-->>post_batch_results: 200/validation error
post_batch_results->>post_batch_results: log per-chunk status
end
post_batch_results-->>ResultPoster: all_ok flag
ResultPoster-->>Worker: success status
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
trapdata/antenna/result_posting.py (1)
68-76: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick winUpdate class docstring to document the new
max_post_bytesparameter.The
__init__method now accepts amax_post_bytesparameter, but the class docstring (lines 52-66) doesn't document it in the Args section. This makes the parameter undiscoverable for users reading the API documentation.📝 Proposed documentation update
Add to the docstring's Args section (after line 59):
Args: max_pending: Maximum number of concurrent posts before blocking (default: 5) + max_post_bytes: Maximum size in bytes of a single POST body (default: 25 MiB)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@trapdata/antenna/result_posting.py` around lines 68 - 76, Update the class docstring (the Args section in the class above __init__) to document the new __init__ parameter max_post_bytes: explain it's the per-POST body size cap in bytes, include the default value DEFAULT_RESULT_POST_MAX_BYTES, and place this entry alongside max_pending and future_timeout so users can discover the parameter when reading the API docs; reference the parameter name max_post_bytes and the constructor method __init__ when adding the brief description.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@trapdata/antenna/client.py`:
- Around line 190-207: The loop that posts chunks uses response.json() and
AntennaResultPostResponse.model_validate(), but the except currently only
catches requests.RequestException so JSON decoding or Pydantic validation errors
will leak and abort processing; update the exception handling around the
session.post/response parsing/model_validate block (the code that calls
session.post, response.json(), and AntennaResultPostResponse.model_validate) to
also catch json.JSONDecodeError and pydantic.ValidationError (or ValueError if
pydantic isn't imported) in the same except (or use a broad Exception as a last
resort), log the error via logger.error including chunk_idx/job_id/url, set
all_ok = False, and continue to the next chunk so remaining chunks are still
posted.
---
Outside diff comments:
In `@trapdata/antenna/result_posting.py`:
- Around line 68-76: Update the class docstring (the Args section in the class
above __init__) to document the new __init__ parameter max_post_bytes: explain
it's the per-POST body size cap in bytes, include the default value
DEFAULT_RESULT_POST_MAX_BYTES, and place this entry alongside max_pending and
future_timeout so users can discover the parameter when reading the API docs;
reference the parameter name max_post_bytes and the constructor method __init__
when adding the brief description.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1c2b7005-4ad0-4b52-aee5-febb971e4e3e
📒 Files selected for processing (5)
trapdata/antenna/client.pytrapdata/antenna/result_posting.pytrapdata/antenna/tests/test_result_chunking.pytrapdata/antenna/worker.pytrapdata/settings.py
| for chunk_idx, chunk in enumerate(chunks): | ||
| payload = {"results": chunk} | ||
| try: | ||
| response = session.post(url, json=payload, timeout=60) | ||
| response.raise_for_status() | ||
| result = AntennaResultPostResponse.model_validate(response.json()) | ||
| logger.debug( | ||
| f"Posted chunk {chunk_idx + 1}/{len(chunks)} " | ||
| f"({len(chunk)} results) to job {job_id}: " | ||
| f"{result.results_queued} queued" | ||
| ) | ||
| except requests.RequestException as e: | ||
| logger.error( | ||
| f"Failed to post result chunk {chunk_idx + 1}/{len(chunks)} " | ||
| f"to {url}: {e}" | ||
| ) | ||
| all_ok = False | ||
| return all_ok |
There was a problem hiding this comment.
Catch broader exception types to handle response validation errors.
The current except clause only catches requests.RequestException (line 201), which won't catch Pydantic validation errors from AntennaResultPostResponse.model_validate() (line 195) or JSON decode errors from response.json() (line 195). If the server returns malformed JSON or a valid JSON response that doesn't match the expected schema, these exceptions would propagate uncaught, preventing remaining chunks from being posted and causing the entire batch to fail.
🛡️ Proposed fix to catch all exceptions
except requests.RequestException as e:
+ except Exception as e:
logger.error(
f"Failed to post result chunk {chunk_idx + 1}/{len(chunks)} "
f"to {url}: {e}"
)
all_ok = False📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for chunk_idx, chunk in enumerate(chunks): | |
| payload = {"results": chunk} | |
| try: | |
| response = session.post(url, json=payload, timeout=60) | |
| response.raise_for_status() | |
| result = AntennaResultPostResponse.model_validate(response.json()) | |
| logger.debug( | |
| f"Posted chunk {chunk_idx + 1}/{len(chunks)} " | |
| f"({len(chunk)} results) to job {job_id}: " | |
| f"{result.results_queued} queued" | |
| ) | |
| except requests.RequestException as e: | |
| logger.error( | |
| f"Failed to post result chunk {chunk_idx + 1}/{len(chunks)} " | |
| f"to {url}: {e}" | |
| ) | |
| all_ok = False | |
| return all_ok | |
| for chunk_idx, chunk in enumerate(chunks): | |
| payload = {"results": chunk} | |
| try: | |
| response = session.post(url, json=payload, timeout=60) | |
| response.raise_for_status() | |
| result = AntennaResultPostResponse.model_validate(response.json()) | |
| logger.debug( | |
| f"Posted chunk {chunk_idx + 1}/{len(chunks)} " | |
| f"({len(chunk)} results) to job {job_id}: " | |
| f"{result.results_queued} queued" | |
| ) | |
| except Exception as e: | |
| logger.error( | |
| f"Failed to post result chunk {chunk_idx + 1}/{len(chunks)} " | |
| f"to {url}: {e}" | |
| ) | |
| all_ok = False | |
| return all_ok |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@trapdata/antenna/client.py` around lines 190 - 207, The loop that posts
chunks uses response.json() and AntennaResultPostResponse.model_validate(), but
the except currently only catches requests.RequestException so JSON decoding or
Pydantic validation errors will leak and abort processing; update the exception
handling around the session.post/response parsing/model_validate block (the code
that calls session.post, response.json(), and
AntennaResultPostResponse.model_validate) to also catch json.JSONDecodeError and
pydantic.ValidationError (or ValueError if pydantic isn't imported) in the same
except (or use a broad Exception as a last resort), log the error via
logger.error including chunk_idx/job_id/url, set all_ok = False, and continue to
the next chunk so remaining chunks are still posted.
Summary
For models with very large taxonomies — for example the ~29,000-class
global_moths_2024classifier — a single batch's result upload can reach 110–142 MB, because every detection carries full per-classscoresandlogitsarrays. In a production deployment those uploads were rejected by the reverse proxy with HTTP 413 (Request Entity Too Large), so the worker could never record its results: the upload failed, the NATS task was never acknowledged, the task eventually exhausted its redelivery budget, and the job failed with nothing stored. Smaller-taxonomy jobs were unaffected, which is why this stayed invisible until a wide-taxonomy job ran on a dense capture set.This change makes the worker split a batch's results across multiple smaller uploads, each kept under a configurable byte cap, so wide-taxonomy jobs complete regardless of the proxy's request-body limit. It is a client-side fix that needs no server change.
List of Changes
chunk_results_by_size()greedy byte-bounded packer inclient.py;post_batch_results()serializes each result once, packs results into chunks undermax_bytes, posts each chunk, and reports success only if every chunk lands. A single result that exceeds the cap on its own is sent alone and logged.antenna_result_post_max_bytes→ env varAMI_ANTENNA_RESULT_POST_MAX_BYTES(default 25 MB), threaded throughResultPosterinto the worker.trapdata/antenna/tests/test_result_chunking.py— 7 tests: packing under cap, splitting a large batch across multiple uploads, over-cap single result, and all-chunks-must-succeed semantics.Diagnosis — why one batch reaches ~140 MB
ClassificationResponsecarriesscoresandlogits, each an array of length = number of model classes. At ~29k classes that is ~1.1 MB per detection for those two arrays alone.worker.py), and the classifier/detector are reset per batch, so there is no cross-batch accumulation.Notes and follow-ups (out of scope for this PR)
Directions to discuss, ordered by leverage. Important context: result payloads are expected to grow, not shrink.
logitsare needed and will be kept, and per-crop vector embeddings are planned for each detection. That makes compressing the upload more valuable over time, and argues against tightening the proxy's body-size limit.Content-Encoding: gziphandling). The big arrays —scores,logits, and soon per-crop embeddings — are numeric and compress roughly 5–10×, so gzipping uploads could cut every request at the source. It requires the API server to accept and decompress gzipped request bodies first, and the reverse proxy to pass them through. With embeddings incoming this is worth prioritizing as a cross-repo follow-up.logitsstay and embeddings are coming, neither the per-upload cap nor the proxy body-size limit should be tightened. Chunking keeps individual uploads bounded regardless of how large a full batch's results become.labelsis already omittable for large models. The per-classificationlabelsarray islist[str] | Noneand documented as "omitted if the model has too many labels" on both the worker and server schemas, so for a 29k-class model it is most likely already dropped — a minor saving already realized. The bulk isscores+logits(and soon embeddings).Not yet verified
Test status
uv run python -m pytest trapdata/antenna/tests/test_result_chunking.py -q→ 7 passed. TDD-verified: disabling chunking made the split test fail, restoring it passed.black,isort, andflake8clean on changed files.Summary by CodeRabbit
New Features
Bug Fixes
Tests