diff --git a/service/history/nexus_dispatch_response.go b/common/nexus/dispatch_response.go similarity index 84% rename from service/history/nexus_dispatch_response.go rename to common/nexus/dispatch_response.go index 63b9b07a99..143525b85d 100644 --- a/service/history/nexus_dispatch_response.go +++ b/common/nexus/dispatch_response.go @@ -1,4 +1,4 @@ -package history +package nexus import ( "github.com/nexus-rpc/sdk-go/nexus" @@ -7,14 +7,14 @@ import ( "go.temporal.io/server/api/matchingservice/v1" ) -// dispatchResponseToError converts a DispatchNexusTaskResponse proto into a Go error. +// DispatchResponseToError converts a DispatchNexusTaskResponse proto into a Go error. // Returns nil if the response indicates success. // // For failure cases (worker explicitly returned an error), the Temporal SDK's failure // converter is used to produce standard Go errors (ApplicationError, CanceledError). // For transport-level issues (timeout, internal), a nexus.HandlerError is returned // so the caller can check Retryable(). -func dispatchResponseToError(resp *matchingservice.DispatchNexusTaskResponse) error { +func DispatchResponseToError(resp *matchingservice.DispatchNexusTaskResponse) error { switch t := resp.GetOutcome().(type) { case *matchingservice.DispatchNexusTaskResponse_Failure: // Worker received the task and explicitly failed it (via RespondNexusTaskFailed). @@ -22,15 +22,15 @@ func dispatchResponseToError(resp *matchingservice.DispatchNexusTaskResponse) er case *matchingservice.DispatchNexusTaskResponse_RequestTimeout: return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, "upstream timeout") case *matchingservice.DispatchNexusTaskResponse_Response: - return startOperationResponseToError(t.Response.GetStartOperation()) + return StartOperationResponseToError(t.Response.GetStartOperation()) default: return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "empty or unknown dispatch outcome") } } -// startOperationResponseToError converts a StartOperationResponse proto into a Go error. +// StartOperationResponseToError converts a StartOperationResponse proto into a Go error. // Returns nil for success variants (SyncSuccess, AsyncSuccess). -func startOperationResponseToError(resp *nexuspb.StartOperationResponse) error { +func StartOperationResponseToError(resp *nexuspb.StartOperationResponse) error { switch t := resp.GetVariant().(type) { case *nexuspb.StartOperationResponse_SyncSuccess: return nil diff --git a/service/history/nexus_dispatch_response_test.go b/common/nexus/dispatch_response_test.go similarity index 93% rename from service/history/nexus_dispatch_response_test.go rename to common/nexus/dispatch_response_test.go index 76b1b9568b..b98829fcca 100644 --- a/service/history/nexus_dispatch_response_test.go +++ b/common/nexus/dispatch_response_test.go @@ -1,4 +1,4 @@ -package history +package nexus import ( "testing" @@ -25,7 +25,7 @@ func TestDispatchResponseToError_SyncSuccess(t *testing.T) { }, }, } - err := dispatchResponseToError(resp) + err := DispatchResponseToError(resp) require.NoError(t, err) } @@ -45,7 +45,7 @@ func TestDispatchResponseToError_AsyncSuccess(t *testing.T) { }, }, } - err := dispatchResponseToError(resp) + err := DispatchResponseToError(resp) require.NoError(t, err) } @@ -55,7 +55,7 @@ func TestDispatchResponseToError_RequestTimeout(t *testing.T) { RequestTimeout: &matchingservice.DispatchNexusTaskResponse_Timeout{}, }, } - err := dispatchResponseToError(resp) + err := DispatchResponseToError(resp) require.Error(t, err) var handlerErr *nexus.HandlerError @@ -76,7 +76,7 @@ func TestDispatchResponseToError_WorkerFailure(t *testing.T) { }, }, } - err := dispatchResponseToError(resp) + err := DispatchResponseToError(resp) require.Error(t, err) var appErr *temporal.ApplicationError @@ -105,7 +105,7 @@ func TestDispatchResponseToError_OperationFailure_ApplicationError(t *testing.T) }, }, } - err := dispatchResponseToError(resp) + err := DispatchResponseToError(resp) require.Error(t, err) var appErr *temporal.ApplicationError @@ -132,7 +132,7 @@ func TestDispatchResponseToError_OperationFailure_CanceledError(t *testing.T) { }, }, } - err := dispatchResponseToError(resp) + err := DispatchResponseToError(resp) require.Error(t, err) var cancelErr *temporal.CanceledError @@ -141,7 +141,7 @@ func TestDispatchResponseToError_OperationFailure_CanceledError(t *testing.T) { func TestDispatchResponseToError_EmptyOutcome(t *testing.T) { resp := &matchingservice.DispatchNexusTaskResponse{} - err := dispatchResponseToError(resp) + err := DispatchResponseToError(resp) require.Error(t, err) var handlerErr *nexus.HandlerError @@ -151,7 +151,7 @@ func TestDispatchResponseToError_EmptyOutcome(t *testing.T) { func TestStartOperationResponseToError_EmptyVariant(t *testing.T) { resp := &nexuspb.StartOperationResponse{} - err := startOperationResponseToError(resp) + err := StartOperationResponseToError(resp) require.Error(t, err) var handlerErr *nexus.HandlerError diff --git a/service/history/worker_commands_task_dispatcher.go b/service/history/worker_commands_task_dispatcher.go index 0d9510429c..967f26548a 100644 --- a/service/history/worker_commands_task_dispatcher.go +++ b/service/history/worker_commands_task_dispatcher.go @@ -17,6 +17,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" + commonnexus "go.temporal.io/server/common/nexus" "go.temporal.io/server/common/resource" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/tasks" @@ -156,7 +157,7 @@ func (d *workerCommandsTaskDispatcher) dispatchToWorker( return fmt.Errorf("failed to dispatch worker commands to control queue %s: %w", task.Destination, err) } - nexusErr := dispatchResponseToError(resp) + nexusErr := commonnexus.DispatchResponseToError(resp) if nexusErr == nil { metrics.WorkerCommandsSent.With(d.metricsHandler).Record(1, metrics.OutcomeTag("success")) return nil @@ -169,7 +170,7 @@ func (d *workerCommandsTaskDispatcher) handleError(nexusErr error, task *tasks.W var handlerErr *nexus.HandlerError if errors.As(nexusErr, &handlerErr) { // Handler-level error (transport, timeout, internal). These are constructed by - // dispatchResponseToError for non-worker-returned failures. + // DispatchResponseToError for non-worker-returned failures. if handlerErr.Type == nexus.HandlerErrorTypeUpstreamTimeout { d.logger.Warn("No worker polling control queue", tag.NewStringTag("control_queue", task.Destination))