Fix continue-as-new handling in workflow_streams subscribe #1581
Open
brianstrauch wants to merge 2 commits into
_follow_continue_as_new described the workflow with no run id, which returns the current run. After a continue-as-new that is the new RUNNING run, never CONTINUED_AS_NEW (which only sits on the old, closed run), so the check never fired and subscribe() stopped during a rollover instead of following the stream. Capture the run id each poll's update is admitted to (start_update with WaitForStage ACCEPTED, read workflow_run_id, then await result) and describe that specific run on failure. A rolled-over run reports CONTINUED_AS_NEW, a terminal run reports a terminal status, and a still-RUNNING run is a transient error that should surface. This also avoids mistaking an unrelated new execution that reused the workflow id for a successor.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
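The decision described in this commit message can be sketched in plain Python. This is only an illustration, not the code in the PR: `RunStatus`, `Action`, `describe`, and `classify_polled_run` are hypothetical stand-ins, and the run ids are invented. It shows why describing the latest run never yields CONTINUED_AS_NEW, while describing the run the poll was admitted to does.

```python
from enum import Enum, auto
from typing import Optional


class RunStatus(Enum):
    """Hypothetical stand-in for the SDK's workflow execution status."""
    RUNNING = auto()
    COMPLETED = auto()
    FAILED = auto()
    TERMINATED = auto()
    CONTINUED_AS_NEW = auto()


class Action(Enum):
    """What the subscriber does after a failed poll (names are assumptions)."""
    RETRY = auto()    # follow the stream onto the successor run
    STOP = auto()     # the workflow is finished; end the subscription
    SURFACE = auto()  # run is still alive, so the failure is a real error


# Fake server state after one continue-as-new: the old run is closed,
# the new run is the current one.
RUNS = {"run-1": RunStatus.CONTINUED_AS_NEW, "run-2": RunStatus.RUNNING}
LATEST_RUN_ID = "run-2"


def describe(run_id: Optional[str] = None) -> RunStatus:
    """With no run id, return the status of the latest run (the old behavior)."""
    return RUNS[run_id if run_id is not None else LATEST_RUN_ID]


def classify_polled_run(status: RunStatus) -> Action:
    """Map the status of the run the poll was admitted to onto an action."""
    if status is RunStatus.CONTINUED_AS_NEW:
        return Action.RETRY
    if status is RunStatus.RUNNING:
        return Action.SURFACE
    return Action.STOP


# Describing the latest run hides the rollover; describing the polled run shows it.
old_behavior = classify_polled_run(describe())
new_behavior = classify_polled_run(describe("run-1"))
```

In this sketch `old_behavior` is `Action.SURFACE` and `new_behavior` is `Action.RETRY`, which mirrors the bug and the fix described above.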
The poll update's validator rejected new polls during detach-for-CAN with an untyped RuntimeError, which subscribe() did not classify and re-raised, ending the subscription with an error during a routine rollover. Give the validator the well-known StreamDraining ApplicationError type and have subscribe() back off and retry on it, so the poll lands on the successor run once the rollover completes.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
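A minimal sketch of the back-off-and-retry behavior, assuming the rejection reaches the client as an error object carrying a type string. `FakeApplicationError`, `poll_with_retry`, `make_flaky_poll`, and the delay and attempt parameters are all hypothetical; only the `"StreamDraining"` type name comes from this PR.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

STREAM_DRAINING = "StreamDraining"  # error type name taken from the PR text


class FakeApplicationError(Exception):
    """Hypothetical stand-in for an SDK application error with a type string."""

    def __init__(self, message: str, type: Optional[str] = None) -> None:
        super().__init__(message)
        self.type = type


async def poll_with_retry(
    poll: Callable[[], Awaitable[List[str]]],
    *,
    initial_delay: float = 0.01,
    max_delay: float = 0.1,
    max_attempts: int = 10,
) -> List[str]:
    """Retry a poll with exponential backoff while the stream is draining."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delay = initial_delay
    for attempt in range(max_attempts):
        try:
            return await poll()
        except FakeApplicationError as err:
            # Only the draining rejection is retried; anything else surfaces.
            if err.type != STREAM_DRAINING or attempt == max_attempts - 1:
                raise
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)
    raise AssertionError("unreachable")


def make_flaky_poll(rejections: int):
    """Build a fake poll that is rejected `rejections` times, then succeeds."""
    calls = {"n": 0}

    async def poll() -> List[str]:
        calls["n"] += 1
        if calls["n"] <= rejections:
            raise FakeApplicationError("draining", type=STREAM_DRAINING)
        return ["item-a", "item-b"]

    return poll, calls
```

The attempt cap is an assumption added so the sketch terminates; the PR text only says that subscribe() backs off and retries.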
What
Two fixes to how WorkflowStreamClient.subscribe() behaves when the host workflow continues-as-new.

1. Continue-as-new detection (_follow_continue_as_new)
_follow_continue_as_new described the workflow with no run id, which returns the current run. After a continue-as-new that is the new RUNNING run, never CONTINUED_AS_NEW (which only sits on the old, closed run), so the check never fired and subscribe() stopped (or surfaced an error) during a rollover instead of following the stream.
Now we capture the run id each poll's update is admitted to (start_update with WaitForStage.ACCEPTED, read workflow_run_id, then await result()) and describe that specific run on failure. A rolled-over run reports CONTINUED_AS_NEW (→ retry), a terminal run reports a terminal status (→ stop), and a still-RUNNING run is a transient error (→ surface). This also avoids mistaking an unrelated new execution that reused the workflow id for a successor.

2. Retry polls rejected while draining
The poll update's validator rejects new polls during detach-for-CAN. It raised an untyped RuntimeError, which subscribe() did not classify and re-raised, ending the subscription with an error during a routine rollover. The validator now raises the well-known StreamDraining ApplicationError type, and subscribe() backs off and retries on it.

Cross-language note
"StreamDraining" is a new well-known error type in the cross-language workflow-streams protocol (alongside TruncatedOffset). The matching changes land in sdk-go and sdk-typescript.

Tests
test_follow_continue_as_new_describes_polled_run: against a real post-CAN run, the polled (old) run reports CONTINUED_AS_NEW and the latest reports RUNNING; the helper follows only when describing the polled run.
test_subscribe_retries_while_draining: a workflow that detaches and holds open until released; a live subscribe() survives the draining window and resumes on the successor run.
Full tests/contrib/workflow_streams/ suite passes; mypy clean.

🤖 Generated with Claude Code