feat(local): forward Lambda response streaming through start-api#9028
feat(local): forward Lambda response streaming through start-api#9028alexis779 wants to merge 1 commit into
Conversation
fcfeb87 to
d413d12
Compare
aws-sam-cli-bot
left a comment
There was a problem hiding this comment.
Code Review Results
Reviewed: 9ffa5ed..d413d12
Files: 39
Comments: 5
Comments on lines outside the diff:
[samcli/init.py:5] [BUG] The version is being downgraded from 1.160.1 to 1.160.0. A feature PR should bump the version forward, not regress it. Combined with the other reverts in this PR (see comment #2), this looks like the branch was based on a stale main and has accidentally reverted everything that landed since. Please rebase onto current main and keep the version bump pointing forward.
[pyproject.toml:38] [BUG] ,72,74,75,77,82 (and 26 sibling files)
This PR's stated scope is "forward Lambda response streaming through start-api", but the diff also reverts a large amount of unrelated work from main:
- Dependency downgrades:
boto3[crt]1.43.7 → 1.43.6,mypy2.1.0 → 1.19.1,types-PyYAML,types-chevron,types-setuptools,types-jsonschemaall rolled back to older pins. - Removal of
permissions: contents: readfrom.github/workflows/validate_pyinstaller.yml— this is a security-hardening regression (loss of GITHUB_TOKEN least-privilege). - Re-introduction of
cast(...)workarounds in samcli/lib/sync/sync_flow.py, samcli/lib/utils/resource_trigger.py, samcli/local/apigw/authorizers/lambda_authorizer.py, and removal of typing annotations in samcli/lib/telemetry/metric.py (_METRICS: Dict[str, "Metric"]→ untypeddict()) — consistent with the older mypy version being re-pinned. - Removal of
_resolve_property_paths,_get_prop_value,_set_prop_value,_leaf_prop_namehelpers in samcli/lib/package/language_extensions_packaging.py and the dotted-property-path handling in samcli/commands/build/build_context.py (e.g.Code.ImageUri,Command.ScriptLocation) — this drops a feature. - Removal of
ResolutionMode.PARTIALhandling for unresolvedFn::Selectindices in samcli/lib/cfn_language_extensions/resolvers/fn_select.py — this drops a feature, and its unit test is also being deleted. - Mass deletion of unit tests (≈1100 lines across tests/unit/lib/cfn_language_extensions/*, tests/unit/commands/buildcmd/test_build_context.py, tests/unit/commands/package/test_package_context.py, tests/unit/commands/_utils/test_template.py).
None of this is related to response streaming. Please rebase onto current main so the diff only contains the streaming changes.
| self._concurrency_semaphore.release() | ||
| raise | ||
|
|
||
| def release_streaming_slot(self): |
There was a problem hiding this comment.
[CONCURRENCY] -620
def release_streaming_slot(self):
"""
Release the concurrency semaphore acquired by
:meth:wait_for_streaming_response. Safe to call multiple times.
"""
if self._concurrency_semaphore:
try:
self._concurrency_semaphore.release()
except ValueError:
# already released
passThe "safe to call multiple times" guarantee does not hold. self._concurrency_semaphore is constructed at line 483 as threading.Semaphore(self._max_concurrency), not BoundedSemaphore. Plain Semaphore.release() never raises ValueError on over-release — it just increments the internal counter past _max_concurrency. So a double release_streaming_slot() call silently inflates the available-permit count and lets more concurrent invocations through than the configured AWS_LAMBDA_MAX_CONCURRENCY.
In practice the runtime-side cleanup in LambdaRuntime.invoke_streaming is gated by a threading.Event, so the common path is fine — but the docstring contract is wrong, and any future caller that takes it at face value will silently break the concurrency cap. Either:
- Switch the semaphore to
threading.BoundedSemaphoreso over-release actually raises, or - Track release state with a flag (similar to
completion_calledin streaming_response.py) and make idempotence real.
There was a problem hiding this comment.
Fixed in d82da42. release_streaming_slot() is now genuinely idempotent: wait_for_streaming_response() installs _streaming_slot_released = False and a _streaming_slot_lock; the release path takes the lock and only releases the underlying threading.Semaphore once, so a double-call cannot inflate the permit count past AWS_LAMBDA_MAX_CONCURRENCY. If release_streaming_slot() is called without a prior acquire it is a no-op.
| # either a streaming response (which we forward chunk by chunk | ||
| # to the browser) or a buffered body that is then parsed by | ||
| # the existing API Gateway payload format helpers. | ||
| invoke_kind, invoke_payload, invoke_cleanup = self._invoke_lambda_function_streaming( |
There was a problem hiding this comment.
[BUG] -804
invoke_kind, invoke_payload, invoke_cleanup = self._invoke_lambda_function_streaming(
route.function_name, route_lambda_event, tenant_id
)If _invoke_lambda_function_streaming raises any exception that isn't caught by one of the listed except clauses (the existing except Exception handler that the buffered path used to rely on at line 712 only applies to _invoke_lambda_function, not this new method's try block), invoke_kind / invoke_payload / invoke_cleanup remain unbound. The downstream if invoke_kind == "streaming" branch still executes safely because invoke_kind is initialized to None at line 791, but unexpected exceptions (e.g. requests.ConnectionError from the streaming POST, or any error from the new json.dumps(json.loads(...)) normalization that doesn't fall under ValueError/JSONDecodeError) will now propagate out as a 500 instead of being mapped to the user-facing lambda_failure_response the buffered path produced. Consider either keeping a catch-all except Exception (mirroring the original handler that this try replaced) or explicitly listing requests.RequestException.
There was a problem hiding this comment.
Fixed in d82da42, plus a larger structural change: the streaming pipeline is now gated on FunctionUrlConfig.InvokeMode == RESPONSE_STREAM, mirroring the production opt-in contract. Routes that don't declare it keep going through the original buffered _invoke_lambda_function path 100% unchanged (this also restores all existing unit tests that mocked lambda_runner.invoke). Streaming-enabled routes have their own try/except block with a final catch-all except Exception that logs the failure and returns lambda_failure_response() — same shape the buffered path produced.
| # Buffered response: drain the body now so we can release the | ||
| # container as soon as possible, then route through the existing | ||
| # JSON parsing pipeline. | ||
| try: |
There was a problem hiding this comment.
[RESOURCE_MANAGEMENT] -668
try:
body_bytes = response.content
finally:
cleanup()response.content on a stream=True response materializes the entire body in memory in one shot. For a buffered Lambda response that's the same memory profile as today, but on an unexpectedly-large body (e.g. a function that returns megabytes despite not being streaming-tagged) this now reads it all before the slot is released, blocking the container's concurrency slot for longer than necessary and pushing the Python process memory up. Consider whether you want a size cap here, or at least an explicit timeout — the streaming POST uses timeout=(connect, None), so a hung non-streaming runtime that never closes the body would hold the slot indefinitely.
There was a problem hiding this comment.
Fixed in d82da42. The buffered drain in _invoke_lambda_function_streaming now uses response.iter_content(chunk_size=64KiB) against a hard _BUFFERED_RESPONSE_BYTE_CAP = 8 MiB (slightly above Lambda's 6 MB sync response limit). If a misconfigured runtime exceeds the cap we abort the read, log, and raise LambdaResponseParseException so the container slot is released and a user-facing failure response is returned. This path is also now only reached for routes that opted into RESPONSE_STREAM and accidentally returned a buffered body, so the blast radius is small.
d413d12 to
d3f3973
Compare
aws-sam-cli-bot
left a comment
There was a problem hiding this comment.
Code Review Results
Reviewed: 9ffa5ed..d3f3973
Files: 39 (focused on streaming-related code + suspicious unrelated changes)
Comments: 6
Comments on lines outside the diff:
[samcli/init.py:5] [BUG] The version is being downgraded from 1.160.1 to 1.160.0. Combined with the rest of this diff — boto3[crt] 1.43.7 → 1.43.6, mypy 2.1.0 → 1.19.1, pyinstaller 6.20.0 → 6.19.0, types-PyYAML/types-chevron/types-setuptools/types-jsonschema all reverted to older dated stubs, ~1300 lines deleted under samcli/lib/cfn_language_extensions/** and samcli/lib/package/language_extensions_packaging.py, ~600 lines of corresponding tests removed, and ~120 lines deleted from docs/cfn-language-extensions.md — this branch is clearly based on a stale main and has accidentally reverted everything that landed since.
This was raised in the previous review and has not been addressed. Please rebase onto current main; do not merge until the diff contains only the streaming-related changes (plus the RIE binaries).
[.github/workflows/validate_pyinstaller.yml:10] [SECURITY] This PR removes the explicit
permissions:
contents: readblock from the workflow. Without an explicit top-level permissions: declaration, the workflow inherits the repository/org default GITHUB_TOKEN scopes, which in many configurations are write-permissive. Dropping the least-privilege restriction is a security regression. Almost certainly an unintended revert from the stale-base issue (#1) — restore the block.
| self._concurrency_semaphore.release() | ||
| raise | ||
|
|
||
| def release_streaming_slot(self): |
There was a problem hiding this comment.
[CONCURRENCY] ```python
def release_streaming_slot(self):
"""
... Safe to call multiple times.
"""
if self._concurrency_semaphore:
try:
self._concurrency_semaphore.release()
except ValueError:
# already released
pass
The "safe to call multiple times" guarantee does not hold. `self._concurrency_semaphore` is constructed at line 483 as a plain `threading.Semaphore`, not a `BoundedSemaphore`. Plain `Semaphore.release()` never raises `ValueError` — it simply increments the internal counter unboundedly. So calling this twice silently inflates the permit count above `_max_concurrency`, breaking the concurrency cap (subsequent `acquire()`s will succeed when they should block).
This was raised in the previous review and has not been addressed. Two options:
- Switch `_concurrency_semaphore` to `threading.BoundedSemaphore(self._max_concurrency)` at line 483 so the `except ValueError` branch actually fires; or
- Track release state explicitly here, e.g. with a dedicated `threading.Event` or a per-invocation flag plumbed in by the caller, and gate the `release()` on it.
Today the runtime-level `cleanup()` closure happens to guard against double-release with its own `cleanup_done` Event, but this method's contract still misleads any future caller.
| if stderr is not None: | ||
| container.start_logs_thread_if_not_alive(stderr) | ||
|
|
||
| response = container.wait_for_streaming_response(event, tenant_id=tenant_id) |
There was a problem hiding this comment.
[BUG] invoke_streaming never calls self._configure_interrupt(function_config.full_path, function_config.timeout, container, ...), unlike the buffered invoke at line 297-299 which uses it to install a threading.Timer that stops the container when the configured Lambda timeout expires.
As a result, a streaming Lambda that hangs (infinite loop, blocked I/O, stuck SSE producer) will run indefinitely locally — neither the function timeout nor Ctrl+C/SIGTERM will tear down the container. The caller's cleanup() only fires when the consumer finishes reading the body, which may never happen if the function never closes its stream.
You'll want a streaming-aware variant of the timer: when it fires, both stop the container and force-close the streaming requests.Response so the body iterator unblocks and the cleanup chain runs. Keep in mind the existing start_timer() is meant to be invoked after the request is sent and cancelled when it returns; for streaming you'll need to cancel the timer either when the body is fully consumed or when cleanup runs (whichever comes first).
There was a problem hiding this comment.
Fixed in b6eb111. LambdaRuntime.invoke_streaming now calls _configure_interrupt(function_config.full_path, function_config.timeout, container, bool(debug_context)) and arms the same threading.Timer the buffered invoke() uses. The timer is cancelled inside the cleanup() closure on normal completion, and when it fires it calls self._container_manager.stop(container) which terminates the in-flight HTTP body and causes the consumer's iter_content to reach EOF, triggering the standard cleanup chain.
Smoke-tested with Timeout: 3 on a handler that streams for 10 s: the connection cleanly delivers ~3 s of SSE frames followed by Task timed out after 3.00 seconds and closes, instead of running for the full 10 s.
| # container as soon as possible, then route through the existing | ||
| # JSON parsing pipeline. | ||
| try: | ||
| body_bytes = response.content |
There was a problem hiding this comment.
[RESOURCE_MANAGEMENT] ```python
try:
body_bytes = response.content
finally:
cleanup()
`response.content` on a `stream=True` response materializes the entire body in memory in one shot before `cleanup()` runs. For a normal, well-behaved buffered Lambda response this is fine, but on an unexpectedly-large body (a function that returns many MB without the streaming response-mode header) this now reads it all before the concurrency slot is released, holding the slot longer than necessary and creating an OOM-on-large-response failure mode that the old `StringIO`-based path also had but at least bounded by Lambda's 6 MB buffered response cap.
This was raised in the previous review and has not been addressed. Consider iterating `response.iter_content()` into a `BytesIO` with a hard size cap (e.g. 6 MiB to mirror the Lambda invoke-response limit) and raising `LambdaResponseParseException` when exceeded — that way malformed/oversized non-streaming responses fail fast instead of OOM-ing the developer's machine.
There was a problem hiding this comment.
Same finding as #3261063539 — addressed in d82da42 by switching to iter_content(64 KiB) with _BUFFERED_RESPONSE_BYTE_CAP = 8 MiB. See #3261141228.
| # either a streaming response (which we forward chunk by chunk | ||
| # to the browser) or a buffered body that is then parsed by | ||
| # the existing API Gateway payload format helpers. | ||
| invoke_kind, invoke_payload, invoke_cleanup = self._invoke_lambda_function_streaming( |
There was a problem hiding this comment.
[ERROR_HANDLING] ```python
invoke_kind, invoke_payload, invoke_cleanup = self._invoke_lambda_function_streaming(
route.function_name, route_lambda_event, tenant_id
)
The surrounding `try` block (line 793) only catches `TenantIdValidationError`, `FunctionNotFound`, `UnsupportedInlineCodeError`, `LambdaResponseParseException`, `DockerContainerCreationFailedException`, and `MissingFunctionNameException`. Anything else — `ContainerResponseException`, `requests.exceptions.RequestException`, `UnsupportedInvocationType`, arbitrary `RuntimeError` from the new streaming pipeline, etc. — bubbles up to Flask as a 500 with no friendly error mapping.
This was raised in the previous review and has not been addressed. Resource cleanup is mostly handled inside `runtime.invoke_streaming`'s and `container.wait_for_streaming_response`'s own `except` blocks (so the semaphore and container are released), but the user-facing failure mode is worse than the old buffered path because there's now more code between the invoke and the response that can throw. Add a catch-all `except Exception` branch that maps to `ServiceErrorResponses.lambda_failure_response()` (or equivalent) so the API stays well-behaved on unexpected runtime errors, and explicitly call `invoke_cleanup()` if it was assigned before the exception was raised.
There was a problem hiding this comment.
Same finding as #3261063531 — addressed in d82da42 with a catch-all except Exception in the streaming branch of _request_handler. The buffered path is also now untouched because streaming is gated on FunctionUrlConfig.InvokeMode == RESPONSE_STREAM. See #3261141051.
d3f3973 to
d82da42
Compare
|
Force-pushed Two unrelated concerns from the automated review ( Three substantive code points were valid and are fixed:
The original end-to-end SSE test still passes (Node.js |
| # threading.Semaphore (not BoundedSemaphore) does not raise on | ||
| # over-release; it silently inflates the permit count, which | ||
| # would let more concurrent invokes through than configured. | ||
| self._streaming_slot_released = False |
There was a problem hiding this comment.
[CONCURRENCY] The slot-tracking attributes are stored on the Container instance itself, but wait_for_streaming_response re-initializes them on every call:
self._streaming_slot_released = False
self._streaming_slot_lock = threading.Lock()When AWS_LAMBDA_MAX_CONCURRENCY > 1 (the very setting that justifies a streaming pipeline), two concurrent invocations against the same container both acquire the semaphore and then both run these lines, with the second overwriting the first. Result:
- Thread A acquires permit → sets _streaming_slot_released=False, lock=L1
- Thread B acquires permit → overwrites with _streaming_slot_released=False, lock=L2
- Thread A's stream finishes → release_streaming_slot() flips _streaming_slot_released=True and releases one permit
- Thread B's stream finishes → reads _streaming_slot_released=True (set by A), returns as a no-op, never releases its permit
So every concurrent streaming invocation past the first leaks a semaphore permit, and after max_concurrency such overlaps the container is permanently saturated and queues forever. The fix is to make the tracking state local to a single invocation rather than shared across them, e.g. return a per-invoke release token from wait_for_streaming_response:
def wait_for_streaming_response(self, event, tenant_id=None):
...
released = {"v": False}
released_lock = threading.Lock()
if self._concurrency_semaphore:
self._concurrency_semaphore.acquire()
def release():
if not self._concurrency_semaphore:
return
with released_lock:
if released["v"]:
return
released["v"] = True
self._concurrency_semaphore.release()
try:
return self._make_streaming_http_request(event, tenant_id), release
except Exception:
release()
raiseand have the caller invoke that closure instead of container.release_streaming_slot(). This keeps each invocation's release state independent and removes the cross-thread overwrite.
There was a problem hiding this comment.
Good catch — fixed in b6eb111. wait_for_streaming_response no longer stores tracking state on self; instead each call closes over its own released dict + release_lock and returns a per-invocation release callable. LambdaRuntime.invoke_streaming now receives (response, release) and uses the closure in its cleanup(). Two concurrent invocations on the same container at AWS_LAMBDA_MAX_CONCURRENCY > 1 cannot overwrite each other's state, so no permit leak.
Container.release_streaming_slot() is gone in favor of the closure pattern, which matches the snippet you suggested.
| if stderr is not None: | ||
| container.start_logs_thread_if_not_alive(stderr) | ||
|
|
||
| response = container.wait_for_streaming_response(event, tenant_id=tenant_id) |
There was a problem hiding this comment.
[BUG] invoke_streaming never installs the function-timeout interrupt that the buffered invoke relies on. Compare with the buffered path at line ~300:
start_timer = self._configure_interrupt(
function_config.full_path, function_config.timeout, container, bool(debug_context)
)
...
container.wait_for_result(..., start_timer=start_timer, ...)_configure_interrupt returns a closure that arms a threading.Timer(function_config.timeout, ...) which calls self._container_manager.stop(container) when the configured Lambda timeout expires. invoke_streaming skips this entirely:
response = container.wait_for_streaming_response(event, tenant_id=tenant_id)Practical impact: a streaming Lambda that hangs (infinite loop, blocked I/O, an SSE producer stuck waiting on a downstream call) is never terminated locally — the container runs until the user kills SAM, the per-container concurrency slot is held the whole time, and the Function URL/HTTP API parity claim in the PR description doesn't hold (in production, Lambda enforces the configured timeout for streaming functions too). It also means Ctrl+C debug behavior diverges between the two paths.
Arm the same interrupt before calling wait_for_streaming_response, and cancel the returned timer from cleanup() so it doesn't fire after a successful stream:
start_timer = self._configure_interrupt(
function_config.full_path, function_config.timeout, container, bool(debug_context)
)
timer = start_timer() if start_timer else None
try:
response = container.wait_for_streaming_response(event, tenant_id=tenant_id)
except Exception:
if timer:
timer.cancel()
...
raise
def cleanup():
if cleanup_done.is_set():
return
cleanup_done.set()
if timer:
timer.cancel()
...There was a problem hiding this comment.
Fixed in b6eb111 — same change as #3261109757 above (timer armed via _configure_interrupt, cancelled in cleanup(), fires self._container_manager.stop(container) on timeout). Verified locally: Timeout: 3 on a handler that tries to stream for 10 s now terminates at ~3 s with Task timed out after 3.00 seconds.
d82da42 to
b6eb111
Compare
|
Force-pushed
All 382 affected unit tests pass on Python 3.13 ( End-to-end:
Open threads are all replied to. |
|
|
||
| if is_streaming_response(response): | ||
| LOG.debug("Lambda function returned a streaming response, forwarding chunks as they arrive") | ||
| return build_streaming_flask_response( |
There was a problem hiding this comment.
[RESOURCE_MANAGEMENT] _invoke_lambda_function_streaming only invokes cleanup in the buffered fallback's finally. On the streaming branch:
response, cleanup = self.lambda_runner.invoke_streaming(...)
if is_streaming_response(response):
LOG.debug("Lambda function returned a streaming response, ...")
return build_streaming_flask_response(
response,
on_complete=cleanup,
extra_headers=cors_headers,
)if anything between the invoke_streaming call and the return raises, cleanup is never run — leaking the timeout timer, the container's concurrency slot, the open requests.Response, and (for cold runtimes) the container itself.
This is not hypothetical: build_streaming_flask_response synchronously iterates the upstream response inside _peek_prelude whenever the runtime advertises application/vnd.awslambda.http-integration-response (the Function URLs case this PR is built for). A connection reset, RIE crash, or any exception while reading those first chunks will propagate out of build_streaming_flask_response with cleanup still uncalled.
Wrap construction so a failure runs cleanup:
response, cleanup = self.lambda_runner.invoke_streaming(...)
try:
if is_streaming_response(response):
flask_resp = build_streaming_flask_response(
response, on_complete=cleanup, extra_headers=cors_headers,
)
# cleanup is now owned by the Flask response lifecycle
return flask_resp
except BaseException:
cleanup()
raise(or move the streaming-detection check inside build_streaming_flask_response and have it call on_complete in its own construction-time error path).
There was a problem hiding this comment.
Fixed in f2e262f. _invoke_lambda_function_streaming now wraps the build_streaming_flask_response(...) call in a try/except BaseException that calls cleanup() and re-raises if construction fails (e.g. a connection reset or malformed prelude while _peek_prelude is reading the upstream body synchronously). Once build_streaming_flask_response returns successfully the cleanup callable is owned by the Flask response lifecycle via Response.call_on_close, so the explicit cleanup is only run on the construction-failure path.
| streaming_response = self._invoke_lambda_function_streaming( | ||
| route.function_name, route_lambda_event, tenant_id, cors_headers | ||
| ) | ||
| except TenantIdValidationError as e: |
There was a problem hiding this comment.
[ERROR_HANDLING] The new streaming branch's try/except catches TenantIdValidationError, FunctionNotFound, UnsupportedInlineCodeError, DockerContainerCreationFailedException, MissingFunctionNameException, and a catch-all Exception — but not LambdaResponseParseException. The buffered path right below it does catch it explicitly and returns ServiceErrorResponses.lambda_body_failure_response() (HTTP 500).
_invoke_lambda_function_streaming raises LambdaResponseParseException in two places it added in this PR — when the buffered drain trips _BUFFERED_RESPONSE_BYTE_CAP (line 722) and when the body looks like a Lambda error envelope (line 742). On streaming-enabled routes those failures will fall into except Exception and surface as lambda_failure_response() (HTTP 502), while the same condition on a non-streaming route surfaces as HTTP 500. Same root cause, different status code and log message depending purely on whether the route opted into InvokeMode=RESPONSE_STREAM.
Add an explicit handler before the catch-all to mirror the buffered path:
except MissingFunctionNameException as ex:
endpoint_service_error = ServiceErrorResponses.lambda_failure_response(
f"Failed to execute endpoint. Got an invalid function name ({str(ex)})",
)
except LambdaResponseParseException:
endpoint_service_error = ServiceErrorResponses.lambda_body_failure_response()
except Exception as ex: # pylint: disable=broad-except
...There was a problem hiding this comment.
Fixed in f2e262f. Added an explicit except LambdaResponseParseException clause before the catch-all in the streaming branch of _request_handler, returning ServiceErrorResponses.lambda_body_failure_response() (HTTP 500). This matches the buffered path exactly, so the _BUFFERED_RESPONSE_BYTE_CAP violation and the Lambda-error-envelope detection in _invoke_lambda_function_streaming now produce the same HTTP status regardless of whether the route opted into InvokeMode=RESPONSE_STREAM.
b6eb111 to
f2e262f
Compare
| # ``threading.Timer`` that stops the container when | ||
| # ``function_config.timeout`` seconds elapse, so a hung | ||
| # streaming handler is killed instead of running forever. | ||
| start_timer = self._configure_interrupt( |
There was a problem hiding this comment.
[BUG] The streaming pipeline's claim that the timeout timer terminates a hung function does not hold for WarmLambdaRuntime, which is what sam local start-api --warm-containers uses.
invoke_streaming arms _configure_interrupt(...):
start_timer = self._configure_interrupt(
function_config.full_path,
function_config.timeout,
container,
bool(debug_context),
)
if start_timer:
timer = start_timer()In LambdaRuntime._configure_interrupt the timer handler stops the container:
def timer_handler():
LOG.info("Function '%s' timed out after %d seconds", function_full_path, timeout)
self._container_manager.stop(container)But WarmLambdaRuntime._configure_interrupt (in the same file) only logs:
def timer_handler():
LOG.info("Function '%s' timed out after %d seconds", function_full_path, timeout)There is no self._container_manager.stop(container) in the warm variant. As a result, when a streaming Lambda hangs (infinite loop, blocked I/O, stuck SSE producer) under --warm-containers:
- timer.start() arms the timer.
- timeout seconds later it logs but does not stop the container.
- The in-flight HTTP body never EOFs, so the consumer's iter_content never returns.
- _wrapped_body's finally never runs, cleanup() is never called.
- The Flask response generator hangs the worker, the concurrency semaphore permit is never released, and the container keeps running.
The buffered path tolerates this in warm mode because RIE eventually returns from wait_for_http_response for normal handlers; the streaming path returns its requests.Response before the function finishes, so the only way to bound execution is the in-process timer.
This is also at odds with the docstring's stated contract on lines 401–403 ("When the timer fires it stops the container, which terminates the in-flight HTTP body"), which is only true for the cold LambdaRuntime.
Fix options:
- Override timer_handler in invoke_streaming to call self._container_manager.stop(container) directly instead of relying on whatever the runtime's _configure_interrupt happens to do — for streaming you always want the container stopped on timeout. For warm runtimes you'd then also need to evict the container from WarmLambdaRuntime._containers / _function_configs so the next request rebuilds it.
- Or add a streaming-specific timer in invoke_streaming that always stops the container, independent of _configure_interrupt.
Either way, the warm-container case needs explicit verification — the prior fix was tested with Timeout: 3 but presumably only against the cold LambdaRuntime path.
There was a problem hiding this comment.
Fixed in f0f912e. Right — WarmLambdaRuntime's _configure_interrupt.timer_handler only logs, so my earlier "timer stops the container" claim was only true on the cold path. The original smoke test passed in warm mode because RIE enforces its own internal timeout (it's what wrote Task timed out after 3.00 seconds to the body); SAM CLI's timer wasn't actually doing the work.
Replaced the _configure_interrupt-based timer in invoke_streaming with a dedicated, streaming-aware one. On timeout it:
- Calls
response.close()— this is the key step, it forces the consumer'siter_contentto EOF / error and triggers the standardcleanup()chain. Works for warm and cold. - Calls
self._container_manager.stop(container)only when the runtime is not a WarmLambdaRuntime (isinstance(self, WarmLambdaRuntime)). For cold runtimes the container is per-invoke anyway. For warm we deliberately keep the container so the next request reuses it; stopping it would leak a stale entry in_containersthat the nextis_created()check would happily return.
The timer is also skipped when debug_context is set, mirroring the buffered path.
Verified end-to-end with --warm-containers LAZY and a function declared with Timeout: 3 that tries to stream for 10 s:
- The request terminates at ~3 s with our log line
Streaming function 'StreamShortTimeoutFunction' timed out after 3 seconds(so the timer is actually doing the work now, not RIE alone). - A follow-up request against the same warm runtime against the regular
/streamroute completes all 10 ticks +done, proving the warm container was not left in a broken state.
Docstring updated to describe the new design accurately.
…-api sam local start-api was buffering Lambda invokes in a StringIO and then running the result through API Gateway proxy JSON parsing, which made it impossible to test functions that use awslambda.streamifyResponse (or any other Lambda response-streaming pattern, e.g. Server-Sent Events). The HTTP request to RIE was issued without stream=True and the response body was materialised in full before the caller saw a single byte. This change adds a parallel "streaming-aware" invoke pipeline that the API Gateway service uses by default while keeping the buffered code path exactly as it was for non-streaming responses: * docker/container.py: new wait_for_streaming_response / _make_streaming_http_request methods that issue the RIE invoke with stream=True and hand the caller the raw requests.Response. A matching release_streaming_slot keeps the per-container concurrency semaphore correctly balanced across the lifetime of the stream. * lambdafn/runtime.py: new LambdaRuntime.invoke_streaming that builds / runs the container the same way invoke() does but returns a tuple of (response, cleanup). The cleanup callable releases the slot and invokes _on_invoke_done so warm containers stay warm while cold containers are stopped after the stream is fully consumed. * commands/local/lib/local_lambda.py: thin wrapper plumbing invoke_streaming through LocalLambdaRunner. * apigw/streaming_response.py (new): turns the streaming RIE response into a flask.Response. When the upstream Content-Type is application/vnd.awslambda.http-integration-response, the JSON prelude is parsed and applied to the outgoing status/headers/cookies before the body bytes (after the 8-NUL delimiter) are forwarded chunk by chunk. Chunks are pulled with raw.read1() for minimal latency, and call_on_close hooks the cleanup callable to the Flask response lifecycle. * apigw/local_apigw_service.py: _request_handler now invokes through the streaming-aware helper. When the runtime advertises streaming (via Lambda-Runtime-Function-Response-Mode), the streamed response is forwarded straight to the browser with CORS headers merged in. Otherwise the previously-buffered body is parsed and rendered through the existing v1/v2 payload-format helpers. * local/rapid/aws-lambda-rie-*: refreshed RIE binaries built from the streaming-aware fork that lives in aws-lambda-runtime-interface-emulator (case-insensitive response mode comparison + streamingCopy pass-through). Validated end-to-end with a Node.js Lambda using awslambda.streamifyResponse: an EventSource client in a browser receives one SSE frame per second over a 10-second stream rather than seeing all 10 frames arrive together at the end. Co-authored-by: Cursor <cursoragent@cursor.com>
f2e262f to
f0f912e
Compare
|
Here's the demo of lambda streaming function, executing a NodeJS handler and sending SSE to the browser. SamLocalApiStreamingDemo.mp4template.yamlapp.jsTesting |
Summary
sam local start-apicurrently buffers the runtime's/responsebody into aStringIOand then runs the result through the API Gateway proxy JSON parser. That makes it impossible to test Lambda functions that use response streaming (e.g. Node.jsawslambda.streamifyResponse, Python streaming responses, or any handler that emits Server-Sent Events). The HTTP request to RIE is issued withoutstream=Trueand the response body is materialized in full before the caller sees a single byte, so a 60-second SSE stream surfaces to the browser as one big blob at the end (or worse: as a 502 because the body isn't valid API Gateway proxy JSON).Fixes #6501.
This PR adds a parallel streaming-aware invoke pipeline that the API Gateway local service uses by default, while keeping the existing buffered code path 100% intact for non-streaming responses. The companion RIE change is required for chunks to actually flow through the emulator without being coalesced — see the dependency note below.
Dependency: companion RIE change
The RIE binary that ships under
samcli/local/rapid/aws-lambda-rie-{arm64,x86_64}had to be rebuilt from a fork of aws-lambda-runtime-interface-emulator that pumps the runtime's/responsebody through to the caller chunk-by-chunk instead ofio.ReadAll-ing it first. The exact RIE change is in companion PR aws/aws-lambda-runtime-interface-emulator#179 (tracked by aws/aws-lambda-runtime-interface-emulator#175). The updated binaries are committed undersamcli/local/rapid/in this PR.Approach
samcli/local/docker/container.py— newwait_for_streaming_response/_make_streaming_http_requestmethods that issue the RIE invoke withstream=Trueand hand the caller the rawrequests.Response. A matchingrelease_streaming_slotkeeps the per-container concurrency semaphore correctly balanced across the entire lifetime of the stream.samcli/local/lambdafn/runtime.py— newLambdaRuntime.invoke_streamingthat builds / runs the container exactly likeinvoke()does but returns a(response, cleanup)tuple. The cleanup callable releases the slot and runs_on_invoke_done, so warm containers stay warm while cold containers are stopped after the stream is fully consumed (or the client disconnects).samcli/commands/local/lib/local_lambda.py— thin wrapper plumbinginvoke_streamingthroughLocalLambdaRunner.samcli/local/apigw/streaming_response.py(new) — turns the streaming RIE response into aflask.Response. When the upstreamContent-Typeisapplication/vnd.awslambda.http-integration-response, the JSON prelude is parsed and applied to the outgoing status / headers / cookies before the body bytes (after the 8-NUL delimiter) are forwarded chunk by chunk. Chunks are pulled withraw.read1()for minimal latency, andcall_on_closehooks the cleanup callable onto the Flask response lifecycle so we don't leak containers if the browser disconnects mid-stream.samcli/local/apigw/local_apigw_service.py—_request_handlernow invokes through the streaming-aware helper. When the runtime advertises streaming (viaLambda-Runtime-Function-Response-Mode), the streamed response is forwarded straight to the browser with CORS headers merged in. Otherwise the previously-buffered body is parsed and rendered through the existing v1/v2 payload-format helpers — no behavior change for non-streaming Lambdas.samcli/local/rapid/aws-lambda-rie-{arm64,x86_64}— refreshed binaries from the companion RIE PR.Test plan
Setup
1. Streaming Lambda end-to-end (curl)
template.yaml:src/app.js:Run:
samdev local start-api --warm-containers LAZY curl -N http://127.0.0.1:3000/streamExpected: the 10 `tick` SSE frames appear in the terminal one per second, not in a single batch at the end. The response carries `HTTP/1.1 200 OK`, `Content-Type: text/event-stream` (from the prelude), and `Transfer-Encoding: chunked`.
2. Browser SSE test
Open the streaming endpoint from a simple HTML page with `new EventSource('http://127.0.0.1:3000/stream')\` and verify each `event: tick` is rendered in the DOM the moment it arrives (timestamp deltas should be ~1.00s, not 0s with all events at the end).
3. Regression: buffered Lambda still works
Add a second route returning a plain API Gateway proxy response (`{ statusCode: 200, body: JSON.stringify(...) }`) and `curl -i http://127.0.0.1:3000/hello\`. Should return `HTTP/1.1 200 OK` with `Content-Length: ` and the body parsed by the existing v1/v2 payload format helpers exactly like before.
4. Unit tests
```bash
pytest tests/unit/local/apigw tests/unit/local/docker tests/unit/local/lambdafn tests/unit/commands/local
```
All existing tests pass. (No new buffered-path behavior, the streaming pipeline is purely additive.)
5. RIE side
The companion PR includes unit tests for the new streaming-copy helper and runs `go test ./internal/lambda/...` clean.
Notes / out of scope
Made with Cursor