Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (
AddActivityTaskCancelRequestedEvent(int64, int64, string) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)
AddActivityTaskCanceledEvent(int64, int64, int64, *commonpb.Payloads, string) (*historypb.HistoryEvent, error)
AddWorkerCommandsTasks(commands []*workerpb.WorkerCommand, controlQueue string) error
GenerateActivityCancelCommandsForClose() error
AddActivityTaskCompletedEvent(int64, int64, *workflowservice.RespondActivityTaskCompletedRequest) (*historypb.HistoryEvent, error)
AddActivityTaskFailedEvent(int64, int64, *failurepb.Failure, enumspb.RetryState, string, *commonpb.WorkerVersionStamp) (*historypb.HistoryEvent, error)
AddActivityTaskScheduledEvent(int64, *commandpb.ScheduleActivityTaskCommandAttributes, bool) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)
Expand Down
14 changes: 14 additions & 0 deletions service/history/interfaces/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions service/history/ndc/events_reapplier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_AppliedEvent_Termination(
false,
nil,
).Return(nil, nil)
msCurrent.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil)
events := []*historypb.HistoryEvent{
{EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED},
event,
Expand Down
2 changes: 2 additions & 0 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() {
false,
nil,
).Return(&historypb.HistoryEvent{}, nil)
mutableState.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil)

err := s.workflowResetter.terminateWorkflow(mutableState, terminateReason)
s.NoError(err)
Expand Down Expand Up @@ -1256,6 +1257,7 @@ func (s *workflowResetterSuite) TestReapplyEvents() {
false,
event.Links,
).Return(&historypb.HistoryEvent{}, nil)
ms.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil)
}
}
}
Expand Down
68 changes: 68 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"go.temporal.io/server/common/searchattribute/sadefs"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/common/softassert"
"go.temporal.io/server/common/tasktoken"
"go.temporal.io/server/common/util"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/components/callbacks"
Expand Down Expand Up @@ -4536,6 +4537,73 @@ func (ms *MutableStateImpl) AddWorkerCommandsTasks(commands []*workerpb.WorkerCo
return ms.taskGenerator.GenerateWorkerCommandsTasks(commands, controlQueue)
}

// GenerateActivityCancelCommandsForClose generates WorkerCommandsTasks to cancel all
// in-flight activities that have a worker control queue. Called when the workflow is being
// terminated (or otherwise forcefully closed) to proactively notify workers.
Comment thread
rkannan82 marked this conversation as resolved.
Outdated
func (ms *MutableStateImpl) GenerateActivityCancelCommandsForClose() error {
if !ms.config.EnableCancelActivityWorkerCommand() {
return nil
}

serializer := tasktoken.NewSerializer()
wfKey := ms.GetWorkflowKey()
nsID := ms.GetNamespaceEntry().ID().String()

commandsByQueue := make(map[string][]*workerpb.WorkerCommand)
for _, ai := range ms.pendingActivityInfoIDs {
// No control queue means the activity was started before this feature was deployed.
if ai.WorkerControlTaskQueue == "" {
continue
}
if ai.StartedClock == nil {
Comment thread
rkannan82 marked this conversation as resolved.
// StartedClock may be nil for activities started before this feature was deployed.
// Skip cancel command; the activity will time out normally.
ms.logger.Debug("Skipping worker cancel command: activity missing StartedClock (pre-deploy)",
tag.WorkflowNamespaceID(wfKey.NamespaceID),
tag.WorkflowID(wfKey.WorkflowID),
tag.WorkflowRunID(wfKey.RunID),
tag.WorkflowScheduledEventID(ai.ScheduledEventId),
)
continue
}

taskToken, err := serializer.Serialize(tasktoken.NewActivityTaskToken(
nsID,
wfKey.WorkflowID,
wfKey.RunID,
ai.ScheduledEventId,
ai.ActivityId,
ai.ActivityType.GetName(),
ai.Attempt,
ai.StartedClock,
ai.Version,
ai.StartVersion,
nil,
))
if err != nil {
return err
}

commandsByQueue[ai.WorkerControlTaskQueue] = append(
commandsByQueue[ai.WorkerControlTaskQueue],
&workerpb.WorkerCommand{
Type: &workerpb.WorkerCommand_CancelActivity{
CancelActivity: &workerpb.CancelActivityCommand{
TaskToken: taskToken,
},
},
},
)
}

for controlQueue, commands := range commandsByQueue {
if err := ms.AddWorkerCommandsTasks(commands, controlQueue); err != nil {
return err
}
}
return nil
}

func (ms *MutableStateImpl) ApplyActivityTaskCancelRequestedEvent(
event *historypb.HistoryEvent,
) error {
Expand Down
158 changes: 158 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7591,6 +7591,164 @@ func (s *mutableStateSuite) TestApplyWorkflowExecutionOptionsUpdatedEvent_TimeSk
}
}

// TestGenerateActivityCancelCommandsForClose is a table-driven test for
// MutableStateImpl.GenerateActivityCancelCommandsForClose. Each case seeds the mutable
// state's pending activities, runs the method, and inspects the outbound tasks that were
// queued:
//   - expectedNoTasks: no outbound task may be generated at all;
//   - expectedQueues: every listed control queue must receive exactly one
//     WorkerCommandsTask holding the given number of commands, and no other queue may
//     receive one.
func TestGenerateActivityCancelCommandsForClose(t *testing.T) {
	t.Parallel()

	startedClock := &clockspb.VectorClock{ShardId: 1, Clock: 100}

	testCases := []struct {
		name            string
		featureEnabled  bool
		activities      map[int64]*persistencespb.ActivityInfo
		expectedQueues  map[string]int // controlQueue -> expected command count
		expectedNoTasks bool
	}{
		{
			name:           "activities with control queue and started clock",
			featureEnabled: true,
			activities: map[int64]*persistencespb.ActivityInfo{
				1: {
					ScheduledEventId:       1,
					ActivityId:             "act-1",
					ActivityType:           &commonpb.ActivityType{Name: "type1"},
					WorkerControlTaskQueue: "control-queue-1",
					StartedClock:           startedClock,
					Attempt:                1,
				},
			},
			expectedQueues: map[string]int{"control-queue-1": 1},
		},
		{
			name:           "skips activities without control queue",
			featureEnabled: true,
			activities: map[int64]*persistencespb.ActivityInfo{
				1: {
					ScheduledEventId: 1,
					ActivityId:       "act-1",
					ActivityType:     &commonpb.ActivityType{Name: "type1"},
					StartedClock:     startedClock,
					Attempt:          1,
				},
			},
			expectedNoTasks: true,
		},
		{
			name:           "skips activities without started clock",
			featureEnabled: true,
			activities: map[int64]*persistencespb.ActivityInfo{
				1: {
					ScheduledEventId:       1,
					ActivityId:             "act-1",
					ActivityType:           &commonpb.ActivityType{Name: "type1"},
					WorkerControlTaskQueue: "control-queue-1",
					Attempt:                1,
				},
			},
			expectedNoTasks: true,
		},
		{
			name:           "multiple activities batched by control queue",
			featureEnabled: true,
			activities: map[int64]*persistencespb.ActivityInfo{
				1: {
					ScheduledEventId:       1,
					ActivityId:             "act-1",
					ActivityType:           &commonpb.ActivityType{Name: "type1"},
					WorkerControlTaskQueue: "queue-A",
					StartedClock:           startedClock,
					Attempt:                1,
				},
				2: {
					ScheduledEventId:       2,
					ActivityId:             "act-2",
					ActivityType:           &commonpb.ActivityType{Name: "type2"},
					WorkerControlTaskQueue: "queue-A",
					StartedClock:           startedClock,
					Attempt:                1,
				},
				3: {
					ScheduledEventId:       3,
					ActivityId:             "act-3",
					ActivityType:           &commonpb.ActivityType{Name: "type3"},
					WorkerControlTaskQueue: "queue-B",
					StartedClock:           startedClock,
					Attempt:                1,
				},
			},
			expectedQueues: map[string]int{"queue-A": 2, "queue-B": 1},
		},
		{
			name:           "feature flag disabled - no tasks generated",
			featureEnabled: false,
			activities: map[int64]*persistencespb.ActivityInfo{
				1: {
					ScheduledEventId:       1,
					ActivityId:             "act-1",
					ActivityType:           &commonpb.ActivityType{Name: "type1"},
					WorkerControlTaskQueue: "control-queue-1",
					StartedClock:           startedClock,
					Attempt:                1,
				},
			},
			expectedNoTasks: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			ctrl := gomock.NewController(t)
			mockEventsCache := events.NewMockCache(ctrl)
			mockConfig := tests.NewDynamicConfig()
			mockConfig.EnableCancelActivityWorkerCommand = dynamicconfig.GetBoolPropertyFn(tc.featureEnabled)

			mockShard := shard.NewTestContext(
				ctrl,
				&persistencespb.ShardInfo{ShardId: 0, RangeId: 1},
				mockConfig,
			)
			defer mockShard.StopForTest()
			reg := hsm.NewRegistry()
			require.NoError(t, RegisterStateMachine(reg))
			require.NoError(t, callbacks.RegisterStateMachine(reg))
			require.NoError(t, nexusoperations.RegisterStateMachines(reg))
			mockShard.SetStateMachineRegistry(reg)
			mockShard.SetEventsCacheForTesting(mockEventsCache)

			namespaceEntry := tests.GlobalNamespaceEntry
			mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(namespaceEntry, nil).AnyTimes()
			mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(gomock.Any(), gomock.Any()).Return(cluster.TestCurrentClusterName).AnyTimes()
			mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
			mockShard.Resource.ClusterMetadata.EXPECT().GetClusterID().Return(int64(1)).AnyTimes()

			ms := NewMutableState(mockShard, mockEventsCache, log.NewTestLogger(), namespaceEntry, tests.WorkflowID, tests.RunID, time.Now().UTC())
			// Seed the pending activities directly; the method under test only reads this map.
			ms.pendingActivityInfoIDs = tc.activities

			err := ms.GenerateActivityCancelCommandsForClose()
			require.NoError(t, err)

			if tc.expectedNoTasks {
				require.Empty(t, ms.InsertTasks[tasks.CategoryOutbound])
				return
			}

			// Tally commands per destination queue from the outbound WorkerCommandsTasks.
			commandsByQueue := make(map[string]int, len(tc.expectedQueues))
			for _, task := range ms.InsertTasks[tasks.CategoryOutbound] {
				wct, ok := task.(*tasks.WorkerCommandsTask)
				if !ok {
					continue
				}
				// Commands for one queue are expected to arrive batched in a single task.
				// Failing here, instead of overwriting the earlier count, keeps a duplicate
				// or un-batched task from being masked by whichever task is visited last.
				require.NotContains(t, commandsByQueue, wct.Destination,
					"more than one WorkerCommandsTask generated for the same control queue")
				commandsByQueue[wct.Destination] = len(wct.Commands)
			}

			// Every expected queue got the right number of commands, and no other queue got any.
			require.Equal(t, tc.expectedQueues, commandsByQueue)
		})
	}
}

// TestApplyTimeSkippingBound covers the full branch table of applyTimeSkippingBound:
// MaxElapsedDuration set / nil duration / nil bound / nil config / Enabled=false /
// MaxSkippedDuration clearing a stale CurrentElapsedDurationBound. The first-init
Expand Down
13 changes: 11 additions & 2 deletions service/history/workflow/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ func TimeoutWorkflow(
retryState,
continuedRunID,
)
return err
if err != nil {
return err
}

// Proactively cancel in-flight activities so they don't run uselessly after the workflow is closed.
return mutableState.GenerateActivityCancelCommandsForClose()
}

// TerminateWorkflow will write a WorkflowExecutionTerminated event with a fresh
Expand Down Expand Up @@ -143,8 +148,12 @@ func TerminateWorkflow(
deleteAfterTerminate,
links,
)
if err != nil {
return err
}

return err
// Proactively cancel in-flight activities so they don't run uselessly after the workflow is closed.
return mutableState.GenerateActivityCancelCommandsForClose()
}

// FindAutoResetPoint returns the auto reset point
Expand Down
Loading
Loading