-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Dispatch activity cancellation to worker using Nexus #9233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 55 commits
Commits
Show all changes
69 commits
Select commit
Hold shift + click to select a range
31e35cd
Store worker_instance_key in ActivityInfo
rkannan82 2874f4e
Add unit test
rkannan82 ad8c480
Fix lint
rkannan82 986e9a1
Remove redundant WorkerInstanceKey assignment in UpdateActivity callback
rkannan82 fb3066c
Store worker_control_task_queue in ActivityInfo
rkannan82 6ef2640
Update go.temporal.io/api to include worker_instance_key and worker_c…
rkannan82 eab94b0
Fix lint
rkannan82 0f8e48e
Remove worker_instance_key as it is not needed
rkannan82 5390759
Forward WorkerControlTaskQueue through matching service partitions
rkannan82 ac7be08
Define CancelActivityNexusTask transfer task type
rkannan82 113ca15
Fix lint errors
rkannan82 5ad9aeb
Regen proto
rkannan82 05adf1a
Remove worker_instance_key as it is not needed
rkannan82 39c1759
Move CancelActivityNexusTask creation to task_generator
rkannan82 131c278
Add WorkerControlTaskQueue to CancelActivityNexusTask
rkannan82 7bc8ada
Add metrics tag and low priority for CancelActivityNexusTask
rkannan82 01e4822
Update comment for standby executor
rkannan82 e9b1fac
Add Version field to CancelActivityNexusTask for multi-cluster support
rkannan82 faff7ee
Add comment
rkannan82 d6af322
Add ActivityCommandTask for outbound activity commands
rkannan82 7f5e0fc
Regenerate mocks with go-generate for correct ordering and parameter …
rkannan82 cd17334
Change ActivityCommandTask to use task_tokens instead of scheduled_ev…
rkannan82 6a1a66b
Change ActivityCommandTaskInfo to WorkerCommandsTask
rkannan82 fbf13b0
Add ActivityCommandTask dispatch via Nexus
rkannan82 d5591a4
Update api-go dependency to merged activity-cancel branch
rkannan82 537edb9
Merge branch 'kannan/activity-cancel/task-definition' into kannan/act…
rkannan82 d182f84
Update dispatcher to use WorkerCommandsRequest from nexusservices pac…
rkannan82 0f6651c
Update dispatcher for ExecuteCommands rename and extracted types
rkannan82 7f842cf
Generalize task to WorkerCommandsTask using API WorkerCommand type
rkannan82 2150c89
Merge branch 'kannan/activity-cancel/task-definition' into kannan/act…
rkannan82 1b85cce
Update dispatcher and integration test for WorkerCommandsTask
rkannan82 404ce2a
Rename TASK_TYPE_ACTIVITY_COMMAND to TASK_TYPE_WORKER_COMMANDS and re…
rkannan82 54232d3
Merge branch 'kannan/activity-cancel/task-definition' into kannan/act…
rkannan82 3e0a3a0
Rename activity_command_task.go to worker_commands_task.go
rkannan82 05a7c1d
Merge branch 'kannan/activity-cancel/task-definition' into kannan/act…
rkannan82 6614c3e
Add worker commands dispatcher with Nexus response handling and unit …
rkannan82 64c52e9
Fix lint errors: gofmt alignment, testifylint, importas, staticcheck,…
rkannan82 7343da1
Refactor worker commands dispatcher: deduplicate failure handling, re…
rkannan82 66dd3fb
Clean up dispatch response handling: inline failure conversion, impro…
rkannan82 dfac6d2
Store started_clock in ActivityInfo for task token reconstruction
rkannan82 29ac6e5
Cap worker commands task retries at 3 attempts
rkannan82 3fd7447
Add retry cap comment to dispatcher failure scenarios doc
rkannan82 412045e
Emit metric when dropping worker commands task at retry cap
rkannan82 083d313
Clarify attempt parameter in test calls with inline comment
rkannan82 8dec8b4
Update go.mod to latest api-go (includes API PR #708)
rkannan82 3e4267f
Address review feedback: backward compat, standby executor, lock safety
rkannan82 ca17749
Remove replace directive and update go.temporal.io/api to v1.62.8
rkannan82 0549e23
Fix compile error in convertTemporalFailure: := to = for named returns
rkannan82 466f4d2
Remove redundant StartedEventId check in cancel activity handler
rkannan82 061cc39
Merge origin/main into kannan/activity-cancel/dispatch-logic
rkannan82 e3a08a5
Revert executable_mock.go to main (Attempt() already merged via #9924)
rkannan82 8846349
Fix nits: align metric defs, replace assert with require in tests
rkannan82 3c4e62c
Remove unused common import in handler test
rkannan82 6a890ba
Replace workerservicepb.WorkerService with string constants
rkannan82 3420f9a
Fix GCI lint: alphabetize imports and align struct fields in test
rkannan82 56e9b73
Fix nil Clock in duplicate RecordActivityTaskStarted for pre-deploy a…
rkannan82 9859175
Improve RecordActivityTaskStarted test coverage and naming
rkannan82 7ef2b52
Add integration test for duplicate RecordActivityTaskStarted
rkannan82 a6ac85f
Address review feedback: move metric, check handler error retryability
rkannan82 0f546ca
Add description to WorkerCommandsSent metric
rkannan82 f4fe808
Use SDK failure converter instead of Nexus failure conversion
rkannan82 aa9dd98
Use retryable matching client for worker commands dispatch
rkannan82 7966342
Fix formatting alignment
rkannan82 c271f43
Merge branch 'main' into kannan/activity-cancel/dispatch-logic
rkannan82 d7190ef
Fix proto depIdxs conflict and test dependency for MatchingClient
Copilot 0f09733
Merge branch 'main' into kannan/activity-cancel/dispatch-logic
rkannan82 59b72db
Merge branch 'main' into kannan/activity-cancel/dispatch-logic
rkannan82 cd56d9c
Merge branch 'main' into kannan/activity-cancel/dispatch-logic
rkannan82 b0d428d
Merge branch 'main' into kannan/activity-cancel/dispatch-logic
rkannan82 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| package history | ||
|
|
||
| import ( | ||
| "fmt" | ||
|
|
||
| "github.com/nexus-rpc/sdk-go/nexus" | ||
| failurepb "go.temporal.io/api/failure/v1" | ||
| nexuspb "go.temporal.io/api/nexus/v1" | ||
| "go.temporal.io/server/api/matchingservice/v1" | ||
| commonnexus "go.temporal.io/server/common/nexus" | ||
| "go.temporal.io/server/common/nexus/nexusrpc" | ||
| ) | ||
|
|
||
| // convertTemporalFailure converts a Temporal API Failure proto into a Go error | ||
| // via the Nexus SDK failure converter. | ||
| func convertTemporalFailure(failure *failurepb.Failure) (nexusErr error, err error) { | ||
| nexusFailure, err := commonnexus.TemporalFailureToNexusFailure(failure) | ||
|
rkannan82 marked this conversation as resolved.
Outdated
|
||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to convert Temporal failure: %w", err) | ||
| } | ||
| nexusErr, err = nexusrpc.DefaultFailureConverter().FailureToError(nexusFailure) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to convert Nexus failure to error: %w", err) | ||
| } | ||
| return nexusErr, nil | ||
| } | ||
|
|
||
| // dispatchResponseToError converts a DispatchNexusTaskResponse proto into a Nexus SDK error. | ||
| // Returns nil if the response indicates success. | ||
| func dispatchResponseToError(resp *matchingservice.DispatchNexusTaskResponse) error { | ||
| switch t := resp.GetOutcome().(type) { | ||
| // Worker received the task and explicitly failed it (via RespondNexusTaskFailed). | ||
| case *matchingservice.DispatchNexusTaskResponse_Failure: | ||
| converted, err := convertTemporalFailure(t.Failure) | ||
| if err != nil { | ||
| return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "%v", err) | ||
| } | ||
| return converted | ||
| case *matchingservice.DispatchNexusTaskResponse_RequestTimeout: | ||
| return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, "upstream timeout") | ||
| // Worker responded successfully; check the inner StartOperation response. | ||
| case *matchingservice.DispatchNexusTaskResponse_Response: | ||
| return startOperationResponseToError(t.Response.GetStartOperation()) | ||
| default: | ||
| return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "empty or unknown dispatch outcome") | ||
| } | ||
| } | ||
|
|
||
| // startOperationResponseToError converts a StartOperationResponse proto into a Nexus SDK error. | ||
| // Returns nil for success variants (SyncSuccess, AsyncSuccess). | ||
| func startOperationResponseToError(resp *nexuspb.StartOperationResponse) error { | ||
| switch t := resp.GetVariant().(type) { | ||
| case *nexuspb.StartOperationResponse_SyncSuccess: | ||
| return nil | ||
| case *nexuspb.StartOperationResponse_AsyncSuccess: | ||
| return nil | ||
| // Operation processed but failed — the worker returned an explicit failure. | ||
| case *nexuspb.StartOperationResponse_Failure: | ||
| return operationErrorFromFailure(t.Failure) | ||
| default: | ||
| return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "empty or unknown start operation response variant") | ||
| } | ||
| } | ||
|
|
||
| // operationErrorFromFailure converts a Temporal API Failure into a Nexus SDK operation error. | ||
| func operationErrorFromFailure(failure *failurepb.Failure) error { | ||
|
rkannan82 marked this conversation as resolved.
Outdated
|
||
| state := nexus.OperationStateFailed | ||
| if failure.GetCanceledFailureInfo() != nil { | ||
| state = nexus.OperationStateCanceled | ||
| } | ||
| cause, err := convertTemporalFailure(failure) | ||
| if err != nil { | ||
| return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "%v", err) | ||
| } | ||
| opError := &nexus.OperationError{ | ||
| State: state, | ||
| Message: fmt.Sprintf("operation error: %s", failure.GetMessage()), | ||
| Cause: cause, | ||
| } | ||
| if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opError); err != nil { | ||
| return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "failed to mark operation error as wrapper: %v", err) | ||
| } | ||
| return opError | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.