Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions scenarios/out_of_order_signals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package scenarios

import (
"context"
"fmt"
"hash/fnv"
"maps"
"math/rand"
"time"

"google.golang.org/protobuf/types/known/durationpb"

"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/loadgen/kitchensink"
)

// out_of_order_signals exercises signal delivery and workflow ordering by
// sending each workflow's N signals in a deterministically shuffled order
// while the workflow processes them strictly in event sequence.
//
// What this stresses: signal queuing, workflow buffering across signal
// arrivals, and forward progress when an early-expected event arrives last.
// Optionally inserts a "processing" activity between events to add
// per-event work.
//
// Per-iteration determinism: shuffle decisions and orderings are seeded from
// (RunID, iteration) so a given run is reproducible across re-executions.
//
// Implementation note: kitchen-sink's SetWorkflowState action replaces the
// workflow state map rather than merging into it (verified across all
// language workers). To accumulate event keys reliably regardless of arrival
// order, each signal carries the cumulative state through its send position;
// the workflow's awaits poll for keys appearing in the (growing) state map.

const (
oooSignalsCountFlag = "signals-per-workflow"
oooSignalsShufflePercentageFlag = "shuffle-percentage"
oooSignalsProcessingTimeFlag = "processing-time-per-signal"
)

const oooSignalReadyValue = "ready"

func init() {
loadgen.MustRegisterScenario(loadgen.Scenario{
Description: fmt.Sprintf(
"Send N signals per workflow in a deterministically shuffled order; the workflow processes them in event sequence. "+
"Options: '%s' (default 10), '%s' (default 100), '%s' (default 0).",
oooSignalsCountFlag,
oooSignalsShufflePercentageFlag,
oooSignalsProcessingTimeFlag,
),
ExecutorFn: func() loadgen.Executor {
return loadgen.KitchenSinkExecutor{
TestInput: &kitchensink.TestInput{
WorkflowInput: &kitchensink.WorkflowInput{
InitialActions: []*kitchensink.ActionSet{},
},
},
UpdateWorkflowOptions: func(_ context.Context, run *loadgen.Run, opts *loadgen.KitchenSinkWorkflowOptions) error {
cfg, err := parseOutOfOrderSignalsConfig(run.ScenarioInfo)
if err != nil {
return err
}
rng := oooSeededRng(run.RunID, run.Iteration)
Copy link
Copy Markdown
Contributor

@THardy98 THardy98 Jun 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth at some point creating a utils file/package with helpers like these seeded rng functions.

Not blocking

opts.Params.WorkflowInput.InitialActions = buildOrderedAwaitActions(cfg.signalsPerWorkflow, cfg.processingTime)
opts.Params.ClientSequence = buildShuffledSignals(cfg.signalsPerWorkflow, cfg.shufflePercentage, rng)
return nil
},
}
},
})
}

type outOfOrderSignalsConfig struct {
signalsPerWorkflow int
shufflePercentage int
processingTime time.Duration
}

func parseOutOfOrderSignalsConfig(info *loadgen.ScenarioInfo) (*outOfOrderSignalsConfig, error) {
cfg := &outOfOrderSignalsConfig{
signalsPerWorkflow: info.ScenarioOptionInt(oooSignalsCountFlag, 10),
shufflePercentage: info.ScenarioOptionInt(oooSignalsShufflePercentageFlag, 100),
processingTime: info.ScenarioOptionDuration(oooSignalsProcessingTimeFlag, 0),
}
if cfg.signalsPerWorkflow <= 0 {
return nil, fmt.Errorf("%s must be > 0, got %d", oooSignalsCountFlag, cfg.signalsPerWorkflow)
}
if cfg.shufflePercentage < 0 || cfg.shufflePercentage > 100 {
return nil, fmt.Errorf("%s must be in [0,100], got %d", oooSignalsShufflePercentageFlag, cfg.shufflePercentage)
}
if cfg.processingTime < 0 {
return nil, fmt.Errorf("%s must be >= 0, got %v", oooSignalsProcessingTimeFlag, cfg.processingTime)
}
return cfg, nil
}

func oooSeededRng(runID string, iter int) *rand.Rand {
h := fnv.New64a()
_, _ = fmt.Fprintf(h, "%s/%d", runID, iter)
return rand.New(rand.NewSource(int64(h.Sum64())))
}

func buildOrderedAwaitActions(n int, processingTime time.Duration) []*kitchensink.ActionSet {
actions := make([]*kitchensink.Action, 0, 2*n+1)
processingStartToClose := durationpb.New(processingTime + time.Minute)
for i := range n {
actions = append(actions, kitchensink.NewAwaitWorkflowStateAction(oooEventKey(i+1), oooSignalReadyValue))
if processingTime > 0 {
actions = append(actions, &kitchensink.Action{
Variant: &kitchensink.Action_ExecActivity{
ExecActivity: &kitchensink.ExecuteActivityAction{
ActivityType: &kitchensink.ExecuteActivityAction_Delay{
Delay: durationpb.New(processingTime),
},
StartToCloseTimeout: processingStartToClose,
Locality: &kitchensink.ExecuteActivityAction_Remote{
Remote: &kitchensink.RemoteActivityOptions{},
},
},
},
})
}
}
actions = append(actions, kitchensink.NewEmptyReturnResultAction())
return []*kitchensink.ActionSet{{Actions: actions}}
}

func buildShuffledSignals(n int, shufflePct int, rng *rand.Rand) *kitchensink.ClientSequence {
shuffle := shufflePct > 0 && rng.Intn(100) < shufflePct

sendOrder := make([]int, n)
for i := range sendOrder {
sendOrder[i] = i + 1
}
if shuffle {
rng.Shuffle(n, func(i, j int) {
sendOrder[i], sendOrder[j] = sendOrder[j], sendOrder[i]
})
}

cumulative := make(map[string]string, n)
signals := make([]*kitchensink.ClientAction, n)
for sendPos, eventID := range sendOrder {
cumulative[oooEventKey(eventID)] = oooSignalReadyValue
snapshot := make(map[string]string, len(cumulative))
maps.Copy(snapshot, cumulative)
signals[sendPos] = makeSetStateSignal(snapshot)
}

return &kitchensink.ClientSequence{
ActionSets: []*kitchensink.ClientActionSet{
{Actions: signals},
},
}
}

func makeSetStateSignal(kvs map[string]string) *kitchensink.ClientAction {
return &kitchensink.ClientAction{
Variant: &kitchensink.ClientAction_DoSignal{
DoSignal: &kitchensink.DoSignal{
Variant: &kitchensink.DoSignal_DoSignalActions_{
DoSignalActions: &kitchensink.DoSignal_DoSignalActions{
Variant: &kitchensink.DoSignal_DoSignalActions_DoActions{
DoActions: kitchensink.SingleActionSet(
&kitchensink.Action{
Variant: &kitchensink.Action_SetWorkflowState{
SetWorkflowState: &kitchensink.WorkflowState{Kvs: kvs},
},
},
),
},
},
},
},
},
}
}

func oooEventKey(i int) string {
return fmt.Sprintf("event_%d", i)
}
208 changes: 208 additions & 0 deletions scenarios/out_of_order_signals_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package scenarios

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/temporalio/omes/cmd/clioptions"
"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/loadgen/kitchensink"
"github.com/temporalio/omes/workers"
)

// oooSignalKvs extracts the SetWorkflowState key/value map carried by a single
// shuffled-signal client action.
func oooSignalKvs(t *testing.T, ca *kitchensink.ClientAction) map[string]string {
t.Helper()
set := ca.GetDoSignal().GetDoSignalActions().GetDoActions()
require.NotNil(t, set, "signal should carry a DoActions action set")
require.Len(t, set.Actions, 1)
ws := set.Actions[0].GetSetWorkflowState()
require.NotNil(t, ws, "signal action should set workflow state")
return ws.Kvs
}

func TestOutOfOrderSignals_SeededRngIsDeterministic(t *testing.T) {
t.Parallel()

draw := func(r interface{ Intn(int) int }) []int {
out := make([]int, 10)
for i := range out {
out[i] = r.Intn(1000)
}
return out
}

a := draw(oooSeededRng("run-a", 0))
b := draw(oooSeededRng("run-a", 0))
require.Equal(t, a, b, "same (RunID, iteration) must yield the same sequence")

require.NotEqual(t, a, draw(oooSeededRng("run-a", 1)),
"different iteration should diverge")
require.NotEqual(t, a, draw(oooSeededRng("run-b", 0)),
"different RunID should diverge")
}

func TestOutOfOrderSignals_BuildShuffledSignals(t *testing.T) {
t.Parallel()

const n = 6

t.Run("zero shuffle keeps event order and grows state cumulatively", func(t *testing.T) {
seq := buildShuffledSignals(n, 0, oooSeededRng("run", 0))
require.Len(t, seq.ActionSets, 1)
signals := seq.ActionSets[0].Actions
require.Len(t, signals, n)

for pos, sig := range signals {
kvs := oooSignalKvs(t, sig)
// Without shuffling, position p carries events 1..p+1.
require.Len(t, kvs, pos+1)
for e := 1; e <= pos+1; e++ {
require.Equal(t, oooSignalReadyValue, kvs[oooEventKey(e)],
"position %d should include %s", pos, oooEventKey(e))
}
}
})

t.Run("full shuffle still delivers every event exactly once with monotonic state", func(t *testing.T) {
seq := buildShuffledSignals(n, 100, oooSeededRng("run", 0))
signals := seq.ActionSets[0].Actions
require.Len(t, signals, n)

seenNewKey := map[string]bool{}
prevSize := 0
for pos, sig := range signals {
kvs := oooSignalKvs(t, sig)
// Each signal adds exactly one new key to the cumulative snapshot.
require.Equal(t, prevSize+1, len(kvs), "snapshot must grow by one at position %d", pos)
prevSize = len(kvs)
for k := range kvs {
seenNewKey[k] = true
}
}
require.Len(t, seenNewKey, n, "every event key should appear")
for e := 1; e <= n; e++ {
require.True(t, seenNewKey[oooEventKey(e)], "missing %s", oooEventKey(e))
}
})

t.Run("reproducible for a fixed seed", func(t *testing.T) {
s1 := buildShuffledSignals(n, 100, oooSeededRng("run", 7))
s2 := buildShuffledSignals(n, 100, oooSeededRng("run", 7))
require.True(t, proto.Equal(s1, s2), "same seed must produce identical signal sequences")
})
}

func TestOutOfOrderSignals_BuildOrderedAwaitActions(t *testing.T) {
t.Parallel()

const n = 4

t.Run("without processing time", func(t *testing.T) {
sets := buildOrderedAwaitActions(n, 0)
require.Len(t, sets, 1)
actions := sets[0].Actions
require.Len(t, actions, n+1)
for i := 0; i < n; i++ {
await := actions[i].GetAwaitWorkflowState()
require.NotNil(t, await, "action %d should await workflow state", i)
require.Equal(t, oooEventKey(i+1), await.Key)
require.Equal(t, oooSignalReadyValue, await.Value)
}
require.NotNil(t, actions[n].GetReturnResult())
})

t.Run("with processing time inserts an activity per event", func(t *testing.T) {
sets := buildOrderedAwaitActions(n, time.Millisecond)
actions := sets[0].Actions
require.Len(t, actions, 2*n+1)
for i := 0; i < n; i++ {
require.NotNil(t, actions[2*i].GetAwaitWorkflowState())
require.NotNil(t, actions[2*i+1].GetExecActivity())
}
require.NotNil(t, actions[2*n].GetReturnResult())
})
}

func TestOutOfOrderSignals(t *testing.T) {
t.Parallel()

env := workers.SetupTestEnvironment(t,
workers.WithExecutorTimeout(1*time.Minute))

baseRunID := fmt.Sprintf("ooo-%d", time.Now().Unix())

run := func(t *testing.T, suffix string, opts map[string]string) {
executor := loadgen.GetScenario("out_of_order_signals").ExecutorFn()
_, err := env.RunExecutorTest(t, executor, loadgen.ScenarioInfo{
RunID: baseRunID + suffix,
Configuration: loadgen.RunConfiguration{
Iterations: 3,
MaxConcurrent: 3,
},
ScenarioOptions: opts,
}, clioptions.LangGo)
require.NoError(t, err)
}

t.Run("shuffled signals processed in order", func(t *testing.T) {
run(t, "-shuffle", map[string]string{
oooSignalsCountFlag: "5",
oooSignalsShufflePercentageFlag: "100",
})
})

t.Run("in-order signals with per-signal processing", func(t *testing.T) {
run(t, "-inorder", map[string]string{
oooSignalsCountFlag: "4",
oooSignalsShufflePercentageFlag: "0",
oooSignalsProcessingTimeFlag: "1ms",
})
})
}

func TestOutOfOrderSignals_ParseConfig(t *testing.T) {
t.Parallel()

t.Run("defaults", func(t *testing.T) {
info := &loadgen.ScenarioInfo{ScenarioOptions: map[string]string{}}
cfg, err := parseOutOfOrderSignalsConfig(info)
require.NoError(t, err)
require.Equal(t, 10, cfg.signalsPerWorkflow)
require.Equal(t, 100, cfg.shufflePercentage)
require.Equal(t, time.Duration(0), cfg.processingTime)
})

t.Run("overrides", func(t *testing.T) {
info := &loadgen.ScenarioInfo{ScenarioOptions: map[string]string{
oooSignalsCountFlag: "5",
oooSignalsShufflePercentageFlag: "50",
oooSignalsProcessingTimeFlag: "250ms",
}}
cfg, err := parseOutOfOrderSignalsConfig(info)
require.NoError(t, err)
require.Equal(t, 5, cfg.signalsPerWorkflow)
require.Equal(t, 50, cfg.shufflePercentage)
require.Equal(t, 250*time.Millisecond, cfg.processingTime)
})

t.Run("rejects invalid values", func(t *testing.T) {
cases := map[string]map[string]string{
"non-positive count": {oooSignalsCountFlag: "0"},
"shuffle below range": {oooSignalsShufflePercentageFlag: "-1"},
"shuffle above range": {oooSignalsShufflePercentageFlag: "101"},
"negative processing": {oooSignalsProcessingTimeFlag: "-1s"},
}
for name, opts := range cases {
t.Run(name, func(t *testing.T) {
_, err := parseOutOfOrderSignalsConfig(&loadgen.ScenarioInfo{ScenarioOptions: opts})
require.Error(t, err)
})
}
})
}
Loading