Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions scenarios/workflow_with_many_timers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package scenarios

import (
"context"
"fmt"
"hash/fnv"
"math/rand"
"time"

"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/loadgen/kitchensink"
)

// workflow_with_many_timers exercises the timer subsystem with workflows that
// hold many concurrent active timers. Each workflow runs `iterations`
// sequential batches; within each batch, `concurrent-timers` timers fire in
// parallel. Each timer fires after `timer-duration`, optionally jittered by a
// random offset in [0, `timer-duration-jitter`).
//
// What this stresses: timer scheduling and firing under high per-workflow
// timer cardinality, and the worker's ability to manage many active timer
// futures concurrently. With zero jitter all timers share one deadline and
// fire as a single burst (thundering herd); a non-zero jitter spreads the
// deadlines so the timer queue holds many distinct fire times and the workflow
// wakes repeatedly as they trickle in.

const (
manyTimersConcurrentTimersFlag = "concurrent-timers"
manyTimersTimerDurationFlag = "timer-duration"
manyTimersTimerJitterFlag = "timer-duration-jitter"
manyTimersIterationsFlag = "iterations"
)

func init() {
loadgen.MustRegisterScenario(loadgen.Scenario{
Description: fmt.Sprintf(
"Run workflows that hold many concurrent active timers. "+
"Options: '%s' (default 30), '%s' (default 10s), '%s' (default 0), '%s' (default 1).",
manyTimersConcurrentTimersFlag,
manyTimersTimerDurationFlag,
manyTimersTimerJitterFlag,
manyTimersIterationsFlag,
),
ExecutorFn: func() loadgen.Executor {
return loadgen.KitchenSinkExecutor{
TestInput: &kitchensink.TestInput{
WorkflowInput: &kitchensink.WorkflowInput{
InitialActions: []*kitchensink.ActionSet{},
},
},
PrepareTestInput: func(_ context.Context, info loadgen.ScenarioInfo, params *kitchensink.TestInput) error {
cfg, err := parseManyTimersConfig(&info)
if err != nil {
return err
}
params.WorkflowInput.InitialActions = buildManyTimersActions(cfg, manyTimersSeededRng(info.RunID))
return nil
},
}
},
})
}

type manyTimersConfig struct {
concurrentTimers int
timerDuration time.Duration
timerJitter time.Duration
iterations int
}

func parseManyTimersConfig(info *loadgen.ScenarioInfo) (*manyTimersConfig, error) {
cfg := &manyTimersConfig{
concurrentTimers: info.ScenarioOptionInt(manyTimersConcurrentTimersFlag, 30),
timerDuration: info.ScenarioOptionDuration(manyTimersTimerDurationFlag, 10*time.Second),
timerJitter: info.ScenarioOptionDuration(manyTimersTimerJitterFlag, 0),
iterations: info.ScenarioOptionInt(manyTimersIterationsFlag, 1),
}
if cfg.concurrentTimers <= 0 {
return nil, fmt.Errorf("%s must be > 0, got %d", manyTimersConcurrentTimersFlag, cfg.concurrentTimers)
}
if cfg.timerDuration <= 0 {
return nil, fmt.Errorf("%s must be > 0, got %v", manyTimersTimerDurationFlag, cfg.timerDuration)
}
if cfg.timerJitter < 0 {
return nil, fmt.Errorf("%s must be >= 0, got %v", manyTimersTimerJitterFlag, cfg.timerJitter)
}
if cfg.iterations <= 0 {
return nil, fmt.Errorf("%s must be > 0, got %d", manyTimersIterationsFlag, cfg.iterations)
}
return cfg, nil
}

// manyTimersSeededRng derives a deterministic RNG from the run ID so a given
// run produces the same jittered timer durations across re-executions.
func manyTimersSeededRng(runID string) *rand.Rand {
h := fnv.New64a()
_, _ = fmt.Fprintf(h, "%s", runID)
return rand.New(rand.NewSource(int64(h.Sum64())))
}

// buildManyTimersActions composes the InitialActions for one workflow run: one
// concurrent batch of `concurrentTimers` timers per iteration, followed by a
// terminal ReturnResult. Each timer fires after `timerDuration`; when
// `timerJitter` is positive, a random offset in [0, timerJitter) is added so
// the timers in a batch fire at distinct times. rng is only consumed when
// jitter is positive.
func buildManyTimersActions(cfg *manyTimersConfig, rng *rand.Rand) []*kitchensink.ActionSet {
sets := make([]*kitchensink.ActionSet, 0, cfg.iterations+1)
for range cfg.iterations {
batch := make([]*kitchensink.Action, cfg.concurrentTimers)
for j := range batch {
d := cfg.timerDuration
if cfg.timerJitter > 0 {
d += time.Duration(rng.Int63n(int64(cfg.timerJitter)))
}
batch[j] = kitchensink.NewTimerAction(d)
}
sets = append(sets, &kitchensink.ActionSet{
Actions: batch,
Concurrent: true,
})
}
sets = append(sets, &kitchensink.ActionSet{
Actions: []*kitchensink.Action{kitchensink.NewEmptyReturnResultAction()},
})
return sets
}
150 changes: 150 additions & 0 deletions scenarios/workflow_with_many_timers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package scenarios

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/temporalio/omes/cmd/clioptions"
"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/workers"
)

func TestWorkflowWithManyTimers_ParseConfig(t *testing.T) {
t.Parallel()

t.Run("defaults", func(t *testing.T) {
info := &loadgen.ScenarioInfo{ScenarioOptions: map[string]string{}}
cfg, err := parseManyTimersConfig(info)
require.NoError(t, err)
require.Equal(t, 30, cfg.concurrentTimers)
require.Equal(t, 10*time.Second, cfg.timerDuration)
require.Equal(t, time.Duration(0), cfg.timerJitter)
require.Equal(t, 1, cfg.iterations)
})

t.Run("overrides", func(t *testing.T) {
info := &loadgen.ScenarioInfo{ScenarioOptions: map[string]string{
manyTimersConcurrentTimersFlag: "4",
manyTimersTimerDurationFlag: "10ms",
manyTimersTimerJitterFlag: "5ms",
manyTimersIterationsFlag: "3",
}}
cfg, err := parseManyTimersConfig(info)
require.NoError(t, err)
require.Equal(t, 4, cfg.concurrentTimers)
require.Equal(t, 10*time.Millisecond, cfg.timerDuration)
require.Equal(t, 5*time.Millisecond, cfg.timerJitter)
require.Equal(t, 3, cfg.iterations)
})

t.Run("rejects invalid values", func(t *testing.T) {
cases := map[string]map[string]string{
"non-positive concurrent-timers": {manyTimersConcurrentTimersFlag: "0"},
"non-positive timer-duration": {manyTimersTimerDurationFlag: "0"},
"negative timer-duration-jitter": {manyTimersTimerJitterFlag: "-1s"},
"non-positive iterations": {manyTimersIterationsFlag: "0"},
}
for name, opts := range cases {
t.Run(name, func(t *testing.T) {
_, err := parseManyTimersConfig(&loadgen.ScenarioInfo{ScenarioOptions: opts})
require.Error(t, err)
})
}
})
}

func TestWorkflowWithManyTimers_BuildActions(t *testing.T) {
t.Parallel()

t.Run("one concurrent batch per iteration plus a terminal return", func(t *testing.T) {
cfg := &manyTimersConfig{
concurrentTimers: 4,
timerDuration: 10 * time.Millisecond,
iterations: 3,
}
sets := buildManyTimersActions(cfg, manyTimersSeededRng("test"))
require.Len(t, sets, 4, "3 timer batches + 1 terminal return set")

for i := 0; i < 3; i++ {
require.True(t, sets[i].Concurrent, "timer batch %d should fire concurrently", i)
require.Len(t, sets[i].Actions, 4, "batch %d should hold concurrent-timers timers", i)
for _, a := range sets[i].Actions {
require.NotNil(t, a.GetTimer(), "batch %d action should be a timer", i)
require.Equal(t, uint64(10), a.GetTimer().GetMilliseconds(),
"zero jitter should leave every timer at timer-duration")
}
}
require.NotNil(t, sets[3].Actions[0].GetReturnResult(), "final set should return a result")
})

t.Run("jitter spreads durations within [base, base+jitter)", func(t *testing.T) {
base := 10 * time.Second
jitter := 5 * time.Second
cfg := &manyTimersConfig{
concurrentTimers: 50,
timerDuration: base,
timerJitter: jitter,
iterations: 1,
}
sets := buildManyTimersActions(cfg, manyTimersSeededRng("run-jitter"))
timers := sets[0].Actions
require.Len(t, timers, 50)

distinct := map[uint64]struct{}{}
for _, a := range timers {
ms := a.GetTimer().GetMilliseconds()
require.GreaterOrEqual(t, ms, uint64(base.Milliseconds()), "below base")
require.Less(t, ms, uint64((base + jitter).Milliseconds()), "at or above base+jitter")
distinct[ms] = struct{}{}
}
require.Greater(t, len(distinct), 1, "jitter should produce varied durations")
})

t.Run("jitter is reproducible for a fixed run ID", func(t *testing.T) {
cfg := &manyTimersConfig{
concurrentTimers: 20,
timerDuration: time.Second,
timerJitter: time.Second,
iterations: 1,
}
durations := func(rng string) []uint64 {
sets := buildManyTimersActions(cfg, manyTimersSeededRng(rng))
out := make([]uint64, 0, len(sets[0].Actions))
for _, a := range sets[0].Actions {
out = append(out, a.GetTimer().GetMilliseconds())
}
return out
}
require.Equal(t, durations("run-x"), durations("run-x"), "same run ID must reproduce durations")
require.NotEqual(t, durations("run-x"), durations("run-y"), "different run ID should diverge")
})
}

func TestWorkflowWithManyTimers(t *testing.T) {
t.Parallel()

env := workers.SetupTestEnvironment(t,
workers.WithExecutorTimeout(1*time.Minute))

baseRunID := fmt.Sprintf("wmt-%d", time.Now().Unix())

t.Run("multiple sequential batches of concurrent timers", func(t *testing.T) {
executor := loadgen.GetScenario("workflow_with_many_timers").ExecutorFn()

_, err := env.RunExecutorTest(t, executor, loadgen.ScenarioInfo{
RunID: baseRunID + "-m",
Configuration: loadgen.RunConfiguration{
Iterations: 3,
MaxConcurrent: 3,
},
ScenarioOptions: map[string]string{
manyTimersConcurrentTimersFlag: "5",
manyTimersTimerDurationFlag: "10ms",
manyTimersIterationsFlag: "2",
},
}, clioptions.LangGo)
require.NoError(t, err)
})
}
Loading