Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (
AddActivityTaskCancelRequestedEvent(int64, int64, string) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)
AddActivityTaskCanceledEvent(int64, int64, int64, *commonpb.Payloads, string) (*historypb.HistoryEvent, error)
AddWorkerCommandsTasks(commands []*workerpb.WorkerCommand, controlQueue string) error
GenerateActivityCancelCommandsForClose() error
AddActivityTaskCompletedEvent(int64, int64, *workflowservice.RespondActivityTaskCompletedRequest) (*historypb.HistoryEvent, error)
AddActivityTaskFailedEvent(int64, int64, *failurepb.Failure, enumspb.RetryState, string, *commonpb.WorkerVersionStamp) (*historypb.HistoryEvent, error)
AddActivityTaskScheduledEvent(int64, *commandpb.ScheduleActivityTaskCommandAttributes, bool) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)
Expand Down
14 changes: 14 additions & 0 deletions service/history/interfaces/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 75 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"go.temporal.io/server/common/searchattribute/sadefs"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/common/softassert"
"go.temporal.io/server/common/tasktoken"
"go.temporal.io/server/common/util"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/components/callbacks"
Expand Down Expand Up @@ -4536,6 +4537,80 @@ func (ms *MutableStateImpl) AddWorkerCommandsTasks(commands []*workerpb.WorkerCo
return ms.taskGenerator.GenerateWorkerCommandsTasks(commands, controlQueue)
}

// GenerateActivityCancelCommandsForClose generates WorkerCommandsTasks to cancel all
// in-flight activities that have a worker control queue.
func (ms *MutableStateImpl) GenerateActivityCancelCommandsForClose() error {
if !ms.config.EnableCancelActivityWorkerCommand() {
return nil
}

// Cancel commands are best-effort and only dispatched on the active cluster.
// Skip task generation on standby to avoid creating tasks that will be dropped.
activeCluster := ms.clusterMetadata.ClusterNameForFailoverVersion(
ms.namespaceEntry.IsGlobalNamespace(), ms.GetCurrentVersion())
Comment thread
rkannan82 marked this conversation as resolved.
if activeCluster != ms.clusterMetadata.GetCurrentClusterName() {
return nil
}

serializer := tasktoken.NewSerializer()
wfKey := ms.GetWorkflowKey()
nsID := ms.GetNamespaceEntry().ID().String()

commandsByQueue := make(map[string][]*workerpb.WorkerCommand)
for _, ai := range ms.pendingActivityInfoIDs {
// No control queue means the activity was started before this feature was deployed.
if ai.WorkerControlTaskQueue == "" {
continue
}
if ai.StartedClock == nil {
Comment thread
rkannan82 marked this conversation as resolved.
// StartedClock is nil when the activity is not currently started (e.g. in retry backoff)
// or was started before this feature was deployed. Skip cancel command.
ms.logger.Debug("Skipping worker cancel command: activity not currently started",
tag.WorkflowNamespaceID(wfKey.NamespaceID),
tag.WorkflowID(wfKey.WorkflowID),
tag.WorkflowRunID(wfKey.RunID),
tag.WorkflowScheduledEventID(ai.ScheduledEventId),
)
continue
}

taskToken, err := serializer.Serialize(tasktoken.NewActivityTaskToken(
nsID,
wfKey.WorkflowID,
wfKey.RunID,
ai.ScheduledEventId,
ai.ActivityId,
ai.ActivityType.GetName(),
ai.Attempt,
ai.StartedClock,
ai.Version,
ai.StartVersion,
nil,
))
if err != nil {
return err
}

commandsByQueue[ai.WorkerControlTaskQueue] = append(
commandsByQueue[ai.WorkerControlTaskQueue],
&workerpb.WorkerCommand{
Type: &workerpb.WorkerCommand_CancelActivity{
CancelActivity: &workerpb.CancelActivityCommand{
TaskToken: taskToken,
},
},
},
)
}

for controlQueue, commands := range commandsByQueue {
if err := ms.AddWorkerCommandsTasks(commands, controlQueue); err != nil {
return err
}
}
return nil
}

func (ms *MutableStateImpl) ApplyActivityTaskCancelRequestedEvent(
event *historypb.HistoryEvent,
) error {
Expand Down
158 changes: 158 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7591,6 +7591,164 @@ func (s *mutableStateSuite) TestApplyWorkflowExecutionOptionsUpdatedEvent_TimeSk
}
}

func TestGenerateActivityCancelCommandsForClose(t *testing.T) {
t.Parallel()

startedClock := &clockspb.VectorClock{ShardId: 1, Clock: 100}

testCases := []struct {
name string
featureEnabled bool
activities map[int64]*persistencespb.ActivityInfo
expectedQueues map[string]int // controlQueue -> expected command count
expectedNoTasks bool
}{
{
name: "activities with control queue and started clock",
featureEnabled: true,
activities: map[int64]*persistencespb.ActivityInfo{
1: {
ScheduledEventId: 1,
ActivityId: "act-1",
ActivityType: &commonpb.ActivityType{Name: "type1"},
WorkerControlTaskQueue: "control-queue-1",
StartedClock: startedClock,
Attempt: 1,
},
},
expectedQueues: map[string]int{"control-queue-1": 1},
},
{
name: "skips activities without control queue",
featureEnabled: true,
activities: map[int64]*persistencespb.ActivityInfo{
1: {
ScheduledEventId: 1,
ActivityId: "act-1",
ActivityType: &commonpb.ActivityType{Name: "type1"},
StartedClock: startedClock,
Attempt: 1,
},
},
expectedNoTasks: true,
},
{
name: "skips activities without started clock",
featureEnabled: true,
activities: map[int64]*persistencespb.ActivityInfo{
1: {
ScheduledEventId: 1,
ActivityId: "act-1",
ActivityType: &commonpb.ActivityType{Name: "type1"},
WorkerControlTaskQueue: "control-queue-1",
Attempt: 1,
},
},
expectedNoTasks: true,
},
{
name: "multiple activities batched by control queue",
featureEnabled: true,
activities: map[int64]*persistencespb.ActivityInfo{
1: {
ScheduledEventId: 1,
ActivityId: "act-1",
ActivityType: &commonpb.ActivityType{Name: "type1"},
WorkerControlTaskQueue: "queue-A",
StartedClock: startedClock,
Attempt: 1,
},
2: {
ScheduledEventId: 2,
ActivityId: "act-2",
ActivityType: &commonpb.ActivityType{Name: "type2"},
WorkerControlTaskQueue: "queue-A",
StartedClock: startedClock,
Attempt: 1,
},
3: {
ScheduledEventId: 3,
ActivityId: "act-3",
ActivityType: &commonpb.ActivityType{Name: "type3"},
WorkerControlTaskQueue: "queue-B",
StartedClock: startedClock,
Attempt: 1,
},
},
expectedQueues: map[string]int{"queue-A": 2, "queue-B": 1},
},
{
name: "feature flag disabled - no tasks generated",
featureEnabled: false,
activities: map[int64]*persistencespb.ActivityInfo{
1: {
ScheduledEventId: 1,
ActivityId: "act-1",
ActivityType: &commonpb.ActivityType{Name: "type1"},
WorkerControlTaskQueue: "control-queue-1",
StartedClock: startedClock,
Attempt: 1,
},
},
expectedNoTasks: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockEventsCache := events.NewMockCache(ctrl)
mockConfig := tests.NewDynamicConfig()
mockConfig.EnableCancelActivityWorkerCommand = dynamicconfig.GetBoolPropertyFn(tc.featureEnabled)

mockShard := shard.NewTestContext(
ctrl,
&persistencespb.ShardInfo{ShardId: 0, RangeId: 1},
mockConfig,
)
defer mockShard.StopForTest()
reg := hsm.NewRegistry()
require.NoError(t, RegisterStateMachine(reg))
require.NoError(t, callbacks.RegisterStateMachine(reg))
require.NoError(t, nexusoperations.RegisterStateMachines(reg))
mockShard.SetStateMachineRegistry(reg)
mockShard.SetEventsCacheForTesting(mockEventsCache)

namespaceEntry := tests.GlobalNamespaceEntry
mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(namespaceEntry, nil).AnyTimes()
mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(gomock.Any(), gomock.Any()).Return(cluster.TestCurrentClusterName).AnyTimes()
mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
mockShard.Resource.ClusterMetadata.EXPECT().GetClusterID().Return(int64(1)).AnyTimes()

ms := NewMutableState(mockShard, mockEventsCache, log.NewTestLogger(), namespaceEntry, tests.WorkflowID, tests.RunID, time.Now().UTC())
ms.pendingActivityInfoIDs = tc.activities

err := ms.GenerateActivityCancelCommandsForClose()
require.NoError(t, err)

if tc.expectedNoTasks {
require.Empty(t, ms.InsertTasks[tasks.CategoryOutbound])
return
}

// Verify tasks were generated by checking outbound task messages
var workerCommandTasks []*tasks.WorkerCommandsTask
for _, task := range ms.InsertTasks[tasks.CategoryOutbound] {
if wct, ok := task.(*tasks.WorkerCommandsTask); ok {
workerCommandTasks = append(workerCommandTasks, wct)
}
}

// Verify each expected queue got the right number of commands
tasksByQueue := make(map[string]int)
for _, wct := range workerCommandTasks {
tasksByQueue[wct.Destination] = len(wct.Commands)
}
require.Equal(t, tc.expectedQueues, tasksByQueue)
})
}
}

// TestApplyTimeSkippingBound covers the full branch table of applyTimeSkippingBound:
// MaxElapsedDuration set / nil duration / nil bound / nil config / Enabled=false /
// MaxSkippedDuration clearing a stale CurrentElapsedDurationBound. The first-init
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(

r.mutableState.AddTasks(closeTasks...)

return nil
// Proactively cancel in-flight activities so they don't run uselessly after the workflow is closed.
return r.mutableState.GenerateActivityCancelCommandsForClose()
}

// getRetention returns the retention period for this task generator's workflow execution.
Expand Down
2 changes: 2 additions & 0 deletions service/history/workflow/task_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
return cfg
}).AnyTimes()

mutableState.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil)

taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg, archivalMetadata, log.NewTestLogger())
err := taskGenerator.GenerateWorkflowCloseTasks(p.CloseEventTime, p.DeleteAfterClose, false)
require.NoError(t, err)
Expand Down
1 change: 0 additions & 1 deletion service/history/workflow/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func TerminateWorkflow(
deleteAfterTerminate,
links,
)

return err
}

Expand Down
Loading
Loading