Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package history
package nexus

import (
"github.com/nexus-rpc/sdk-go/nexus"
Expand All @@ -7,30 +7,30 @@ import (
"go.temporal.io/server/api/matchingservice/v1"
)

// dispatchResponseToError converts a DispatchNexusTaskResponse proto into a Go error.
// DispatchResponseToError converts a DispatchNexusTaskResponse proto into a Go error.
// Returns nil if the response indicates success.
//
// For failure cases (worker explicitly returned an error), the Temporal SDK's failure
// converter is used to produce standard Go errors (ApplicationError, CanceledError).
// For transport-level issues (timeout, internal), a nexus.HandlerError is returned
// so the caller can check Retryable().
func dispatchResponseToError(resp *matchingservice.DispatchNexusTaskResponse) error {
func DispatchResponseToError(resp *matchingservice.DispatchNexusTaskResponse) error {
switch t := resp.GetOutcome().(type) {
case *matchingservice.DispatchNexusTaskResponse_Failure:
// Worker received the task and explicitly failed it (via RespondNexusTaskFailed).
return temporal.GetDefaultFailureConverter().FailureToError(t.Failure)
case *matchingservice.DispatchNexusTaskResponse_RequestTimeout:
return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, "upstream timeout")
case *matchingservice.DispatchNexusTaskResponse_Response:
return startOperationResponseToError(t.Response.GetStartOperation())
return StartOperationResponseToError(t.Response.GetStartOperation())
default:
return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "empty or unknown dispatch outcome")
}
}

// startOperationResponseToError converts a StartOperationResponse proto into a Go error.
// StartOperationResponseToError converts a StartOperationResponse proto into a Go error.
// Returns nil for success variants (SyncSuccess, AsyncSuccess).
func startOperationResponseToError(resp *nexuspb.StartOperationResponse) error {
func StartOperationResponseToError(resp *nexuspb.StartOperationResponse) error {
switch t := resp.GetVariant().(type) {
case *nexuspb.StartOperationResponse_SyncSuccess:
return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package history
package nexus

import (
"testing"
Expand All @@ -25,7 +25,7 @@ func TestDispatchResponseToError_SyncSuccess(t *testing.T) {
},
},
}
err := dispatchResponseToError(resp)
err := DispatchResponseToError(resp)
require.NoError(t, err)
}

Expand All @@ -45,7 +45,7 @@ func TestDispatchResponseToError_AsyncSuccess(t *testing.T) {
},
},
}
err := dispatchResponseToError(resp)
err := DispatchResponseToError(resp)
require.NoError(t, err)
}

Expand All @@ -55,7 +55,7 @@ func TestDispatchResponseToError_RequestTimeout(t *testing.T) {
RequestTimeout: &matchingservice.DispatchNexusTaskResponse_Timeout{},
},
}
err := dispatchResponseToError(resp)
err := DispatchResponseToError(resp)
require.Error(t, err)

var handlerErr *nexus.HandlerError
Expand All @@ -76,7 +76,7 @@ func TestDispatchResponseToError_WorkerFailure(t *testing.T) {
},
},
}
err := dispatchResponseToError(resp)
err := DispatchResponseToError(resp)
require.Error(t, err)

var appErr *temporal.ApplicationError
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestDispatchResponseToError_OperationFailure_ApplicationError(t *testing.T)
},
},
}
err := dispatchResponseToError(resp)
err := DispatchResponseToError(resp)
require.Error(t, err)

var appErr *temporal.ApplicationError
Expand All @@ -132,7 +132,7 @@ func TestDispatchResponseToError_OperationFailure_CanceledError(t *testing.T) {
},
},
}
err := dispatchResponseToError(resp)
err := DispatchResponseToError(resp)
require.Error(t, err)

var cancelErr *temporal.CanceledError
Expand All @@ -141,7 +141,7 @@ func TestDispatchResponseToError_OperationFailure_CanceledError(t *testing.T) {

func TestDispatchResponseToError_EmptyOutcome(t *testing.T) {
resp := &matchingservice.DispatchNexusTaskResponse{}
err := dispatchResponseToError(resp)
err := DispatchResponseToError(resp)
require.Error(t, err)

var handlerErr *nexus.HandlerError
Expand All @@ -151,7 +151,7 @@ func TestDispatchResponseToError_EmptyOutcome(t *testing.T) {

func TestStartOperationResponseToError_EmptyVariant(t *testing.T) {
resp := &nexuspb.StartOperationResponse{}
err := startOperationResponseToError(resp)
err := StartOperationResponseToError(resp)
require.Error(t, err)

var handlerErr *nexus.HandlerError
Expand Down
5 changes: 3 additions & 2 deletions service/history/worker_commands_task_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
commonnexus "go.temporal.io/server/common/nexus"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/tasks"
Expand Down Expand Up @@ -156,7 +157,7 @@ func (d *workerCommandsTaskDispatcher) dispatchToWorker(
return fmt.Errorf("failed to dispatch worker commands to control queue %s: %w", task.Destination, err)
}

nexusErr := dispatchResponseToError(resp)
nexusErr := commonnexus.DispatchResponseToError(resp)
if nexusErr == nil {
metrics.WorkerCommandsSent.With(d.metricsHandler).Record(1, metrics.OutcomeTag("success"))
return nil
Expand All @@ -169,7 +170,7 @@ func (d *workerCommandsTaskDispatcher) handleError(nexusErr error, task *tasks.W
var handlerErr *nexus.HandlerError
if errors.As(nexusErr, &handlerErr) {
// Handler-level error (transport, timeout, internal). These are constructed by
// dispatchResponseToError for non-worker-returned failures.
// DispatchResponseToError for non-worker-returned failures.
if handlerErr.Type == nexus.HandlerErrorTypeUpstreamTimeout {
d.logger.Warn("No worker polling control queue",
tag.NewStringTag("control_queue", task.Destination))
Expand Down
Loading