diff --git a/go.mod b/go.mod index 847f9817a..6921fd3ca 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/temporalio/cli/cliext v0.0.0 github.com/temporalio/ui-server/v2 v2.49.1 go.temporal.io/api v1.62.13 - go.temporal.io/sdk v1.41.1 + go.temporal.io/sdk v1.44.1 go.temporal.io/sdk/contrib/envconfig v1.0.0 go.temporal.io/server v1.32.0-157.0 golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f diff --git a/go.sum b/go.sum index 5d98f630c..31903bc40 100644 --- a/go.sum +++ b/go.sum @@ -473,8 +473,8 @@ go.temporal.io/api v1.62.13 h1:xMa8Nt5oAMX+LvlCJA44wjTCc1H09i2rG9poB1/xvH4= go.temporal.io/api v1.62.13/go.mod h1:0k75tRljEuELWGeXjEZZO7zYqBln4+1FrG6+IMOMy7Q= go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2 h1:1hKeH3GyR6YD6LKMHGCZ76t6h1Sgha0hXVQBxWi3dlQ= go.temporal.io/auto-scaled-workers v0.0.0-20260407181057-edd947d743d2/go.mod h1:T8dnzVPeO+gaUTj9eDgm/lT2lZH4+JXNvrGaQGyVi50= -go.temporal.io/sdk v1.41.1 h1:yOpvsHyDD1lNuwlGBv/SUodCPhjv9nDeC9lLHW/fJUA= -go.temporal.io/sdk v1.41.1/go.mod h1:/InXQT5guZ6AizYzpmzr5avQ/GMgq1ZObcKlKE2AhTc= +go.temporal.io/sdk v1.44.1 h1:Mt2OZLZpqkzDIdg9YyQzO0Rb/HqCDnnqHlIAGAJ5gqM= +go.temporal.io/sdk v1.44.1/go.mod h1:vkApR12F9/Y8OR+hkxe7WyXQFuCX6clhzqnAk6rzDAM= go.temporal.io/sdk/contrib/envconfig v1.0.0 h1:1Q/swVgB4EW/p3k7rI9/4hpU4/DC57FSRbU90+UisXw= go.temporal.io/sdk/contrib/envconfig v1.0.0/go.mod h1:Pj4N1lwUEvxap6quBm8GrVMSUMJhSZkVtxjt3AYnPPg= go.temporal.io/server v1.32.0-157.0 h1:nzFqNwx+5lXsT0/DSiFyR5vHMnDcT3PVAvmRDqCUn38= diff --git a/internal/temporalcli/commands.gen.go b/internal/temporalcli/commands.gen.go index 5186d70e3..9670c593c 100644 --- a/internal/temporalcli/commands.gen.go +++ b/internal/temporalcli/commands.gen.go @@ -341,6 +341,47 @@ func (v *NexusEndpointConfigOptions) BuildFlags(f *pflag.FlagSet) { f.StringVar(&v.TargetUrl, "target-url", "", "An external Nexus Endpoint that receives forwarded Nexus requests. May be used as an alternative to `--target-namespace` and `--target-task-queue`. EXPERIMENTAL.") } +type NexusOperationReferenceOptions struct { + OperationId string + RunId string + FlagSet *pflag.FlagSet +} + +func (v *NexusOperationReferenceOptions) BuildFlags(f *pflag.FlagSet) { + v.FlagSet = f + f.StringVar(&v.OperationId, "operation-id", "", "Nexus Operation ID. Required.") + _ = cobra.MarkFlagRequired(f, "operation-id") + f.StringVarP(&v.RunId, "run-id", "r", "", "Run ID of the Nexus Operation.") +} + +type NexusOperationStartOptions struct { + Endpoint string + Service string + Operation string + OperationId string + ScheduleToCloseTimeout cliext.FlagDuration + IdConflictPolicy cliext.FlagStringEnum + IdReusePolicy cliext.FlagStringEnum + FlagSet *pflag.FlagSet +} + +func (v *NexusOperationStartOptions) BuildFlags(f *pflag.FlagSet) { + v.FlagSet = f + f.StringVar(&v.Endpoint, "endpoint", "", "Nexus Endpoint name. Required.") + _ = cobra.MarkFlagRequired(f, "endpoint") + f.StringVar(&v.Service, "service", "", "Nexus Service name. Required.") + _ = cobra.MarkFlagRequired(f, "service") + f.StringVar(&v.Operation, "operation", "", "Nexus Operation name. Required.") + _ = cobra.MarkFlagRequired(f, "operation") + f.StringVar(&v.OperationId, "operation-id", "", "Nexus Operation ID. If not supplied, a unique ID is generated.") + v.ScheduleToCloseTimeout = 0 + f.Var(&v.ScheduleToCloseTimeout, "schedule-to-close-timeout", "Total time the operation is allowed to run.") + v.IdConflictPolicy = cliext.NewFlagStringEnum([]string{"Fail", "UseExisting", "TerminateExisting"}, "") + f.Var(&v.IdConflictPolicy, "id-conflict-policy", "Policy for handling an Operation ID conflict with a running operation. Accepted values: Fail, UseExisting, TerminateExisting.") + v.IdReusePolicy = cliext.NewFlagStringEnum([]string{"AllowDuplicate", "RejectDuplicate"}, "") + f.Var(&v.IdReusePolicy, "id-reuse-policy", "Policy for re-using an Operation ID from a previously closed operation. Accepted values: AllowDuplicate, RejectDuplicate.") +} + type QueryModifiersOptions struct { RejectCondition cliext.FlagStringEnum Headers []string @@ -461,6 +502,7 @@ func NewTemporalCommand(cctx *CommandContext) *TemporalCommand { s.Command.AddCommand(&NewTemporalBatchCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalConfigCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalEnvCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalNexusCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalOperatorCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalScheduleCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalServerCommand(cctx, &s).Command) @@ -1369,6 +1411,285 @@ func NewTemporalEnvSetCommand(cctx *CommandContext, parent *TemporalEnvCommand) return &s } +type TemporalNexusCommand struct { + Parent *TemporalCommand + Command cobra.Command + cliext.ClientOptions +} + +func NewTemporalNexusCommand(cctx *CommandContext, parent *TemporalCommand) *TemporalNexusCommand { + var s TemporalNexusCommand + s.Parent = parent + s.Command.Use = "nexus" + s.Command.Short = "Start, list, and operate on Nexus Operations" + if hasHighlighting { + s.Command.Long = "Nexus Operation commands perform operations on Standalone Nexus\nOperation Executions:\n\n\x1b[1mtemporal nexus [command] [options]\x1b[0m\n\nFor example:\n\n\x1b[1mtemporal nexus operation list\x1b[0m" + } else { + s.Command.Long = "Nexus Operation commands perform operations on Standalone Nexus\nOperation Executions:\n\n```\ntemporal nexus [command] [options]\n```\n\nFor example:\n\n```\ntemporal nexus operation list\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.AddCommand(&NewTemporalNexusOperationCommand(cctx, &s).Command) + s.ClientOptions.BuildFlags(s.Command.PersistentFlags()) + return &s +} + +type TemporalNexusOperationCommand struct { + Parent *TemporalNexusCommand + Command cobra.Command +} + +func NewTemporalNexusOperationCommand(cctx *CommandContext, parent *TemporalNexusCommand) *TemporalNexusOperationCommand { + var s TemporalNexusOperationCommand + s.Parent = parent + s.Command.Use = "operation" + s.Command.Short = "Commands for managing Standalone Nexus Operations" + if hasHighlighting { + s.Command.Long = "These commands manage Standalone Nexus Operation Executions.\n\nNexus Operation commands follow this syntax:\n\n\x1b[1mtemporal nexus operation [command] [options]\x1b[0m" + } else { + s.Command.Long = "These commands manage Standalone Nexus Operation Executions.\n\nNexus Operation commands follow this syntax:\n\n```\ntemporal nexus operation [command] [options]\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.AddCommand(&NewTemporalNexusOperationCancelCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalNexusOperationCountCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalNexusOperationDescribeCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalNexusOperationExecuteCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalNexusOperationListCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalNexusOperationResultCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalNexusOperationStartCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalNexusOperationTerminateCommand(cctx, &s).Command) + return &s +} + +type TemporalNexusOperationCancelCommand struct { + Parent *TemporalNexusOperationCommand + Command cobra.Command + NexusOperationReferenceOptions + Reason string +} + +func NewTemporalNexusOperationCancelCommand(cctx *CommandContext, parent *TemporalNexusOperationCommand) *TemporalNexusOperationCancelCommand { + var s TemporalNexusOperationCancelCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "cancel [flags]" + s.Command.Short = "Request cancellation of a Standalone Nexus Operation (Experimental)" + if hasHighlighting { + s.Command.Long = "Request cancellation of a Standalone Nexus Operation.\n\n\x1b[1mtemporal nexus operation cancel \\\n --operation-id YourOperationId\x1b[0m\n\nThe Operation handler determines how to handle the\ncancellation request." + } else { + s.Command.Long = "Request cancellation of a Standalone Nexus Operation.\n\n```\ntemporal nexus operation cancel \\\n --operation-id YourOperationId\n```\n\nThe Operation handler determines how to handle the\ncancellation request." + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for cancellation.") + s.NexusOperationReferenceOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalNexusOperationCountCommand struct { + Parent *TemporalNexusOperationCommand + Command cobra.Command + Query string +} + +func NewTemporalNexusOperationCountCommand(cctx *CommandContext, parent *TemporalNexusOperationCommand) *TemporalNexusOperationCountCommand { + var s TemporalNexusOperationCountCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "count [flags]" + s.Command.Short = "Count Standalone Nexus Operations matching a query (Experimental)" + if hasHighlighting { + s.Command.Long = "Return a count of Standalone Nexus Operations. Use \x1b[1m--query\x1b[0m\nto filter the operations to be counted.\n\n\x1b[1mtemporal nexus operation count \\\n --query 'NexusEndpoint=\"YourEndpoint\"'\x1b[0m\n\nVisit https://docs.temporal.io/visibility to read more about\nSearch Attributes and queries." + } else { + s.Command.Long = "Return a count of Standalone Nexus Operations. Use `--query`\nto filter the operations to be counted.\n\n```\ntemporal nexus operation count \\\n --query 'NexusEndpoint=\"YourEndpoint\"'\n```\n\nVisit https://docs.temporal.io/visibility to read more about\nSearch Attributes and queries." + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Query to filter Nexus Operation Executions to count.") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalNexusOperationDescribeCommand struct { + Parent *TemporalNexusOperationCommand + Command cobra.Command + NexusOperationReferenceOptions + Raw bool +} + +func NewTemporalNexusOperationDescribeCommand(cctx *CommandContext, parent *TemporalNexusOperationCommand) *TemporalNexusOperationDescribeCommand { + var s TemporalNexusOperationDescribeCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "describe [flags]" + s.Command.Short = "Show detailed info for a Standalone Nexus Operation (Experimental)" + if hasHighlighting { + s.Command.Long = "Display detailed information about a specific Standalone Nexus\nOperation Execution.\n\n\x1b[1mtemporal nexus operation describe \\\n --operation-id YourOperationId\x1b[0m" + } else { + s.Command.Long = "Display detailed information about a specific Standalone Nexus\nOperation Execution.\n\n```\ntemporal nexus operation describe \\\n --operation-id YourOperationId\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().BoolVar(&s.Raw, "raw", false, "Print properties without changing their format.") + s.NexusOperationReferenceOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalNexusOperationExecuteCommand struct { + Parent *TemporalNexusOperationCommand + Command cobra.Command + NexusOperationStartOptions + PayloadInputOptions +} + +func NewTemporalNexusOperationExecuteCommand(cctx *CommandContext, parent *TemporalNexusOperationCommand) *TemporalNexusOperationExecuteCommand { + var s TemporalNexusOperationExecuteCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "execute [flags]" + s.Command.Short = "Start a new Standalone Nexus Operation and wait for its result (Experimental)" + if hasHighlighting { + s.Command.Long = "Start a new Standalone Nexus Operation Execution and block until\nit completes. The result is output to stdout.\n\n\x1b[1mtemporal nexus operation execute \\\n --endpoint YourEndpoint \\\n --service YourService \\\n --operation YourOperation \\\n --operation-id YourOperationId \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m" + } else { + s.Command.Long = "Start a new Standalone Nexus Operation Execution and block until\nit completes. The result is output to stdout.\n\n```\ntemporal nexus operation execute \\\n --endpoint YourEndpoint \\\n --service YourService \\\n --operation YourOperation \\\n --operation-id YourOperationId \\\n --input '{\"some-key\": \"some-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.NexusOperationStartOptions.BuildFlags(s.Command.Flags()) + s.PayloadInputOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalNexusOperationListCommand struct { + Parent *TemporalNexusOperationCommand + Command cobra.Command + Query string + Limit int + PageSize int +} + +func NewTemporalNexusOperationListCommand(cctx *CommandContext, parent *TemporalNexusOperationCommand) *TemporalNexusOperationListCommand { + var s TemporalNexusOperationListCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "list [flags]" + s.Command.Short = "List Standalone Nexus Operations matching a query (Experimental)" + if hasHighlighting { + s.Command.Long = "List Standalone Nexus Operations. Use \x1b[1m--query\x1b[0m to filter results.\n\n\x1b[1mtemporal nexus operation list \\\n --query 'NexusEndpoint=\"YourEndpoint\"'\x1b[0m\n\nVisit https://docs.temporal.io/visibility to read more about\nSearch Attributes and queries." + } else { + s.Command.Long = "List Standalone Nexus Operations. Use `--query` to filter results.\n\n```\ntemporal nexus operation list \\\n --query 'NexusEndpoint=\"YourEndpoint\"'\n```\n\nVisit https://docs.temporal.io/visibility to read more about\nSearch Attributes and queries." + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Query to filter the Nexus Operation Executions to list.") + s.Command.Flags().IntVar(&s.Limit, "limit", 0, "Maximum number of Nexus Operation Executions to display.") + s.Command.Flags().IntVar(&s.PageSize, "page-size", 0, "Maximum number of Nexus Operation Executions to fetch at a time from the server.") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalNexusOperationResultCommand struct { + Parent *TemporalNexusOperationCommand + Command cobra.Command + NexusOperationReferenceOptions +} + +func NewTemporalNexusOperationResultCommand(cctx *CommandContext, parent *TemporalNexusOperationCommand) *TemporalNexusOperationResultCommand { + var s TemporalNexusOperationResultCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "result [flags]" + s.Command.Short = "Wait for and output the result of a Standalone Nexus Operation (Experimental)" + if hasHighlighting { + s.Command.Long = "Wait for a Standalone Nexus Operation to complete and output\nthe result.\n\n\x1b[1mtemporal nexus operation result \\\n --operation-id YourOperationId\x1b[0m" + } else { + s.Command.Long = "Wait for a Standalone Nexus Operation to complete and output\nthe result.\n\n```\ntemporal nexus operation result \\\n --operation-id YourOperationId\n```" + } + s.Command.Args = cobra.NoArgs + s.NexusOperationReferenceOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalNexusOperationStartCommand struct { + Parent *TemporalNexusOperationCommand + Command cobra.Command + NexusOperationStartOptions + PayloadInputOptions +} + +func NewTemporalNexusOperationStartCommand(cctx *CommandContext, parent *TemporalNexusOperationCommand) *TemporalNexusOperationStartCommand { + var s TemporalNexusOperationStartCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "start [flags]" + s.Command.Short = "Start a new Standalone Nexus Operation (Experimental)" + if hasHighlighting { + s.Command.Long = "Start a new Standalone Nexus Operation. Outputs the\nOperation ID and Run ID.\n\n\x1b[1mtemporal nexus operation start \\\n --endpoint YourEndpoint \\\n --service YourService \\\n --operation YourOperation \\\n --operation-id YourOperationId \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m" + } else { + s.Command.Long = "Start a new Standalone Nexus Operation. Outputs the\nOperation ID and Run ID.\n\n```\ntemporal nexus operation start \\\n --endpoint YourEndpoint \\\n --service YourService \\\n --operation YourOperation \\\n --operation-id YourOperationId \\\n --input '{\"some-key\": \"some-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.NexusOperationStartOptions.BuildFlags(s.Command.Flags()) + s.PayloadInputOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalNexusOperationTerminateCommand struct { + Parent *TemporalNexusOperationCommand + Command cobra.Command + NexusOperationReferenceOptions + Reason string +} + +func NewTemporalNexusOperationTerminateCommand(cctx *CommandContext, parent *TemporalNexusOperationCommand) *TemporalNexusOperationTerminateCommand { + var s TemporalNexusOperationTerminateCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "terminate [flags]" + s.Command.Short = "Forcefully end a Standalone Nexus Operation (Experimental)" + if hasHighlighting { + s.Command.Long = "Terminate a Standalone Nexus Operation.\n\n\x1b[1mtemporal nexus operation terminate \\\n --operation-id YourOperationId \\\n --reason YourReason\x1b[0m\n\nOperation handlers cannot see or respond to terminations." + } else { + s.Command.Long = "Terminate a Standalone Nexus Operation.\n\n```\ntemporal nexus operation terminate \\\n --operation-id YourOperationId \\\n --reason YourReason\n```\n\nOperation handlers cannot see or respond to terminations." + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for termination. Defaults to a message with the current user's name.") + s.NexusOperationReferenceOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalOperatorCommand struct { Parent *TemporalCommand Command cobra.Command diff --git a/internal/temporalcli/commands.nexus_operation.go b/internal/temporalcli/commands.nexus_operation.go new file mode 100644 index 000000000..4e58c11bf --- /dev/null +++ b/internal/temporalcli/commands.nexus_operation.go @@ -0,0 +1,501 @@ +package temporalcli + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/fatih/color" + "github.com/google/uuid" + "github.com/temporalio/cli/internal/printer" + "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/failure/v1" + "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" +) + +func (c *TemporalNexusOperationStartCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + handle, err := startNexusOperation(cctx, cl, &c.NexusOperationStartOptions, &c.PayloadInputOptions) + if err != nil { + return err + } + return printNexusOperationExecution(cctx, &c.NexusOperationStartOptions, handle.GetID(), handle.GetRunID(), c.Parent.Parent.Namespace) +} + +func (c *TemporalNexusOperationExecuteCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + handle, err := startNexusOperation(cctx, cl, &c.NexusOperationStartOptions, &c.PayloadInputOptions) + if err != nil { + return err + } + if !cctx.JSONOutput { + if err := printNexusOperationExecution(cctx, &c.NexusOperationStartOptions, handle.GetID(), handle.GetRunID(), c.Parent.Parent.Namespace); err != nil { + cctx.Logger.Error("Failed printing execution info", "error", err) + } + } + return getNexusOperationResult(cctx, cl, c.Parent.Parent.Namespace, handle.GetID(), handle.GetRunID()) +} + +func (c *TemporalNexusOperationResultCommand) run(cctx *CommandContext, _ []string) error { + cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + return getNexusOperationResult(cctx, cl, c.Parent.Parent.Namespace, c.OperationId, c.RunId) +} + +func startNexusOperation( + cctx *CommandContext, + cl client.Client, + opts *NexusOperationStartOptions, + inputOpts *PayloadInputOptions, +) (client.NexusOperationHandle, error) { + nexusCl, startOpts, err := buildNexusStartOptions(opts, inputOpts) + if err != nil { + return nil, err + } + nCl, err := cl.NewNexusClient(nexusCl) + if err != nil { + return nil, err + } + handle, err := nCl.ExecuteOperation(cctx, startOpts.operation, startOpts.input, startOpts.options) + if err != nil { + return nil, fmt.Errorf("failed starting nexus operation: %w", err) + } + return handle, nil +} + +func printNexusOperationExecution(cctx *CommandContext, opts *NexusOperationStartOptions, operationID, runID, namespace string) error { + if !cctx.JSONOutput { + cctx.Printer.Println(color.MagentaString("Started Nexus Operation:")) + } + return cctx.Printer.PrintStructured(struct { + Endpoint string `json:"endpoint"` + Service string `json:"service"` + Operation string `json:"operation"` + OperationId string `json:"operationId"` + RunId string `json:"runId"` + Namespace string `json:"namespace"` + }{ + Endpoint: opts.Endpoint, + Service: opts.Service, + Operation: opts.Operation, + OperationId: operationID, + RunId: runID, + Namespace: namespace, + }, printer.StructuredOptions{}) +} + +func getNexusOperationResult(cctx *CommandContext, cl client.Client, namespace, operationID, runID string) error { + resp, err := pollNexusOperationOutcome(cctx, cl, namespace, operationID, runID) + if err != nil { + var notFound *serviceerror.NotFound + if errors.As(err, ¬Found) { + return fmt.Errorf("nexus operation not found: %s", operationID) + } + return fmt.Errorf("failed polling nexus operation result: %w", err) + } + + // Use the run ID from the response if the caller didn't supply one. + if runID == "" { + runID = resp.GetRunId() + } + + switch v := resp.GetOutcome().(type) { + case *workflowservice.PollNexusOperationExecutionResponse_Result: + return printNexusOperationResult(cctx, operationID, runID, v.Result) + case *workflowservice.PollNexusOperationExecutionResponse_Failure: + if err := printNexusOperationFailure(cctx, operationID, runID, v.Failure); err != nil { + cctx.Logger.Error("Nexus operation failed, and printing the output also failed", "error", err) + } + return fmt.Errorf("nexus operation failed") + default: + return fmt.Errorf("unexpected nexus operation outcome type: %T", v) + } +} + +// Matches the SDK's pollNexusOperationTimeout in internal_nexus_client.go. +const pollNexusOperationTimeout = 60 * time.Second + +// pollNexusOperationOutcome polls for a nexus operation result using a +// hand-rolled loop rather than handle.Get() because handle.Get() deserializes +// the result into a Go value and converts failures to Go errors, losing the +// raw proto payloads. +func pollNexusOperationOutcome(cctx *CommandContext, cl client.Client, namespace, operationID, runID string) (*workflowservice.PollNexusOperationExecutionResponse, error) { + for { + pollCtx, cancel := context.WithTimeout(cctx, pollNexusOperationTimeout) + resp, err := cl.WorkflowService().PollNexusOperationExecution(pollCtx, &workflowservice.PollNexusOperationExecutionRequest{ + Namespace: namespace, + OperationId: operationID, + RunId: runID, + WaitStage: enumspb.NEXUS_OPERATION_WAIT_STAGE_CLOSED, + }) + if err != nil { + // check pollCtx.Err() first because it is set by cancel() + pollTimedOut := pollCtx.Err() != nil + cancel() + if cctx.Err() != nil { + return nil, cctx.Err() + } + if pollTimedOut { + continue + } + return nil, err + } + cancel() + if resp.GetOutcome() != nil { + return resp, nil + } + } +} + +func printNexusOperationResult(cctx *CommandContext, operationID, runID string, result *common.Payload) error { + if cctx.JSONOutput { + var resultJSON json.RawMessage + var err error + if cctx.JSONShorthandPayloads { + var valuePtr any + if err = converter.GetDefaultDataConverter().FromPayload(result, &valuePtr); err != nil { + return fmt.Errorf("nexus operation completed, but failed decoding result for json output: %w", err) + } + resultJSON, err = json.Marshal(valuePtr) + } else { + resultJSON, err = cctx.MarshalProtoJSON(result) + } + if err != nil { + return fmt.Errorf("nexus operation completed, but failed marshaling result for json output: %w", err) + } + return cctx.Printer.PrintStructured(struct { + OperationId string `json:"operationId"` + RunId string `json:"runId"` + Status string `json:"status"` + Result json.RawMessage `json:"result"` + }{ + OperationId: operationID, + RunId: runID, + Status: "COMPLETED", + Result: resultJSON, + }, printer.StructuredOptions{}) + } + + cctx.Printer.Println(color.MagentaString("Results:")) + var valuePtr any + if err := converter.GetDefaultDataConverter().FromPayload(result, &valuePtr); err != nil { + return fmt.Errorf("nexus operation completed, but failed decoding result: %w", err) + } + resultJSON, err := json.Marshal(valuePtr) + if err != nil { + return fmt.Errorf("nexus operation completed, but failed marshaling result: %w", err) + } + return cctx.Printer.PrintStructured(struct { + Status string + Result json.RawMessage `cli:",cardOmitEmpty"` + }{ + Status: color.GreenString("COMPLETED"), + Result: resultJSON, + }, printer.StructuredOptions{}) +} + +func printNexusOperationFailure(cctx *CommandContext, operationID, runID string, f *failure.Failure) error { + if cctx.JSONOutput { + failureJSON, err := cctx.MarshalProtoJSON(f) + if err != nil { + return fmt.Errorf("nexus operation failed, but failed marshaling failure for json output: %w", err) + } + return cctx.Printer.PrintStructured(struct { + OperationId string `json:"operationId"` + RunId string `json:"runId"` + Status string `json:"status"` + Failure json.RawMessage `json:"failure"` + }{ + OperationId: operationID, + RunId: runID, + Status: "FAILED", + Failure: failureJSON, + }, printer.StructuredOptions{}) + } + + cctx.Printer.Println(color.MagentaString("Results:")) + return cctx.Printer.PrintStructured(struct { + Status string + Failure string `cli:",cardOmitEmpty"` + }{ + Status: color.RedString("FAILED"), + Failure: cctx.MarshalFriendlyFailureBodyText(f, " "), + }, printer.StructuredOptions{}) +} + +func (c *TemporalNexusOperationDescribeCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + handle := cl.GetNexusOperationHandle(client.GetNexusOperationHandleOptions{ + OperationID: c.OperationId, + RunID: c.RunId, + }) + + desc, err := handle.Describe(cctx, client.DescribeNexusOperationOptions{}) + if err != nil { + return fmt.Errorf("failed describing nexus operation: %w", err) + } + + if c.Raw || cctx.JSONOutput { + return cctx.Printer.PrintStructured(desc.RawInfo, printer.StructuredOptions{}) + } + return printNexusOperationDescription(cctx, desc) +} + +func printNexusOperationDescription(cctx *CommandContext, desc *client.NexusOperationExecutionDescription) error { + summary, _ := desc.GetSummary() + d := struct { + OperationId string + RunId string + Endpoint string + Service string + Operation string + Status string + State string `cli:",cardOmitEmpty"` + Attempt int32 + ScheduleToCloseTimeout time.Duration `cli:",cardOmitEmpty"` + ScheduledTime time.Time `cli:",cardOmitEmpty"` + CloseTime time.Time `cli:",cardOmitEmpty"` + ExpirationTime time.Time `cli:",cardOmitEmpty"` + BlockedReason string `cli:",cardOmitEmpty"` + OperationToken string `cli:",cardOmitEmpty"` + Identity string `cli:",cardOmitEmpty"` + Summary string `cli:",cardOmitEmpty"` + }{ + OperationId: desc.OperationID, + RunId: desc.OperationRunID, + Endpoint: desc.Endpoint, + Service: desc.Service, + Operation: desc.Operation, + Status: desc.Status.String(), + State: desc.State.String(), + Attempt: desc.Attempt, + ScheduleToCloseTimeout: desc.ScheduleToCloseTimeout, + ScheduledTime: desc.ScheduledTime, + CloseTime: desc.CloseTime, + ExpirationTime: desc.ExpirationTime, + BlockedReason: desc.BlockedReason, + OperationToken: desc.OperationToken, + Identity: desc.Identity, + Summary: summary, + } + return cctx.Printer.PrintStructured(d, printer.StructuredOptions{}) +} + +func (c *TemporalNexusOperationCancelCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + handle := cl.GetNexusOperationHandle(client.GetNexusOperationHandleOptions{ + OperationID: c.OperationId, + RunID: c.RunId, + }) + + err = handle.Cancel(cctx, client.CancelNexusOperationOptions{ + Reason: c.Reason, + }) + if err != nil { + return fmt.Errorf("failed to request nexus operation cancellation: %w", err) + } + cctx.Printer.Println("Cancellation requested") + return nil +} + +func (c *TemporalNexusOperationTerminateCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + reason := c.Reason + if reason == "" { + reason = defaultReason() + } + handle := cl.GetNexusOperationHandle(client.GetNexusOperationHandleOptions{ + OperationID: c.OperationId, + RunID: c.RunId, + }) + if err := handle.Terminate(cctx, client.TerminateNexusOperationOptions{Reason: reason}); err != nil { + return fmt.Errorf("failed to terminate nexus operation: %w", err) + } + cctx.Printer.Println("Nexus Operation terminated") + return nil +} + +func (c *TemporalNexusOperationListCommand) run(cctx *CommandContext, _ []string) error { + cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + if c.Limit > 0 && c.Limit < c.PageSize { + c.PageSize = c.Limit + } + + cctx.Printer.StartList() + defer cctx.Printer.EndList() + + var nextPageToken []byte + var opsProcessed int + for pageIndex := 0; ; pageIndex++ { + resp, err := cl.WorkflowService().ListNexusOperationExecutions(cctx, &workflowservice.ListNexusOperationExecutionsRequest{ + Namespace: c.Parent.Parent.Namespace, + PageSize: int32(c.PageSize), + NextPageToken: nextPageToken, + Query: c.Query, + }) + if err != nil { + return fmt.Errorf("failed listing nexus operations: %w", err) + } + var textTable []map[string]any + for _, op := range resp.GetOperations() { + if c.Limit > 0 && opsProcessed >= c.Limit { + break + } + opsProcessed++ + if cctx.JSONOutput { + _ = cctx.Printer.PrintStructured(op, printer.StructuredOptions{}) + } else { + textTable = append(textTable, map[string]any{ + "Status": op.Status, + "OperationId": op.OperationId, + "Endpoint": op.Endpoint, + "Service": op.Service, + "Operation": op.Operation, + "StartTime": op.ScheduleTime.AsTime(), + }) + } + } + if len(textTable) > 0 { + _ = cctx.Printer.PrintStructured(textTable, printer.StructuredOptions{ + Fields: []string{"Status", "OperationId", "Endpoint", "Service", "Operation", "StartTime"}, + Table: &printer.TableOptions{NoHeader: pageIndex > 0}, + }) + } + nextPageToken = resp.GetNextPageToken() + if len(nextPageToken) == 0 || (c.Limit > 0 && opsProcessed >= c.Limit) { + return nil + } + } +} + +func (c *TemporalNexusOperationCountCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + resp, err := cl.WorkflowService().CountNexusOperationExecutions(cctx, &workflowservice.CountNexusOperationExecutionsRequest{ + Namespace: c.Parent.Parent.Namespace, + Query: c.Query, + }) + if err != nil { + return fmt.Errorf("failed counting nexus operations: %w", err) + } + groups := make([]countGroup, len(resp.Groups)) + for i, g := range resp.Groups { + groups[i] = g + } + if cctx.JSONOutput { + stripCountGroupMetadataType(groups) + return cctx.Printer.PrintStructured(resp, printer.StructuredOptions{}) + } + cctx.Printer.Printlnf("Total: %v", resp.Count) + printCountGroupsText(cctx, groups) + return nil +} + +// nexusStartInput holds the parsed inputs for starting a Nexus operation. +type nexusStartInput struct { + operation string + input any + options client.StartNexusOperationOptions +} + +func buildNexusStartOptions(s *NexusOperationStartOptions, p *PayloadInputOptions) (client.NexusClientOptions, *nexusStartInput, error) { + nexusCl := client.NexusClientOptions{ + Endpoint: s.Endpoint, + Service: s.Service, + } + + operationID := s.OperationId + if operationID == "" { + operationID = uuid.NewString() + } + + opts := client.StartNexusOperationOptions{ + ID: operationID, + ScheduleToCloseTimeout: s.ScheduleToCloseTimeout.Duration(), + } + + if s.IdConflictPolicy.Value != "" { + v, err := stringToProtoEnum[enumspb.NexusOperationIdConflictPolicy]( + s.IdConflictPolicy.Value, + enumspb.NexusOperationIdConflictPolicy_shorthandValue, + enumspb.NexusOperationIdConflictPolicy_value) + if err != nil { + return nexusCl, nil, fmt.Errorf("invalid id-conflict-policy: %w", err) + } + opts.IDConflictPolicy = v + } + + if s.IdReusePolicy.Value != "" { + v, err := stringToProtoEnum[enumspb.NexusOperationIdReusePolicy]( + s.IdReusePolicy.Value, + enumspb.NexusOperationIdReusePolicy_shorthandValue, + enumspb.NexusOperationIdReusePolicy_value) + if err != nil { + return nexusCl, nil, fmt.Errorf("invalid id-reuse-policy: %w", err) + } + opts.IDReusePolicy = v + } + + // Build input payload + var input any + rawInput, err := p.buildRawInput() + if err != nil { + return nexusCl, nil, err + } + if len(rawInput) > 1 { + return nexusCl, nil, fmt.Errorf("nexus operations accept at most one input argument, got %d", len(rawInput)) + } + if len(rawInput) == 1 { + input = rawInput[0] + } + + return nexusCl, &nexusStartInput{ + operation: s.Operation, + input: input, + options: opts, + }, nil +} diff --git a/internal/temporalcli/commands.nexus_operation_test.go b/internal/temporalcli/commands.nexus_operation_test.go new file mode 100644 index 000000000..277786955 --- /dev/null +++ b/internal/temporalcli/commands.nexus_operation_test.go @@ -0,0 +1,728 @@ +package temporalcli_test + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "testing" + "time" + + "github.com/google/uuid" + nexus "github.com/nexus-rpc/sdk-go/nexus" + "github.com/stretchr/testify/require" + nexuspb "go.temporal.io/api/nexus/v1" + "go.temporal.io/api/operatorservice/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporalnexus" + "go.temporal.io/sdk/workflow" +) + +func (s *SharedServerSuite) setupNexusEndpointAndWorker(t *testing.T) (string, *DevWorker) { + handlerWorkflow := func(ctx workflow.Context, input string) (string, error) { + return "got: " + input, nil + } + return s.setupNexusEndpointWithWorkflow(t, handlerWorkflow, "handler-") +} + +func (s *SharedServerSuite) setupNexusEndpointWithWorkflow( + t *testing.T, + handlerWorkflow func(workflow.Context, string) (string, error), + workflowIDPrefix string, +) (string, *DevWorker) { + endpointName := "test-ep-" + uuid.NewString()[:8] + + op := temporalnexus.NewWorkflowRunOperation( + "test-op", + handlerWorkflow, + func(ctx context.Context, input string, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { + return client.StartWorkflowOptions{ + ID: workflowIDPrefix + opts.RequestID, + }, nil + }, + ) + svc := nexus.NewService("test-service") + require.NoError(t, svc.Register(op)) + + w := s.DevServer.StartDevWorker(t, DevWorkerOptions{ + Workflows: []any{handlerWorkflow}, + NexusServices: []*nexus.Service{svc}, + }) + + _, err := s.Client.OperatorService().CreateNexusEndpoint(s.Context, &operatorservice.CreateNexusEndpointRequest{ + Spec: &nexuspb.EndpointSpec{ + Name: endpointName, + Target: &nexuspb.EndpointTarget{ + Variant: &nexuspb.EndpointTarget_Worker_{ + Worker: &nexuspb.EndpointTarget_Worker{ + Namespace: s.Namespace(), + TaskQueue: w.Options.TaskQueue, + }, + }, + }, + }, + }) + require.NoError(t, err) + + return endpointName, w +} + +func (s *SharedServerSuite) TestNexusOperationStart() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "start-op-" + uuid.NewString()[:8] + + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "Started Nexus Operation") + s.Contains(res.Stdout.String(), opID) +} + +func (s *SharedServerSuite) TestNexusOperationExecute() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "exec-op-" + uuid.NewString()[:8] + + res := s.Execute( + "nexus", "operation", "execute", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "got: hello") +} + +func (s *SharedServerSuite) TestNexusOperationExecute_JSON() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "exec-json-op-" + uuid.NewString()[:8] + + res := s.Execute( + "nexus", "operation", "execute", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + "--output", "json", + ) + s.NoError(res.Err) + + var result struct { + OperationId string `json:"operationId"` + RunId string `json:"runId"` + Result json.RawMessage `json:"result"` + } + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &result)) + s.Equal(opID, result.OperationId) + s.Contains(string(result.Result), "got: hello") +} + +func (s *SharedServerSuite) TestNexusOperationDescribe() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "desc-op-" + uuid.NewString()[:8] + + // Start an operation first. + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + // Describe it. + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "describe", + "--address", s.Address(), + "--operation-id", opID, + ) + return res.Err == nil + }, 30*time.Second, 500*time.Millisecond) + + s.Contains(res.Stdout.String(), opID) + s.Contains(res.Stdout.String(), endpointName) + s.Contains(res.Stdout.String(), "test-service") +} + +func (s *SharedServerSuite) TestNexusOperationDescribe_JSON() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "desc-json-op-" + uuid.NewString()[:8] + + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "describe", + "--address", s.Address(), + "--operation-id", opID, + "--output", "json", + ) + return res.Err == nil + }, 30*time.Second, 500*time.Millisecond) + + s.NoError(res.Err) + s.Contains(res.Stdout.String(), opID) +} + +func (s *SharedServerSuite) TestNexusOperationCancel() { + blockingHandler := func(ctx workflow.Context, input string) (string, error) { + ctx.Done().Receive(ctx, nil) + return "", ctx.Err() + } + endpointName, w := s.setupNexusEndpointWithWorkflow(s.T(), blockingHandler, "cancel-handler-") + defer w.Stop() + + opID := "cancel-op-" + uuid.NewString()[:8] + + // Start operation. + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + // Cancel it. + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "cancel", + "--address", s.Address(), + "--operation-id", opID, + "--reason", "testing cancellation", + ) + return res.Err == nil + }, 30*time.Second, 500*time.Millisecond) + + s.Contains(res.Stdout.String(), "Cancellation requested") +} + +func (s *SharedServerSuite) TestNexusOperationTerminate() { + blockingHandler := func(ctx workflow.Context, input string) (string, error) { + ctx.Done().Receive(ctx, nil) + return "", ctx.Err() + } + endpointName, w := s.setupNexusEndpointWithWorkflow(s.T(), blockingHandler, "term-handler-") + defer w.Stop() + + opID := "term-op-" + uuid.NewString()[:8] + + // Start operation. + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + // Terminate it. + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "terminate", + "--address", s.Address(), + "--operation-id", opID, + "--reason", "testing termination", + ) + return res.Err == nil + }, 30*time.Second, 500*time.Millisecond) + + s.Contains(res.Stdout.String(), "Nexus Operation terminated") +} + +func (s *SharedServerSuite) TestNexusOperationList() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + // Start a couple of operations. + for i := 0; i < 2; i++ { + opID := fmt.Sprintf("list-op-%d-%s", i, uuid.NewString()[:8]) + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + } + + // List operations — wait until both are visible. + var res *CommandResult + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "list", + "--address", s.Address(), + ) + out := res.Stdout.String() + return res.Err == nil && + strings.Contains(out, "list-op-0") && + strings.Contains(out, "list-op-1") + }, 30*time.Second, 500*time.Millisecond) +} + +func (s *SharedServerSuite) TestNexusOperationCount() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "count-op-" + uuid.NewString()[:8] + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + // Count operations. + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "count", + "--address", s.Address(), + ) + return res.Err == nil && res.Stdout.String() != "" + }, 30*time.Second, 500*time.Millisecond) + + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "Total:") +} + +func (s *SharedServerSuite) TestNexusOperationResult() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "result-op-" + uuid.NewString()[:8] + + // Start an operation. + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + // Get result (blocks until completed). + res = s.Execute( + "nexus", "operation", "result", + "--address", s.Address(), + "--operation-id", opID, + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "got: hello") +} + +func (s *SharedServerSuite) TestNexusOperationResult_JSON() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "result-json-op-" + uuid.NewString()[:8] + + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + res = s.Execute( + "nexus", "operation", "result", + "--address", s.Address(), + "--operation-id", opID, + "--output", "json", + ) + s.NoError(res.Err) + + var result struct { + OperationId string `json:"operationId"` + Result json.RawMessage `json:"result"` + } + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &result)) + s.Equal(opID, result.OperationId) + s.Contains(string(result.Result), "got: hello") +} + +func (s *SharedServerSuite) TestNexusOperationStart_JSON() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "start-json-op-" + uuid.NewString()[:8] + + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + "--output", "json", + ) + s.NoError(res.Err) + + var result struct { + Endpoint string `json:"endpoint"` + Service string `json:"service"` + Operation string `json:"operation"` + OperationId string `json:"operationId"` + RunId string `json:"runId"` + } + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &result)) + s.Equal(endpointName, result.Endpoint) + s.Equal("test-service", result.Service) + s.Equal("test-op", result.Operation) + s.Equal(opID, result.OperationId) + s.NotEmpty(result.RunId) +} + +func (s *SharedServerSuite) TestNexusOperationStart_ServerGeneratedID() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--input", `"hello"`, + "--output", "json", + ) + s.NoError(res.Err) + + var result struct { + OperationId string `json:"operationId"` + } + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &result)) + s.NotEmpty(result.OperationId, "server should generate an operation ID") +} + +func (s *SharedServerSuite) TestNexusOperationStart_ScheduleToCloseTimeout() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "timeout-op-" + uuid.NewString()[:8] + + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--schedule-to-close-timeout", "30m", + "--input", `"hello"`, + "--output", "json", + ) + s.NoError(res.Err) + + var result struct { + OperationId string `json:"operationId"` + } + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &result)) + s.Equal(opID, result.OperationId) + + // Verify the timeout is reflected in describe. + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "describe", + "--address", s.Address(), + "--operation-id", opID, + "--output", "json", + ) + return res.Err == nil + }, 30*time.Second, 500*time.Millisecond) + s.Contains(res.Stdout.String(), opID) +} + +func (s *SharedServerSuite) TestNexusOperationStart_IdConflictPolicy() { + // Use a blocking handler so the operation stays running across the three + // start calls — IDConflictPolicy only triggers against an open operation. + blockingHandler := func(ctx workflow.Context, input string) (string, error) { + ctx.Done().Receive(ctx, nil) + return "", ctx.Err() + } + endpointName, w := s.setupNexusEndpointWithWorkflow(s.T(), blockingHandler, "conflict-handler-") + defer w.Stop() + + opID := "conflict-op-" + uuid.NewString()[:8] + + // Start first operation. + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + // Start again with same ID and UseExisting policy — should succeed. + res = s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--id-conflict-policy", "UseExisting", + "--input", `"hello"`, + ) + s.NoError(res.Err) + + // Start again with same ID and Fail policy — should fail. + res = s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--id-conflict-policy", "Fail", + "--input", `"hello"`, + ) + s.Error(res.Err) +} + +func (s *SharedServerSuite) TestNexusOperationList_JSON() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "list-json-op-" + uuid.NewString()[:8] + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "list", + "--address", s.Address(), + "--output", "json", + ) + return res.Err == nil && strings.Contains(res.Stdout.String(), opID) + }, 30*time.Second, 500*time.Millisecond) +} + +func (s *SharedServerSuite) TestNexusOperationList_Limit() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + // Start 3 operations. + for i := 0; i < 3; i++ { + opID := fmt.Sprintf("limit-op-%d-%s", i, uuid.NewString()[:8]) + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + } + + // List with limit=1, JSONL output so we can count entries. + var res *CommandResult + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "list", + "--address", s.Address(), + "--limit", "1", + "--output", "jsonl", + ) + return res.Err == nil && len(res.Stdout.String()) > 0 + }, 30*time.Second, 500*time.Millisecond) + + // Count JSON objects — each on its own line. + lines := 0 + for _, line := range strings.Split(strings.TrimSpace(res.Stdout.String()), "\n") { + if len(strings.TrimSpace(line)) > 0 { + lines++ + } + } + s.Equal(1, lines, "limit=1 should return exactly 1 operation") +} + +func (s *SharedServerSuite) TestNexusOperationCount_JSON() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "count-json-op-" + uuid.NewString()[:8] + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "count", + "--address", s.Address(), + "--output", "json", + ) + if res.Err != nil { + return false + } + // Proto JSON encodes int64 as a string; accept either form. + var result struct { + Count json.Number `json:"count"` + } + if err := json.Unmarshal(res.Stdout.Bytes(), &result); err != nil { + return false + } + n, err := result.Count.Int64() + return err == nil && n > 0 + }, 30*time.Second, 500*time.Millisecond) +} + +func (s *SharedServerSuite) TestNexusOperationCount_Query() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "count-query-op-" + uuid.NewString()[:8] + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + // Count with a query that matches this specific operation. + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "count", + "--address", s.Address(), + "--query", fmt.Sprintf(`OperationId = "%s"`, opID), + ) + return res.Err == nil && res.Stdout.String() != "" + }, 30*time.Second, 500*time.Millisecond) + + s.Contains(res.Stdout.String(), "Total:") +} + +func (s *SharedServerSuite) TestNexusOperationList_Query() { + endpointName, w := s.setupNexusEndpointAndWorker(s.T()) + defer w.Stop() + + opID := "query-op-" + uuid.NewString()[:8] + res := s.Execute( + "nexus", "operation", "start", + "--address", s.Address(), + "--endpoint", endpointName, + "--service", "test-service", + "--operation", "test-op", + "--operation-id", opID, + "--input", `"hello"`, + ) + s.NoError(res.Err) + + // List with query filter — wait until the operation appears. + s.Eventually(func() bool { + res = s.Execute( + "nexus", "operation", "list", + "--address", s.Address(), + "--query", fmt.Sprintf(`OperationId = "%s"`, opID), + ) + return res.Err == nil && strings.Contains(res.Stdout.String(), opID) + }, 30*time.Second, 500*time.Millisecond) +} + +func (s *SharedServerSuite) TestNexusOperationStart_MissingRequiredFlags() { + // Missing --endpoint + res := s.Execute( + "nexus", "operation", "start", + "--service", "test-service", + "--operation", "test-op", + "--operation-id", "some-id", + ) + s.Error(res.Err) + + // Missing --service + res = s.Execute( + "nexus", "operation", "start", + "--endpoint", "test-ep", + "--operation", "test-op", + "--operation-id", "some-id", + ) + s.Error(res.Err) + + // Missing --operation + res = s.Execute( + "nexus", "operation", "start", + "--endpoint", "test-ep", + "--service", "test-service", + "--operation-id", "some-id", + ) + s.Error(res.Err) +} diff --git a/internal/temporalcli/commands.yaml b/internal/temporalcli/commands.yaml index 1d383d179..488d15eaf 100644 --- a/internal/temporalcli/commands.yaml +++ b/internal/temporalcli/commands.yaml @@ -1619,6 +1619,210 @@ commands: description: Property value (required). # required: true + - name: temporal nexus + summary: Start, list, and operate on Nexus Operations + description: | + Nexus Operation commands perform operations on Standalone Nexus + Operation Executions: + + ``` + temporal nexus [command] [options] + ``` + + For example: + + ``` + temporal nexus operation list + ``` + option-sets: + - client + docs: + description-header: >- + Learn how to use Temporal Nexus commands for starting, listing, + and managing Standalone Nexus Operation Executions. + keywords: + - nexus + - nexus operation + - nexus operation start + - nexus operation execute + - nexus operation describe + - nexus operation cancel + - nexus operation terminate + - nexus operation list + - nexus operation count + - cli reference + - cli-feature + - command-line-interface-cli + - temporal cli + tags: + - Nexus + - Temporal CLI + + - name: temporal nexus operation + summary: Commands for managing Standalone Nexus Operations + description: | + These commands manage Standalone Nexus Operation Executions. + + Nexus Operation commands follow this syntax: + + ``` + temporal nexus operation [command] [options] + ``` + + - name: temporal nexus operation start + summary: Start a new Standalone Nexus Operation (Experimental) + description: | + Start a new Standalone Nexus Operation. Outputs the + Operation ID and Run ID. + + ``` + temporal nexus operation start \ + --endpoint YourEndpoint \ + --service YourService \ + --operation YourOperation \ + --operation-id YourOperationId \ + --input '{"some-key": "some-value"}' + ``` + option-sets: + - nexus-operation-start + - payload-input + + - name: temporal nexus operation execute + summary: Start a new Standalone Nexus Operation and wait for its result (Experimental) + description: | + Start a new Standalone Nexus Operation Execution and block until + it completes. The result is output to stdout. + + ``` + temporal nexus operation execute \ + --endpoint YourEndpoint \ + --service YourService \ + --operation YourOperation \ + --operation-id YourOperationId \ + --input '{"some-key": "some-value"}' + ``` + option-sets: + - nexus-operation-start + - payload-input + + - name: temporal nexus operation describe + summary: Show detailed info for a Standalone Nexus Operation (Experimental) + description: | + Display detailed information about a specific Standalone Nexus + Operation Execution. + + ``` + temporal nexus operation describe \ + --operation-id YourOperationId + ``` + option-sets: + - nexus-operation-reference + options: + - name: raw + type: bool + description: Print properties without changing their format. + + - name: temporal nexus operation cancel + summary: Request cancellation of a Standalone Nexus Operation (Experimental) + description: | + Request cancellation of a Standalone Nexus Operation. + + ``` + temporal nexus operation cancel \ + --operation-id YourOperationId + ``` + + The Operation handler determines how to handle the + cancellation request. + option-sets: + - nexus-operation-reference + options: + - name: reason + type: string + description: Reason for cancellation. + + - name: temporal nexus operation terminate + summary: Forcefully end a Standalone Nexus Operation (Experimental) + description: | + Terminate a Standalone Nexus Operation. + + ``` + temporal nexus operation terminate \ + --operation-id YourOperationId \ + --reason YourReason + ``` + + Operation handlers cannot see or respond to terminations. + option-sets: + - nexus-operation-reference + options: + - name: reason + type: string + description: | + Reason for termination. + Defaults to a message with the current user's name. + + - name: temporal nexus operation list + summary: List Standalone Nexus Operations matching a query (Experimental) + description: | + List Standalone Nexus Operations. Use `--query` to filter results. + + ``` + temporal nexus operation list \ + --query 'NexusEndpoint="YourEndpoint"' + ``` + + Visit https://docs.temporal.io/visibility to read more about + Search Attributes and queries. + options: + - name: query + short: q + type: string + description: | + Query to filter the Nexus Operation Executions to list. + - name: limit + type: int + description: | + Maximum number of Nexus Operation Executions to display. + - name: page-size + type: int + description: | + Maximum number of Nexus Operation Executions to fetch + at a time from the server. + + - name: temporal nexus operation count + summary: Count Standalone Nexus Operations matching a query (Experimental) + description: | + Return a count of Standalone Nexus Operations. Use `--query` + to filter the operations to be counted. + + ``` + temporal nexus operation count \ + --query 'NexusEndpoint="YourEndpoint"' + ``` + + Visit https://docs.temporal.io/visibility to read more about + Search Attributes and queries. + options: + - name: query + short: q + type: string + description: | + Query to filter Nexus Operation Executions to count. + + - name: temporal nexus operation result + summary: Wait for and output the result of a Standalone Nexus Operation (Experimental) + description: | + Wait for a Standalone Nexus Operation to complete and output + the result. + + ``` + temporal nexus operation result \ + --operation-id YourOperationId + ``` + option-sets: + - nexus-operation-reference + - name: temporal operator summary: Manage Temporal deployments description: | @@ -5056,6 +5260,58 @@ option-sets: May be used as an alternative to `--target-namespace` and `--target-task-queue`. + - name: nexus-operation-reference + options: + - name: operation-id + type: string + description: Nexus Operation ID. + required: true + - name: run-id + type: string + short: r + description: Run ID of the Nexus Operation. + + - name: nexus-operation-start + options: + - name: endpoint + type: string + description: Nexus Endpoint name. + required: true + - name: service + type: string + description: Nexus Service name. + required: true + - name: operation + type: string + description: Nexus Operation name. + required: true + - name: operation-id + type: string + description: | + Nexus Operation ID. + If not supplied, a unique ID is generated. + - name: schedule-to-close-timeout + type: duration + description: | + Total time the operation is allowed to run. + - name: id-conflict-policy + type: string-enum + description: | + Policy for handling an Operation ID conflict with a + running operation. + enum-values: + - Fail + - UseExisting + - TerminateExisting + - name: id-reuse-policy + type: string-enum + description: | + Policy for re-using an Operation ID from a previously + closed operation. + enum-values: + - AllowDuplicate + - RejectDuplicate + - name: query-modifiers options: - name: reject-condition diff --git a/internal/temporalcli/commands_test.go b/internal/temporalcli/commands_test.go index c3efa843e..1b1a18e51 100644 --- a/internal/temporalcli/commands_test.go +++ b/internal/temporalcli/commands_test.go @@ -240,6 +240,8 @@ func (s *SharedServerSuite) SetupSuite() { "history.enableSignalWithStartFromWorkflow": true, "activity.enableStandalone": true, "activity.longPollTimeout": 2 * time.Second, + "nexusoperation.enableStandalone": true, + "history.enableChasmCallbacks": true, // this is overridden since we don't want caching to be enabled // while testing DescribeTaskQueue behaviour related to versioning "matching.TaskQueueInfoByBuildIdTTL": 0 * time.Second,