From 7ae5d9d862a41c1195e58de56bdfe7f8c70e1d5a Mon Sep 17 00:00:00 2001 From: lilydoar Date: Thu, 28 May 2026 14:35:32 -0700 Subject: [PATCH] Add workflow_with_many_timers scenario Each workflow holds many concurrent active timers, repeated across optional sequential iterations. Stresses timer scheduling and firing under high per-workflow timer cardinality. stack-info: PR: https://github.com/temporalio/omes/pull/368, branch: lilydoar/stack/2 --- scenarios/workflow_with_many_timers.go | 127 +++++++++++++++++ scenarios/workflow_with_many_timers_test.go | 150 ++++++++++++++++++++ 2 files changed, 277 insertions(+) create mode 100644 scenarios/workflow_with_many_timers.go create mode 100644 scenarios/workflow_with_many_timers_test.go diff --git a/scenarios/workflow_with_many_timers.go b/scenarios/workflow_with_many_timers.go new file mode 100644 index 00000000..61cc68bf --- /dev/null +++ b/scenarios/workflow_with_many_timers.go @@ -0,0 +1,127 @@ +package scenarios + +import ( + "context" + "fmt" + "hash/fnv" + "math/rand" + "time" + + "github.com/temporalio/omes/loadgen" + "github.com/temporalio/omes/loadgen/kitchensink" +) + +// workflow_with_many_timers exercises the timer subsystem with workflows that +// hold many concurrent active timers. Each workflow runs `iterations` +// sequential batches; within each batch, `concurrent-timers` timers fire in +// parallel. Each timer fires after `timer-duration`, optionally jittered by a +// random offset in [0, `timer-duration-jitter`). +// +// What this stresses: timer scheduling and firing under high per-workflow +// timer cardinality, and the worker's ability to manage many active timer +// futures concurrently. With zero jitter all timers share one deadline and +// fire as a single burst (thundering herd); a non-zero jitter spreads the +// deadlines so the timer queue holds many distinct fire times and the workflow +// wakes repeatedly as they trickle in. + +const ( + manyTimersConcurrentTimersFlag = "concurrent-timers" + manyTimersTimerDurationFlag = "timer-duration" + manyTimersTimerJitterFlag = "timer-duration-jitter" + manyTimersIterationsFlag = "iterations" +) + +func init() { + loadgen.MustRegisterScenario(loadgen.Scenario{ + Description: fmt.Sprintf( + "Run workflows that hold many concurrent active timers. "+ + "Options: '%s' (default 30), '%s' (default 10s), '%s' (default 0), '%s' (default 1).", + manyTimersConcurrentTimersFlag, + manyTimersTimerDurationFlag, + manyTimersTimerJitterFlag, + manyTimersIterationsFlag, + ), + ExecutorFn: func() loadgen.Executor { + return loadgen.KitchenSinkExecutor{ + TestInput: &kitchensink.TestInput{ + WorkflowInput: &kitchensink.WorkflowInput{ + InitialActions: []*kitchensink.ActionSet{}, + }, + }, + PrepareTestInput: func(_ context.Context, info loadgen.ScenarioInfo, params *kitchensink.TestInput) error { + cfg, err := parseManyTimersConfig(&info) + if err != nil { + return err + } + params.WorkflowInput.InitialActions = buildManyTimersActions(cfg, manyTimersSeededRng(info.RunID)) + return nil + }, + } + }, + }) +} + +type manyTimersConfig struct { + concurrentTimers int + timerDuration time.Duration + timerJitter time.Duration + iterations int +} + +func parseManyTimersConfig(info *loadgen.ScenarioInfo) (*manyTimersConfig, error) { + cfg := &manyTimersConfig{ + concurrentTimers: info.ScenarioOptionInt(manyTimersConcurrentTimersFlag, 30), + timerDuration: info.ScenarioOptionDuration(manyTimersTimerDurationFlag, 10*time.Second), + timerJitter: info.ScenarioOptionDuration(manyTimersTimerJitterFlag, 0), + iterations: info.ScenarioOptionInt(manyTimersIterationsFlag, 1), + } + if cfg.concurrentTimers <= 0 { + return nil, fmt.Errorf("%s must be > 0, got %d", manyTimersConcurrentTimersFlag, cfg.concurrentTimers) + } + if cfg.timerDuration <= 0 { + return nil, fmt.Errorf("%s must be > 0, got %v", manyTimersTimerDurationFlag, cfg.timerDuration) + } + if cfg.timerJitter < 0 { + return nil, fmt.Errorf("%s must be >= 0, got %v", manyTimersTimerJitterFlag, cfg.timerJitter) + } + if cfg.iterations <= 0 { + return nil, fmt.Errorf("%s must be > 0, got %d", manyTimersIterationsFlag, cfg.iterations) + } + return cfg, nil +} + +// manyTimersSeededRng derives a deterministic RNG from the run ID so a given +// run produces the same jittered timer durations across re-executions. +func manyTimersSeededRng(runID string) *rand.Rand { + h := fnv.New64a() + _, _ = fmt.Fprintf(h, "%s", runID) + return rand.New(rand.NewSource(int64(h.Sum64()))) +} + +// buildManyTimersActions composes the InitialActions for one workflow run: one +// concurrent batch of `concurrentTimers` timers per iteration, followed by a +// terminal ReturnResult. Each timer fires after `timerDuration`; when +// `timerJitter` is positive, a random offset in [0, timerJitter) is added so +// the timers in a batch fire at distinct times. rng is only consumed when +// jitter is positive. +func buildManyTimersActions(cfg *manyTimersConfig, rng *rand.Rand) []*kitchensink.ActionSet { + sets := make([]*kitchensink.ActionSet, 0, cfg.iterations+1) + for range cfg.iterations { + batch := make([]*kitchensink.Action, cfg.concurrentTimers) + for j := range batch { + d := cfg.timerDuration + if cfg.timerJitter > 0 { + d += time.Duration(rng.Int63n(int64(cfg.timerJitter))) + } + batch[j] = kitchensink.NewTimerAction(d) + } + sets = append(sets, &kitchensink.ActionSet{ + Actions: batch, + Concurrent: true, + }) + } + sets = append(sets, &kitchensink.ActionSet{ + Actions: []*kitchensink.Action{kitchensink.NewEmptyReturnResultAction()}, + }) + return sets +} diff --git a/scenarios/workflow_with_many_timers_test.go b/scenarios/workflow_with_many_timers_test.go new file mode 100644 index 00000000..5d4750d2 --- /dev/null +++ b/scenarios/workflow_with_many_timers_test.go @@ -0,0 +1,150 @@ +package scenarios + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/temporalio/omes/cmd/clioptions" + "github.com/temporalio/omes/loadgen" + "github.com/temporalio/omes/workers" +) + +func TestWorkflowWithManyTimers_ParseConfig(t *testing.T) { + t.Parallel() + + t.Run("defaults", func(t *testing.T) { + info := &loadgen.ScenarioInfo{ScenarioOptions: map[string]string{}} + cfg, err := parseManyTimersConfig(info) + require.NoError(t, err) + require.Equal(t, 30, cfg.concurrentTimers) + require.Equal(t, 10*time.Second, cfg.timerDuration) + require.Equal(t, time.Duration(0), cfg.timerJitter) + require.Equal(t, 1, cfg.iterations) + }) + + t.Run("overrides", func(t *testing.T) { + info := &loadgen.ScenarioInfo{ScenarioOptions: map[string]string{ + manyTimersConcurrentTimersFlag: "4", + manyTimersTimerDurationFlag: "10ms", + manyTimersTimerJitterFlag: "5ms", + manyTimersIterationsFlag: "3", + }} + cfg, err := parseManyTimersConfig(info) + require.NoError(t, err) + require.Equal(t, 4, cfg.concurrentTimers) + require.Equal(t, 10*time.Millisecond, cfg.timerDuration) + require.Equal(t, 5*time.Millisecond, cfg.timerJitter) + require.Equal(t, 3, cfg.iterations) + }) + + t.Run("rejects invalid values", func(t *testing.T) { + cases := map[string]map[string]string{ + "non-positive concurrent-timers": {manyTimersConcurrentTimersFlag: "0"}, + "non-positive timer-duration": {manyTimersTimerDurationFlag: "0"}, + "negative timer-duration-jitter": {manyTimersTimerJitterFlag: "-1s"}, + "non-positive iterations": {manyTimersIterationsFlag: "0"}, + } + for name, opts := range cases { + t.Run(name, func(t *testing.T) { + _, err := parseManyTimersConfig(&loadgen.ScenarioInfo{ScenarioOptions: opts}) + require.Error(t, err) + }) + } + }) +} + +func TestWorkflowWithManyTimers_BuildActions(t *testing.T) { + t.Parallel() + + t.Run("one concurrent batch per iteration plus a terminal return", func(t *testing.T) { + cfg := &manyTimersConfig{ + concurrentTimers: 4, + timerDuration: 10 * time.Millisecond, + iterations: 3, + } + sets := buildManyTimersActions(cfg, manyTimersSeededRng("test")) + require.Len(t, sets, 4, "3 timer batches + 1 terminal return set") + + for i := 0; i < 3; i++ { + require.True(t, sets[i].Concurrent, "timer batch %d should fire concurrently", i) + require.Len(t, sets[i].Actions, 4, "batch %d should hold concurrent-timers timers", i) + for _, a := range sets[i].Actions { + require.NotNil(t, a.GetTimer(), "batch %d action should be a timer", i) + require.Equal(t, uint64(10), a.GetTimer().GetMilliseconds(), + "zero jitter should leave every timer at timer-duration") + } + } + require.NotNil(t, sets[3].Actions[0].GetReturnResult(), "final set should return a result") + }) + + t.Run("jitter spreads durations within [base, base+jitter)", func(t *testing.T) { + base := 10 * time.Second + jitter := 5 * time.Second + cfg := &manyTimersConfig{ + concurrentTimers: 50, + timerDuration: base, + timerJitter: jitter, + iterations: 1, + } + sets := buildManyTimersActions(cfg, manyTimersSeededRng("run-jitter")) + timers := sets[0].Actions + require.Len(t, timers, 50) + + distinct := map[uint64]struct{}{} + for _, a := range timers { + ms := a.GetTimer().GetMilliseconds() + require.GreaterOrEqual(t, ms, uint64(base.Milliseconds()), "below base") + require.Less(t, ms, uint64((base + jitter).Milliseconds()), "at or above base+jitter") + distinct[ms] = struct{}{} + } + require.Greater(t, len(distinct), 1, "jitter should produce varied durations") + }) + + t.Run("jitter is reproducible for a fixed run ID", func(t *testing.T) { + cfg := &manyTimersConfig{ + concurrentTimers: 20, + timerDuration: time.Second, + timerJitter: time.Second, + iterations: 1, + } + durations := func(rng string) []uint64 { + sets := buildManyTimersActions(cfg, manyTimersSeededRng(rng)) + out := make([]uint64, 0, len(sets[0].Actions)) + for _, a := range sets[0].Actions { + out = append(out, a.GetTimer().GetMilliseconds()) + } + return out + } + require.Equal(t, durations("run-x"), durations("run-x"), "same run ID must reproduce durations") + require.NotEqual(t, durations("run-x"), durations("run-y"), "different run ID should diverge") + }) +} + +func TestWorkflowWithManyTimers(t *testing.T) { + t.Parallel() + + env := workers.SetupTestEnvironment(t, + workers.WithExecutorTimeout(1*time.Minute)) + + baseRunID := fmt.Sprintf("wmt-%d", time.Now().Unix()) + + t.Run("multiple sequential batches of concurrent timers", func(t *testing.T) { + executor := loadgen.GetScenario("workflow_with_many_timers").ExecutorFn() + + _, err := env.RunExecutorTest(t, executor, loadgen.ScenarioInfo{ + RunID: baseRunID + "-m", + Configuration: loadgen.RunConfiguration{ + Iterations: 3, + MaxConcurrent: 3, + }, + ScenarioOptions: map[string]string{ + manyTimersConcurrentTimersFlag: "5", + manyTimersTimerDurationFlag: "10ms", + manyTimersIterationsFlag: "2", + }, + }, clioptions.LangGo) + require.NoError(t, err) + }) +}