From 8b09a35e37a4f4cba7bce770a62ff3c2284d940e Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 14 May 2026 09:28:00 -0400 Subject: [PATCH 01/10] Typescript - support worker profile env var --- README.md | 12 ++ cmd/cli/run_worker.go | 1 + workers/run.go | 19 +++ workers/run_test.go | 42 +++++++ workers/typescript/harness/profiles.ts | 29 +++++ .../typescript/harness/tests/worker.test.ts | 119 ++++++++++++++++-- workers/typescript/harness/worker.ts | 21 +++- 7 files changed, 232 insertions(+), 11 deletions(-) create mode 100644 workers/run_test.go create mode 100644 workers/typescript/harness/profiles.ts diff --git a/README.md b/README.md index 11eb2693..f3fea7d3 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,18 @@ Notes: - `--task-queue-suffix-index-start` and `--task-queue-suffix-index-end` represent an inclusive range for running the worker on multiple task queues. The process will create a worker for every task queue from `-` through `-end`. This only applies to multi-task-queue scenarios. +- `--worker-profile` selects a code-defined worker configuration profile in the language harness. Omes forwards the + selected profile to the worker process via `OMES_WORKER_PROFILE`; the profile is not a worker CLI flag. + +Worker profiles provide a named worker configuration selected with `--worker-profile`. When a profile is selected, it +takes precedence over worker configuration flags such as poller counts, autoscale poller settings, activity rate limits, +slot counts, and worker versioning options. Non-worker-configuration flags, such as task queue selection, client +connection settings, logging, metrics, and `--worker-err-on-unimplemented`, continue to apply normally. + +```sh +go run ./cmd run-scenario-with-worker --scenario throughput_stress --language go \ + --run-id local-profile-test --worker-profile resource-based-default +``` ### Run a test scenario diff --git a/cmd/cli/run_worker.go b/cmd/cli/run_worker.go index 8982e7d9..5146664d 100644 --- a/cmd/cli/run_worker.go +++ b/cmd/cli/run_worker.go @@ -57,6 +57,7 @@ func (r *workerRunner) addCLIFlags(fs *pflag.FlagSet) { fs.StringVar(&r.EmbeddedServerAddress, "embedded-server-address", "", "Address to bind local embedded server to") fs.IntVar(&r.TaskQueueIndexSuffixStart, "task-queue-suffix-index-start", 0, "Inclusive start for task queue suffix range") fs.IntVar(&r.TaskQueueIndexSuffixEnd, "task-queue-suffix-index-end", 0, "Inclusive end for task queue suffix range") + fs.StringVar(&r.WorkerProfile, "worker-profile", "", "Worker configuration profile to apply in the worker harness") fs.AddFlagSet(r.ClientOptions.FlagSet()) fs.AddFlagSet(r.MetricsOptions.FlagSet("worker-")) fs.AddFlagSet(r.WorkerOptions.FlagSet()) diff --git a/workers/run.go b/workers/run.go index 07257b52..041d2b85 100644 --- a/workers/run.go +++ b/workers/run.go @@ -29,6 +29,7 @@ type Runner struct { TaskQueueName string TaskQueueIndexSuffixStart int TaskQueueIndexSuffixEnd int + WorkerProfile string ScenarioID clioptions.ScenarioID ClientOptions clioptions.ClientOptions MetricsOptions clioptions.MetricsOptions @@ -161,6 +162,7 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error { if err != nil { return fmt.Errorf("failed creating command: %w", err) } + cmd.Env = withWorkerProfileEnv(cmd.Environ(), r.WorkerProfile) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} // set process group ID for shutdown // Direct logging output to provided logger, if available. @@ -232,6 +234,23 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error { } } +const WorkerProfileEnvVar = "OMES_WORKER_PROFILE" + +func withWorkerProfileEnv(environ []string, profile string) []string { + const prefix = WorkerProfileEnvVar + "=" + nextEnv := make([]string, 0, len(environ)+1) + for _, item := range environ { + if strings.HasPrefix(item, prefix) { + continue + } + nextEnv = append(nextEnv, item) + } + if profile != "" { + nextEnv = append(nextEnv, prefix+profile) + } + return nextEnv +} + func passthrough(fs *pflag.FlagSet, prefix string) (flags []string) { fs.VisitAll(func(f *pflag.Flag) { if !f.Changed { diff --git a/workers/run_test.go b/workers/run_test.go new file mode 100644 index 00000000..654c2529 --- /dev/null +++ b/workers/run_test.go @@ -0,0 +1,42 @@ +package workers + +import ( + "slices" + "strings" + "testing" +) + +func TestWithWorkerProfileEnvSetsProfile(t *testing.T) { + env := withWorkerProfileEnv([]string{"PATH=/bin", "HOME=/tmp"}, "resource-based-default") + + if !slices.Contains(env, "OMES_WORKER_PROFILE=resource-based-default") { + t.Fatalf("expected worker profile env, got %#v", env) + } +} + +func TestWithWorkerProfileEnvReplacesExistingProfile(t *testing.T) { + env := withWorkerProfileEnv( + []string{"OMES_WORKER_PROFILE=old", "PATH=/bin", "OMES_WORKER_PROFILE=older"}, + "resource-based-default", + ) + + var matches []string + for _, item := range env { + if strings.HasPrefix(item, "OMES_WORKER_PROFILE=") { + matches = append(matches, item) + } + } + if len(matches) != 1 || matches[0] != "OMES_WORKER_PROFILE=resource-based-default" { + t.Fatalf("expected one replaced worker profile env var, got %#v", matches) + } +} + +func TestWithWorkerProfileEnvClearsAmbientProfileWhenUnset(t *testing.T) { + env := withWorkerProfileEnv([]string{"OMES_WORKER_PROFILE=old", "PATH=/bin"}, "") + + for _, item := range env { + if strings.HasPrefix(item, "OMES_WORKER_PROFILE=") { + t.Fatalf("expected no worker profile env var, got %#v", env) + } + } +} diff --git a/workers/typescript/harness/profiles.ts b/workers/typescript/harness/profiles.ts new file mode 100644 index 00000000..1b1c1342 --- /dev/null +++ b/workers/typescript/harness/profiles.ts @@ -0,0 +1,29 @@ +import type { WorkerOptions } from '@temporalio/worker'; + +export const WORKER_PROFILE_ENV_VAR = 'OMES_WORKER_PROFILE'; +export const RESOURCE_BASED_DEFAULT_PROFILE = 'resource-based-default'; + +export type WorkerProfile = Readonly>; + +const profiles = new Map(); + +function registerWorkerProfile(name: string, profile: WorkerProfile): void { + profiles.set(name, profile); +} + +export function lookupWorkerProfile(name: string): Partial { + const profile = profiles.get(name); + if (profile === undefined) { + throw new Error(`Unknown worker profile "${name}"`); + } + return { ...profile }; +} + +registerWorkerProfile(RESOURCE_BASED_DEFAULT_PROFILE, { + tuner: { + tunerOptions: { + targetMemoryUsage: 0.8, + targetCpuUsage: 0.8, + }, + }, +}); diff --git a/workers/typescript/harness/tests/worker.test.ts b/workers/typescript/harness/tests/worker.test.ts index 6bd901b2..ba4db8e0 100644 --- a/workers/typescript/harness/tests/worker.test.ts +++ b/workers/typescript/harness/tests/worker.test.ts @@ -5,6 +5,7 @@ import type { ClientConfig } from '../client.js'; import type { Worker } from '@temporalio/worker'; import { runWorker, runWorkers, type WorkerContext, type WorkerFactory } from '../worker.js'; import { makeClient } from './test-helpers.js'; +import { RESOURCE_BASED_DEFAULT_PROFILE } from '../profiles.js'; interface WorkerFactoryCall { client: Client; @@ -51,14 +52,20 @@ void test('runWorker passes shared client and context to each worker factory', a return client; }; - await runWorker(workerFactory, clientFactory, neverInterrupt(), [ - '--task-queue', - 'omes', - '--task-queue-suffix-index-start', - '1', - '--task-queue-suffix-index-end', - '2', - ]); + await runWorker( + workerFactory, + clientFactory, + neverInterrupt(), + [ + '--task-queue', + 'omes', + '--task-queue-suffix-index-start', + '1', + '--task-queue-suffix-index-end', + '2', + ], + undefined, + ); assert.strictEqual(workerFactoryCalls[0]?.client, client); assert.strictEqual(workerFactoryCalls[1]?.client, client); @@ -103,3 +110,99 @@ void test('runWorkers shuts down all workers when one fails', async () => { assert.equal(failingWorker.shutdownCalls, 1); assert.equal(successfulWorker.shutdownCalls, 1); }); + +void test('runWorker applies resource based worker profile', async () => { + const client = makeClient(); + const workerFactoryCalls: WorkerFactoryCall[] = []; + const workerFactory: WorkerFactory = async (receivedClient, context) => { + workerFactoryCalls.push({ + client: receivedClient, + context, + }); + return makeTestWorker(async () => undefined) as unknown as Worker; + }; + const clientFactory = async (_config: ClientConfig): Promise => client; + + await runWorker( + workerFactory, + clientFactory, + neverInterrupt(), + [], + RESOURCE_BASED_DEFAULT_PROFILE, + ); + + assert.deepEqual(workerFactoryCalls[0]?.context.workerOptions, { + tuner: { + tunerOptions: { + targetMemoryUsage: 0.8, + targetCpuUsage: 0.8, + }, + }, + }); +}); + +void test('runWorker ignores worker option flags when profile is selected', async () => { + const client = makeClient(); + const workerFactoryCalls: WorkerFactoryCall[] = []; + const workerFactory: WorkerFactory = async (receivedClient, context) => { + workerFactoryCalls.push({ + client: receivedClient, + context, + }); + return makeTestWorker(async () => undefined) as unknown as Worker; + }; + const clientFactory = async (_config: ClientConfig): Promise => client; + + await runWorker( + workerFactory, + clientFactory, + neverInterrupt(), + ['--max-concurrent-activities', '50'], + RESOURCE_BASED_DEFAULT_PROFILE, + ); + + assert.deepEqual(workerFactoryCalls[0]?.context.workerOptions, { + tuner: { + tunerOptions: { + targetMemoryUsage: 0.8, + targetCpuUsage: 0.8, + }, + }, + }); +}); + +void test('runWorker applies worker option flags when profile is not selected', async () => { + const client = makeClient(); + const workerFactoryCalls: WorkerFactoryCall[] = []; + const workerFactory: WorkerFactory = async (receivedClient, context) => { + workerFactoryCalls.push({ + client: receivedClient, + context, + }); + return makeTestWorker(async () => undefined) as unknown as Worker; + }; + const clientFactory = async (_config: ClientConfig): Promise => client; + + await runWorker( + workerFactory, + clientFactory, + neverInterrupt(), + ['--max-concurrent-activities', '50'], + ); + + assert.deepEqual(workerFactoryCalls[0]?.context.workerOptions, { + maxConcurrentActivityTaskExecutions: 50, + }); +}); + +void test('runWorker rejects unknown worker profile', async () => { + const client = makeClient(); + const workerFactory: WorkerFactory = async () => + makeTestWorker(async () => undefined) as unknown as Worker; + const clientFactory = async (_config: ClientConfig): Promise => client; + + await assert.rejects( + () => runWorker(workerFactory, clientFactory, neverInterrupt(), [], 'nope'), + /Unknown worker profile "nope"/, + ); +}); diff --git a/workers/typescript/harness/worker.ts b/workers/typescript/harness/worker.ts index 5913a083..b7d6e989 100644 --- a/workers/typescript/harness/worker.ts +++ b/workers/typescript/harness/worker.ts @@ -3,6 +3,7 @@ import { Logger, Worker, WorkerOptions } from '@temporalio/worker'; import { Command } from 'commander'; import { buildClientConfig, ClientFactory } from './client'; import { configureLogger } from './helpers'; +import { lookupWorkerProfile, WORKER_PROFILE_ENV_VAR } from './profiles'; export interface WorkerContext { logger: Logger; @@ -50,7 +51,13 @@ export async function runWorkerCli( const onInterrupt = () => resolveInterrupt(); process.once('SIGINT', onInterrupt); try { - await runWorker(workerFactory, clientFactory, interruptPromise, argv); + await runWorker( + workerFactory, + clientFactory, + interruptPromise, + argv, + process.env[WORKER_PROFILE_ENV_VAR], + ); } finally { process.off('SIGINT', onInterrupt); } @@ -62,6 +69,7 @@ export async function runWorker( interruptPromise: Promise, // sdkbuild's prepared package currently uses Commander 8 types, whose parse API expects string[]. argv: string[], + workerProfile?: string, ): Promise { const args = buildParser().parse(argv, { from: 'user' }).opts(); @@ -93,7 +101,7 @@ export async function runWorker( args.taskQueueSuffixIndexStart, args.taskQueueSuffixIndexEnd, ); - const workerOptions = buildWorkerOptions(args); + const workerOptions = buildWorkerOptions(args, workerProfile); const workers = await Promise.all( taskQueues.map( async (taskQueue) => @@ -229,7 +237,14 @@ function buildTaskQueues( return taskQueues; } -function buildWorkerOptions(options: WorkerCliOptions): Partial { +function buildWorkerOptions( + options: WorkerCliOptions, + profileName?: string, +): Partial { + if (profileName !== undefined && profileName !== '') { + return lookupWorkerProfile(profileName); + } + const workerOptions: Partial = {}; if (options.activityPollerAutoscaleMax !== undefined) { workerOptions.activityTaskPollerBehavior = { From b379438d1f4ffa711b2150fadc4e4ad0aef62906 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 14 May 2026 13:14:10 -0400 Subject: [PATCH 02/10] support worker profile env var for Python, Go, Ruby, .NET --- workers/dotnet/projects/harness/Profiles.cs | 31 ++++++ workers/dotnet/projects/harness/Worker.cs | 19 +++- .../projects/harness/tests/WorkerTests.cs | 95 ++++++++++++++++++- workers/go/go.mod | 8 ++ workers/go/go.sum | 21 ++++ workers/go/harness/profiles.go | 41 ++++++++ workers/go/harness/worker.go | 8 +- workers/go/harness/worker_test.go | 61 ++++++++++++ .../python/harness/src/harness/profiles.py | 35 +++++++ workers/python/harness/src/harness/worker.py | 20 +++- .../harness/tests/test_harness_worker.py | 55 ++++++++++- workers/ruby/harness/lib/harness/profiles.rb | 34 +++++++ workers/ruby/harness/lib/harness/worker.rb | 14 ++- workers/ruby/harness/sig/harness.rbs | 8 +- workers/ruby/harness/tests/test_worker.rb | 48 +++++++++- workers/run.go | 23 +---- workers/run_test.go | 42 -------- 17 files changed, 484 insertions(+), 79 deletions(-) create mode 100644 workers/dotnet/projects/harness/Profiles.cs create mode 100644 workers/go/harness/profiles.go create mode 100644 workers/python/harness/src/harness/profiles.py create mode 100644 workers/ruby/harness/lib/harness/profiles.rb delete mode 100644 workers/run_test.go diff --git a/workers/dotnet/projects/harness/Profiles.cs b/workers/dotnet/projects/harness/Profiles.cs new file mode 100644 index 00000000..a2a5e1d6 --- /dev/null +++ b/workers/dotnet/projects/harness/Profiles.cs @@ -0,0 +1,31 @@ +using Temporalio.Worker; +using Temporalio.Worker.Tuning; + +namespace Temporalio.Omes.Projects.Harness; + +internal static class WorkerProfiles +{ + public const string EnvVarName = "OMES_WORKER_PROFILE"; + public const string ResourceBasedDefaultProfile = "resource-based-default"; + + private static readonly IReadOnlyDictionary Profiles = + new Dictionary + { + [ResourceBasedDefaultProfile] = new TemporalWorkerOptions + { + Tuner = WorkerTuner.CreateResourceBased( + targetMemoryUsage: 0.8, + targetCpuUsage: 0.8), + }, + }; + + public static TemporalWorkerOptions Lookup(string name) + { + if (!Profiles.TryGetValue(name, out var profile)) + { + throw new ArgumentException($"Unknown worker profile \"{name}\""); + } + + return (TemporalWorkerOptions)profile.Clone(); + } +} diff --git a/workers/dotnet/projects/harness/Worker.cs b/workers/dotnet/projects/harness/Worker.cs index 1bf1d89b..76bcc075 100644 --- a/workers/dotnet/projects/harness/Worker.cs +++ b/workers/dotnet/projects/harness/Worker.cs @@ -138,7 +138,8 @@ await RunCoreAsync( workerFactory: (client, workerContext) => app.Worker(client, workerContext), clientFactory: app.ClientFactory, options: options, - runWorkersAsync: RunWorkersAsync); + runWorkersAsync: RunWorkersAsync, + workerProfile: Environment.GetEnvironmentVariable(WorkerProfiles.EnvVarName)); context.ExitCode = 0; } catch (ArgumentException err) @@ -154,7 +155,8 @@ internal static async Task RunCoreAsync( WorkerFactoryCore workerFactory, ClientFactory clientFactory, WorkerCliOptions options, - Func, Task> runWorkersAsync) + Func, Task> runWorkersAsync, + string? workerProfile = null) { if (options.TaskQueueSuffixIndexStart > options.TaskQueueSuffixIndexEnd) { @@ -173,7 +175,7 @@ internal static async Task RunCoreAsync( promListenAddress: options.PromListenAddress, loggerFactory: loggerFactory); - var workerSettings = BuildWorkerSettings(options); + var workerSettings = BuildWorkerSettings(options, workerProfile); var client = await clientFactory(clientConfig); var taskQueues = BuildTaskQueues( logger, @@ -303,8 +305,9 @@ private static List BuildTaskQueues( return taskQueues; } - private static WorkerSettings BuildWorkerSettings(WorkerCliOptions options) => + private static WorkerSettings BuildWorkerSettings(WorkerCliOptions options, string? profile) => new( + Profile: profile, MaxConcurrentActivities: options.MaxConcurrentActivities, MaxConcurrentWorkflowTasks: options.MaxConcurrentWorkflowTasks, ActivitiesPerSecond: options.ActivitiesPerSecond, @@ -315,6 +318,13 @@ private static WorkerSettings BuildWorkerSettings(WorkerCliOptions options) => private static TemporalWorkerOptions BuildWorkerOptions(string taskQueue, WorkerSettings settings) { + if (!string.IsNullOrEmpty(settings.Profile)) + { + var profileOptions = WorkerProfiles.Lookup(settings.Profile); + profileOptions.TaskQueue = taskQueue; + return profileOptions; + } + var workerOptions = new TemporalWorkerOptions(taskQueue); if (settings.MaxConcurrentWorkflowTasks is { } maxConcurrentWorkflowTasks) { @@ -406,6 +416,7 @@ internal static RootCommand CreateWorkerCommand() } private sealed record WorkerSettings( + string? Profile, int? MaxConcurrentActivities, int? MaxConcurrentWorkflowTasks, double? ActivitiesPerSecond, diff --git a/workers/dotnet/projects/harness/tests/WorkerTests.cs b/workers/dotnet/projects/harness/tests/WorkerTests.cs index 32232eda..b17fd987 100644 --- a/workers/dotnet/projects/harness/tests/WorkerTests.cs +++ b/workers/dotnet/projects/harness/tests/WorkerTests.cs @@ -1,6 +1,7 @@ using System.CommandLine; using Temporalio.Client; using Temporalio.Omes.Projects.Harness; +using Temporalio.Worker; using Xunit; namespace Temporalio.Omes.Projects.Tests.HarnessTests; @@ -45,7 +46,8 @@ await WorkerHarness.RunCoreAsync( { runWorkersInput = workers; return Task.CompletedTask; - }); + }, + workerProfile: null); Assert.All(seenClients, client => Assert.Same(sharedClient, client)); Assert.Equal(["omes-1", "omes-2"], createdWorkers); @@ -57,6 +59,97 @@ await WorkerHarness.RunCoreAsync( Assert.Null(capturedConfig.Tls); } + [Fact] + public async Task RunAppliesResourceBasedWorkerProfile() + { + var sharedClient = HarnessTestSupport.CreateStrictTemporalClientProbe(); + TemporalWorkerOptions? capturedOptions = null; + var options = WorkerHarness.ParseWorkerOptions( + WorkerHarness.CreateWorkerCommand().Parse(["--log-level", "panic"])); + + await WorkerHarness.RunCoreAsync( + workerFactory: (_, context) => + { + capturedOptions = context.WorkerOptions; + return context.TaskQueue; + }, + clientFactory: _ => Task.FromResult(sharedClient), + options: options, + runWorkersAsync: _ => Task.CompletedTask, + workerProfile: WorkerProfiles.ResourceBasedDefaultProfile); + + Assert.NotNull(capturedOptions); + Assert.NotNull(capturedOptions!.Tuner); + } + + [Fact] + public async Task RunIgnoresWorkerOptionFlagsWhenProfileIsSelected() + { + var sharedClient = HarnessTestSupport.CreateStrictTemporalClientProbe(); + TemporalWorkerOptions? capturedOptions = null; + var options = WorkerHarness.ParseWorkerOptions( + WorkerHarness.CreateWorkerCommand().Parse( + ["--log-level", "panic", "--max-concurrent-activities", "50"])); + + await WorkerHarness.RunCoreAsync( + workerFactory: (_, context) => + { + capturedOptions = context.WorkerOptions; + return context.TaskQueue; + }, + clientFactory: _ => Task.FromResult(sharedClient), + options: options, + runWorkersAsync: _ => Task.CompletedTask, + workerProfile: WorkerProfiles.ResourceBasedDefaultProfile); + + Assert.NotNull(capturedOptions); + Assert.NotNull(capturedOptions!.Tuner); + Assert.NotEqual(50, capturedOptions.MaxConcurrentActivities); + } + + [Fact] + public async Task RunAppliesWorkerOptionFlagsWithoutProfile() + { + var sharedClient = HarnessTestSupport.CreateStrictTemporalClientProbe(); + TemporalWorkerOptions? capturedOptions = null; + var options = WorkerHarness.ParseWorkerOptions( + WorkerHarness.CreateWorkerCommand().Parse( + ["--log-level", "panic", "--max-concurrent-activities", "50"])); + + await WorkerHarness.RunCoreAsync( + workerFactory: (_, context) => + { + capturedOptions = context.WorkerOptions; + return context.TaskQueue; + }, + clientFactory: _ => Task.FromResult(sharedClient), + options: options, + runWorkersAsync: _ => Task.CompletedTask, + workerProfile: null); + + Assert.NotNull(capturedOptions); + Assert.Equal(50, capturedOptions!.MaxConcurrentActivities); + } + + [Fact] + public async Task RunRejectsUnknownWorkerProfile() + { + var sharedClient = HarnessTestSupport.CreateStrictTemporalClientProbe(); + var options = WorkerHarness.ParseWorkerOptions( + WorkerHarness.CreateWorkerCommand().Parse(["--log-level", "panic"])); + + var error = await Assert.ThrowsAsync( + () => + WorkerHarness.RunCoreAsync( + workerFactory: (_, context) => context.TaskQueue, + clientFactory: _ => Task.FromResult(sharedClient), + options: options, + runWorkersAsync: _ => Task.CompletedTask, + workerProfile: "nope")); + + Assert.Contains("Unknown worker profile \"nope\"", error.Message); + } + [Fact] public async Task WorkerModeCancelsRemainingWorkersWhenOneFails() { diff --git a/workers/go/go.mod b/workers/go/go.mod index 91b50fa7..2e7a1c4b 100644 --- a/workers/go/go.mod +++ b/workers/go/go.mod @@ -16,6 +16,7 @@ require ( go.temporal.io/api v1.62.11 go.temporal.io/sdk v1.43.0 go.temporal.io/sdk/contrib/aws/lambdaworker v0.1.1 + go.temporal.io/sdk/contrib/sysinfo v0.1.0 go.uber.org/zap v1.27.0 google.golang.org/grpc v1.79.3 ) @@ -39,10 +40,14 @@ require ( github.com/aws/smithy-go v1.25.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cilium/ebpf v0.11.0 // indirect + github.com/containerd/cgroups/v3 v3.0.3 // indirect + github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/ebitengine/purego v0.9.0 // indirect github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect github.com/go-ole/go-ole v1.2.6 // indirect + github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.4 // indirect @@ -54,6 +59,7 @@ require ( github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/parquet-go/parquet-go v0.25.1 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -63,6 +69,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/shirou/gopsutil/v4 v4.25.10 // indirect + github.com/sirupsen/logrus v1.9.1 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/testify v1.11.1 // indirect github.com/tklauser/go-sysconf v0.3.15 // indirect @@ -70,6 +77,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.temporal.io/sdk/contrib/envconfig v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.40.0 // indirect diff --git a/workers/go/go.sum b/workers/go/go.sum index 7f0c0a3a..65a66a3e 100644 --- a/workers/go/go.sum +++ b/workers/go/go.sum @@ -38,6 +38,12 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cilium/ebpf v0.11.0 h1:V8gS/bTCCjX9uUnkUFUpPsksM8n1lXBAvHcpiFk1X2Y= +github.com/cilium/ebpf v0.11.0/go.mod h1:WE7CZAnqOL2RouJ4f1uyNhqr2P4CCvXFIqdRDUgWsVs= +github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0= +github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0= +github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= +github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -45,6 +51,8 @@ github.com/ebitengine/purego v0.9.0 h1:mh0zpKBIXDceC63hpvPuGLiJ8ZAa3DfrFTudmfi8A github.com/ebitengine/purego v0.9.0/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a h1:yDWHCSQ40h88yih2JAcL6Ls/kVkSE8GFACTGVnMPruw= github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA= +github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA= +github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -53,6 +61,8 @@ github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/gofrs/flock v0.13.0 h1:95JolYOvGMqeH31+FC7D2+uULf6mG61mEZ/A8dRYMzw= github.com/gofrs/flock v0.13.0/go.mod h1:jxeyy9R1auM5S6JYDBhDt+E2TCo7DkratH4Pgi8P+Z0= +github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= @@ -94,6 +104,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nexus-rpc/sdk-go v0.6.0 h1:QRgnP2zTbxEbiyWG/aXH8uSC5LV/Mg1fqb19jb4DBlo= github.com/nexus-rpc/sdk-go v0.6.0/go.mod h1:FHdPfVQwRuJFZFTF0Y2GOAxCrbIBNrcPna9slkGKPYk= +github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= +github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/parquet-go/parquet-go v0.25.1 h1:l7jJwNM0xrk0cnIIptWMtnSnuxRkwq53S+Po3KG8Xgo= github.com/parquet-go/parquet-go v0.25.1/go.mod h1:AXBuotO1XiBtcqJb/FKFyjBG4aqa3aQAAWF3ZPzCanY= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= @@ -116,12 +128,15 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/shirou/gopsutil/v4 v4.25.10 h1:at8lk/5T1OgtuCp+AwrDofFRjnvosn0nkN2OLQ6g8tA= github.com/shirou/gopsutil/v4 v4.25.10/go.mod h1:+kSwyC8DRUD9XXEHCAFjK+0nuArFJM0lva+StQAcskM= +github.com/sirupsen/logrus v1.9.1 h1:Ou41VVR3nMWWmTiEUnj0OlsgOSCUFgsPAOl6jRIcVtQ= +github.com/sirupsen/logrus v1.9.1/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/temporalio/features v0.0.0-20260427223549-86e4c0deedd7 h1:gBLwgyi8xw0oqZgxMwxTRGIfP8RxtI7r1igm3G6aXGY= @@ -155,6 +170,8 @@ go.temporal.io/sdk/contrib/aws/lambdaworker v0.1.1 h1:AQBa7CN+EOWhZaf4vr46TfxTZM go.temporal.io/sdk/contrib/aws/lambdaworker v0.1.1/go.mod h1:Rgn/tlb4MDNAAjnXKNgHui4IY+MogMCk4Y4c2YA6Dcc= go.temporal.io/sdk/contrib/envconfig v1.0.0 h1:1Q/swVgB4EW/p3k7rI9/4hpU4/DC57FSRbU90+UisXw= go.temporal.io/sdk/contrib/envconfig v1.0.0/go.mod h1:Pj4N1lwUEvxap6quBm8GrVMSUMJhSZkVtxjt3AYnPPg= +go.temporal.io/sdk/contrib/sysinfo v0.1.0 h1:jqP6BFZ/n+BoTt/OnlD+llufgSJSoxlq+aAIZCxnBdI= +go.temporal.io/sdk/contrib/sysinfo v0.1.0/go.mod h1:1i61HTH1W1M331FprZF3SHeLFhZQSmb/YEbFsPhI654= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -164,6 +181,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw= +golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -190,6 +209,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -223,5 +243,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/workers/go/harness/profiles.go b/workers/go/harness/profiles.go new file mode 100644 index 00000000..5fcfc054 --- /dev/null +++ b/workers/go/harness/profiles.go @@ -0,0 +1,41 @@ +package harness + +import ( + "fmt" + + "go.temporal.io/sdk/contrib/sysinfo" + sdkworker "go.temporal.io/sdk/worker" +) + +const ( + workerProfileEnvVar = "OMES_WORKER_PROFILE" + resourceBasedDefaultWorkerProfile = "resource-based-default" +) + +var workerProfiles = map[string]sdkworker.Options{} + +func registerWorkerProfile(name string, profile sdkworker.Options) { + workerProfiles[name] = profile +} + +func lookupWorkerProfile(name string) (sdkworker.Options, error) { + profile, ok := workerProfiles[name] + if !ok { + return sdkworker.Options{}, fmt.Errorf("unknown worker profile %q", name) + } + return profile, nil +} + +func init() { + tuner, err := sdkworker.NewResourceBasedTuner(sdkworker.ResourceBasedTunerOptions{ + TargetMem: 0.8, + TargetCpu: 0.8, + InfoSupplier: sysinfo.SysInfoProvider(), + }) + if err != nil { + panic(err) + } + registerWorkerProfile(resourceBasedDefaultWorkerProfile, sdkworker.Options{ + Tuner: tuner, + }) +} diff --git a/workers/go/harness/worker.go b/workers/go/harness/worker.go index dc76a188..cbc57241 100644 --- a/workers/go/harness/worker.go +++ b/workers/go/harness/worker.go @@ -50,7 +50,7 @@ func runWorkerCLI(workerFactory WorkerFactory, clientFactory ClientFactory, argv if options.taskQueueSuffixIndexStart > options.taskQueueSuffixIndexEnd { return fmt.Errorf("task queue suffix start after end") } - workerOptions, err := buildWorkerOptions(options.flags, options.workerOptions) + workerOptions, err := buildWorkerOptions(options.flags, options.workerOptions, os.Getenv(workerProfileEnvVar)) if err != nil { return err } @@ -128,7 +128,11 @@ func parseVersioningBehavior(value string) (workflow.VersioningBehavior, error) } } -func buildWorkerOptions(flags *pflag.FlagSet, args clioptions.WorkerOptions) (sdkworker.Options, error) { +func buildWorkerOptions(flags *pflag.FlagSet, args clioptions.WorkerOptions, profileName string) (sdkworker.Options, error) { + if profileName != "" { + return lookupWorkerProfile(profileName) + } + options := sdkworker.Options{} if args.DeploymentName != "" { if args.BuildID != "" { diff --git a/workers/go/harness/worker_test.go b/workers/go/harness/worker_test.go index 6bfa52d2..651de85a 100644 --- a/workers/go/harness/worker_test.go +++ b/workers/go/harness/worker_test.go @@ -3,6 +3,7 @@ package harness import ( "context" "errors" + "strings" "testing" "time" @@ -68,6 +69,66 @@ func TestRunWorkerCLIPassesSharedClientAndWorkerContext(t *testing.T) { } } +func TestBuildWorkerOptionsAppliesResourceBasedProfile(t *testing.T) { + options := newWorkerCLIOptions() + if err := options.parse(nil); err != nil { + t.Fatal(err) + } + + workerOptions, err := buildWorkerOptions(options.flags, options.workerOptions, resourceBasedDefaultWorkerProfile) + if err != nil { + t.Fatal(err) + } + if workerOptions.Tuner == nil { + t.Fatal("expected worker profile to set tuner") + } +} + +func TestBuildWorkerOptionsIgnoresWorkerOptionFlagsWhenProfileIsSelected(t *testing.T) { + options := newWorkerCLIOptions() + if err := options.parse([]string{"--max-concurrent-activities", "50"}); err != nil { + t.Fatal(err) + } + + workerOptions, err := buildWorkerOptions(options.flags, options.workerOptions, resourceBasedDefaultWorkerProfile) + if err != nil { + t.Fatal(err) + } + if workerOptions.Tuner == nil { + t.Fatal("expected worker profile to set tuner") + } + if workerOptions.MaxConcurrentActivityExecutionSize != 0 { + t.Fatalf("expected profile to ignore max concurrent activities flag, got %d", workerOptions.MaxConcurrentActivityExecutionSize) + } +} + +func TestBuildWorkerOptionsAppliesWorkerOptionFlagsWithoutProfile(t *testing.T) { + options := newWorkerCLIOptions() + if err := options.parse([]string{"--max-concurrent-activities", "50"}); err != nil { + t.Fatal(err) + } + + workerOptions, err := buildWorkerOptions(options.flags, options.workerOptions, "") + if err != nil { + t.Fatal(err) + } + if workerOptions.MaxConcurrentActivityExecutionSize != 50 { + t.Fatalf("expected max concurrent activities 50, got %d", workerOptions.MaxConcurrentActivityExecutionSize) + } +} + +func TestBuildWorkerOptionsRejectsUnknownProfile(t *testing.T) { + options := newWorkerCLIOptions() + if err := options.parse(nil); err != nil { + t.Fatal(err) + } + + _, err := buildWorkerOptions(options.flags, options.workerOptions, "nope") + if err == nil || !strings.Contains(err.Error(), `unknown worker profile "nope"`) { + t.Fatalf("expected unknown profile error, got %v", err) + } +} + func TestRunWorkersStopsRemainingWorkersOnFailure(t *testing.T) { stopped := make(chan struct{}, 1) diff --git a/workers/python/harness/src/harness/profiles.py b/workers/python/harness/src/harness/profiles.py new file mode 100644 index 00000000..019cb692 --- /dev/null +++ b/workers/python/harness/src/harness/profiles.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from typing import Any + +from temporalio.worker import WorkerTuner + +WORKER_PROFILE_ENV_VAR = "OMES_WORKER_PROFILE" +RESOURCE_BASED_DEFAULT_PROFILE = "resource-based-default" + +WorkerProfile = dict[str, Any] + + +_profiles: dict[str, WorkerProfile] = {} + + +def _register_profile(name: str, profile: WorkerProfile) -> None: + _profiles[name] = profile + + +def lookup_profile(name: str) -> dict[str, Any]: + try: + return dict(_profiles[name]) + except KeyError as err: + raise ValueError(f"Unknown worker profile {name!r}") from err + + +_register_profile( + RESOURCE_BASED_DEFAULT_PROFILE, + { + "tuner": WorkerTuner.create_resource_based( + target_memory_usage=0.8, + target_cpu_usage=0.8, + ) + }, +) diff --git a/workers/python/harness/src/harness/worker.py b/workers/python/harness/src/harness/worker.py index 8c892a35..9dff7d93 100644 --- a/workers/python/harness/src/harness/worker.py +++ b/workers/python/harness/src/harness/worker.py @@ -3,6 +3,7 @@ import argparse import asyncio import logging +import os from collections.abc import Callable, Sequence from contextlib import suppress from dataclasses import dataclass @@ -13,6 +14,7 @@ from harness.client import ClientFactory, build_client_config from harness.helpers import configure_logger +from harness.profiles import WORKER_PROFILE_ENV_VAR, lookup_profile @dataclass(frozen=True) @@ -37,7 +39,13 @@ def run_worker_cli( loop = asyncio.new_event_loop() try: loop.run_until_complete( - _run(worker_factory, client_factory, interrupt_event, argv) + _run( + worker_factory, + client_factory, + interrupt_event, + argv, + os.environ.get(WORKER_PROFILE_ENV_VAR), + ) ) except KeyboardInterrupt: interrupt_event.set() @@ -51,6 +59,7 @@ async def _run( client_factory: ClientFactory, interrupt_event: asyncio.Event, argv: Sequence[str], + worker_profile: str | None = None, ) -> None: args = _build_parser().parse_args(argv) @@ -75,7 +84,7 @@ async def _run( args.task_queue_suffix_index_start, args.task_queue_suffix_index_end, ) - worker_kwargs = _build_worker_kwargs(args) + worker_kwargs = _build_worker_kwargs(args, worker_profile) workers = [ worker_factory( client, @@ -213,7 +222,12 @@ def _build_task_queues( return task_queues -def _build_worker_kwargs(args: argparse.Namespace) -> dict[str, Any]: +def _build_worker_kwargs( + args: argparse.Namespace, profile_name: str | None = None +) -> dict[str, Any]: + if profile_name: + return lookup_profile(profile_name) + worker_kwargs: dict[str, Any] = {} if args.activity_poller_autoscale_max is not None: worker_kwargs["activity_task_poller_behavior"] = PollerBehaviorAutoscaling( diff --git a/workers/python/harness/tests/test_harness_worker.py b/workers/python/harness/tests/test_harness_worker.py index 01193f7d..4e732000 100644 --- a/workers/python/harness/tests/test_harness_worker.py +++ b/workers/python/harness/tests/test_harness_worker.py @@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, Mock, create_autospec, patch from temporalio.client import Client -from temporalio.worker import PollerBehaviorAutoscaling, Worker +from temporalio.worker import Worker, WorkerTuner from harness import worker @@ -55,6 +55,7 @@ async def test_run_passes_shared_client_and_context_to_each_worker_factory( "--task-queue-suffix-index-end", "2", ], + None, ) build_client_config.assert_called_once_with( @@ -76,6 +77,58 @@ async def test_run_passes_shared_client_and_context_to_each_worker_factory( self.assertEqual(first_context.task_queue, "omes-1") self.assertEqual(second_context.task_queue, "omes-2") + async def test_run_applies_resource_based_worker_profile(self) -> None: + client = create_autospec(Client, instance=True, spec_set=True) + captured_contexts: list[worker.WorkerContext] = [] + + def capture_context(_client: Client, context: worker.WorkerContext) -> object: + captured_contexts.append(context) + return object() + + worker_factory = Mock(side_effect=capture_context) + run_workers = AsyncMock() + + with ( + patch.object( + worker, "build_client_config", autospec=True, return_value=object() + ), + patch.object(worker, "_run_workers", new=run_workers), + ): + await worker._run( + worker_factory, + AsyncMock(return_value=client), + asyncio.Event(), + [], + "resource-based-default", + ) + + self.assertIsInstance(captured_contexts[0].worker_kwargs["tuner"], WorkerTuner) + + def test_build_worker_kwargs_ignores_worker_option_flags_when_profile_is_selected( + self, + ) -> None: + args = worker._build_parser().parse_args(["--max-concurrent-activities", "50"]) + + worker_kwargs = worker._build_worker_kwargs(args, "resource-based-default") + + self.assertIsInstance(worker_kwargs["tuner"], WorkerTuner) + self.assertNotIn("max_concurrent_activities", worker_kwargs) + + def test_build_worker_kwargs_applies_worker_option_flags_without_profile( + self, + ) -> None: + args = worker._build_parser().parse_args(["--max-concurrent-activities", "50"]) + + worker_kwargs = worker._build_worker_kwargs(args, None) + + self.assertEqual(worker_kwargs["max_concurrent_activities"], 50) + + def test_build_worker_kwargs_rejects_unknown_profile(self) -> None: + args = worker._build_parser().parse_args([]) + + with self.assertRaisesRegex(ValueError, "Unknown worker profile 'nope'"): + worker._build_worker_kwargs(args, "nope") + async def test_run_workers_shuts_down_all_workers_when_one_fails(self) -> None: async def fail_immediately() -> None: raise RuntimeError("boom") diff --git a/workers/ruby/harness/lib/harness/profiles.rb b/workers/ruby/harness/lib/harness/profiles.rb new file mode 100644 index 00000000..cc6e92a6 --- /dev/null +++ b/workers/ruby/harness/lib/harness/profiles.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +require 'temporalio/worker' + +module Harness + module Profiles + WORKER_PROFILE_ENV_VAR = 'OMES_WORKER_PROFILE' + RESOURCE_BASED_DEFAULT_PROFILE = 'resource-based-default' + + @registry = {} + + module_function + + def register(name, profile) + @registry[name] = profile + end + + def lookup(name) + @registry.fetch(name).dup + rescue KeyError + raise ArgumentError, "Unknown worker profile #{name.inspect}" + end + + register( + RESOURCE_BASED_DEFAULT_PROFILE, + { + tuner: Temporalio::Worker::Tuner.create_resource_based( + target_memory_usage: 0.8, + target_cpu_usage: 0.8 + ) + } + ) + end +end diff --git a/workers/ruby/harness/lib/harness/worker.rb b/workers/ruby/harness/lib/harness/worker.rb index 40870d99..456e5328 100644 --- a/workers/ruby/harness/lib/harness/worker.rb +++ b/workers/ruby/harness/lib/harness/worker.rb @@ -4,6 +4,7 @@ require 'temporalio/worker' require_relative 'client' require_relative 'helpers' +require_relative 'profiles' module Harness WorkerContext = Data.define( @@ -16,7 +17,12 @@ module Harness module WorkerCLI module_function - def run_cli(worker_factory, client_factory, argv) + def run_cli( + worker_factory, + client_factory, + argv, + worker_profile: ENV.fetch(Harness::Profiles::WORKER_PROFILE_ENV_VAR, nil) + ) options = default_options build_parser(options).parse!(Array(argv).dup) @@ -43,7 +49,7 @@ def run_cli(worker_factory, client_factory, argv) options[:task_queue_suffix_index_start], options[:task_queue_suffix_index_end] ) - worker_kwargs = build_worker_kwargs(options) + worker_kwargs = build_worker_kwargs(options, worker_profile) workers = task_queues.map do |task_queue| worker_factory.call( client, @@ -115,7 +121,9 @@ def build_task_queues(logger, task_queue, suffix_start, suffix_end) task_queues end - def build_worker_kwargs(options) + def build_worker_kwargs(options, profile_name = nil) + return Harness::Profiles.lookup(profile_name) unless profile_name.nil? || profile_name.empty? + worker_kwargs = {} if options[:activity_poller_autoscale_max] worker_kwargs[:activity_task_poller_behavior] = Temporalio::Worker::PollerBehavior::Autoscaling.new( diff --git a/workers/ruby/harness/sig/harness.rbs b/workers/ruby/harness/sig/harness.rbs index 1d1b943a..01d5cc91 100644 --- a/workers/ruby/harness/sig/harness.rbs +++ b/workers/ruby/harness/sig/harness.rbs @@ -298,8 +298,8 @@ module Harness end module WorkerCLI - def run_cli: (worker_factory, client_factory, Array[String]) -> void - def self.run_cli: (worker_factory, client_factory, Array[String]) -> void + def run_cli: (worker_factory, client_factory, Array[String], ?worker_profile: String?) -> void + def self.run_cli: (worker_factory, client_factory, Array[String], ?worker_profile: String?) -> void def run: (worker_factory, client_factory, Array[String]) -> void def self.run: (worker_factory, client_factory, Array[String]) -> void def run_workers: (Array[untyped] workers) -> void @@ -308,8 +308,8 @@ module Harness def self.build_parser: (Hash[Symbol, untyped] options) -> OptionParser def build_task_queues: (Logger logger, String task_queue, Integer suffix_start, Integer suffix_end) -> Array[String] def self.build_task_queues: (Logger logger, String task_queue, Integer suffix_start, Integer suffix_end) -> Array[String] - def build_worker_kwargs: (Hash[Symbol, untyped] options) -> Hash[Symbol, untyped] - def self.build_worker_kwargs: (Hash[Symbol, untyped] options) -> Hash[Symbol, untyped] + def build_worker_kwargs: (Hash[Symbol, untyped] options, ?String? profile_name) -> Hash[Symbol, untyped] + def self.build_worker_kwargs: (Hash[Symbol, untyped] options, ?String? profile_name) -> Hash[Symbol, untyped] def parse_bool: (untyped value) -> bool def self.parse_bool: (untyped value) -> bool def default_options: -> Hash[Symbol, untyped] diff --git a/workers/ruby/harness/tests/test_worker.rb b/workers/ruby/harness/tests/test_worker.rb index d1097379..f8d43f1c 100644 --- a/workers/ruby/harness/tests/test_worker.rb +++ b/workers/ruby/harness/tests/test_worker.rb @@ -28,7 +28,8 @@ def test_run_passes_shared_client_and_context_to_each_worker_factory '--task-queue', 'omes', '--task-queue-suffix-index-start', '1', '--task-queue-suffix-index-end', '2' - ] + ], + worker_profile: nil ) end @@ -42,6 +43,51 @@ def test_run_passes_shared_client_and_context_to_each_worker_factory assert_equal 'omes-2', worker_factory_calls[1][1].task_queue end + def test_run_applies_resource_based_worker_profile + captured_context = nil + worker_factory = lambda do |_client, context| + captured_context = context + Object.new + end + + with_stubbed_run_all(->(*_workers, **_kwargs) {}) do + Harness::WorkerCLI.run_cli( + worker_factory, + ->(_config) { Object.new }, + [], + worker_profile: 'resource-based-default' + ) + end + + refute_nil captured_context.worker_kwargs[:tuner] + end + + def test_build_worker_kwargs_ignores_worker_option_flags_when_profile_is_selected + options = Harness::WorkerCLI.default_options + options[:max_concurrent_activities] = 50 + + worker_kwargs = Harness::WorkerCLI.build_worker_kwargs(options, 'resource-based-default') + + refute_nil worker_kwargs[:tuner] + refute_includes worker_kwargs, :max_concurrent_activities + end + + def test_build_worker_kwargs_applies_worker_option_flags_without_profile + options = Harness::WorkerCLI.default_options + options[:max_concurrent_activities] = 50 + + worker_kwargs = Harness::WorkerCLI.build_worker_kwargs(options, nil) + + assert_equal 50, worker_kwargs[:max_concurrent_activities] + end + + def test_build_worker_kwargs_rejects_unknown_worker_profile + error = assert_raises(ArgumentError) do + Harness::WorkerCLI.build_worker_kwargs(Harness::WorkerCLI.default_options, 'nope') + end + assert_match(/Unknown worker profile "nope"/, error.message) + end + private def with_stubbed_run_all(stub_implementation) diff --git a/workers/run.go b/workers/run.go index 041d2b85..c67a342e 100644 --- a/workers/run.go +++ b/workers/run.go @@ -19,6 +19,8 @@ import ( "go.temporal.io/sdk/testsuite" ) +const WorkerProfileEnvVar = "OMES_WORKER_PROFILE" + type Runner struct { Builder AppName string @@ -162,7 +164,9 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error { if err != nil { return fmt.Errorf("failed creating command: %w", err) } - cmd.Env = withWorkerProfileEnv(cmd.Environ(), r.WorkerProfile) + if r.WorkerProfile != "" { + cmd.Env = append(cmd.Environ(), WorkerProfileEnvVar+"="+r.WorkerProfile) + } cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} // set process group ID for shutdown // Direct logging output to provided logger, if available. @@ -234,23 +238,6 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error { } } -const WorkerProfileEnvVar = "OMES_WORKER_PROFILE" - -func withWorkerProfileEnv(environ []string, profile string) []string { - const prefix = WorkerProfileEnvVar + "=" - nextEnv := make([]string, 0, len(environ)+1) - for _, item := range environ { - if strings.HasPrefix(item, prefix) { - continue - } - nextEnv = append(nextEnv, item) - } - if profile != "" { - nextEnv = append(nextEnv, prefix+profile) - } - return nextEnv -} - func passthrough(fs *pflag.FlagSet, prefix string) (flags []string) { fs.VisitAll(func(f *pflag.Flag) { if !f.Changed { diff --git a/workers/run_test.go b/workers/run_test.go deleted file mode 100644 index 654c2529..00000000 --- a/workers/run_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package workers - -import ( - "slices" - "strings" - "testing" -) - -func TestWithWorkerProfileEnvSetsProfile(t *testing.T) { - env := withWorkerProfileEnv([]string{"PATH=/bin", "HOME=/tmp"}, "resource-based-default") - - if !slices.Contains(env, "OMES_WORKER_PROFILE=resource-based-default") { - t.Fatalf("expected worker profile env, got %#v", env) - } -} - -func TestWithWorkerProfileEnvReplacesExistingProfile(t *testing.T) { - env := withWorkerProfileEnv( - []string{"OMES_WORKER_PROFILE=old", "PATH=/bin", "OMES_WORKER_PROFILE=older"}, - "resource-based-default", - ) - - var matches []string - for _, item := range env { - if strings.HasPrefix(item, "OMES_WORKER_PROFILE=") { - matches = append(matches, item) - } - } - if len(matches) != 1 || matches[0] != "OMES_WORKER_PROFILE=resource-based-default" { - t.Fatalf("expected one replaced worker profile env var, got %#v", matches) - } -} - -func TestWithWorkerProfileEnvClearsAmbientProfileWhenUnset(t *testing.T) { - env := withWorkerProfileEnv([]string{"OMES_WORKER_PROFILE=old", "PATH=/bin"}, "") - - for _, item := range env { - if strings.HasPrefix(item, "OMES_WORKER_PROFILE=") { - t.Fatalf("expected no worker profile env var, got %#v", env) - } - } -} From a814ad2f3d4f80925871118b58989ff87ed4e37e Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 14 May 2026 13:14:48 -0400 Subject: [PATCH 03/10] remove redundant worker profile test --- .../projects/harness/tests/WorkerTests.cs | 28 ++----------------- workers/go/harness/worker_test.go | 15 ---------- .../harness/tests/test_harness_worker.py | 10 ------- workers/ruby/harness/tests/test_worker.rb | 12 +------- .../typescript/harness/tests/worker.test.ts | 24 ---------------- 5 files changed, 3 insertions(+), 86 deletions(-) diff --git a/workers/dotnet/projects/harness/tests/WorkerTests.cs b/workers/dotnet/projects/harness/tests/WorkerTests.cs index b17fd987..7ef79a33 100644 --- a/workers/dotnet/projects/harness/tests/WorkerTests.cs +++ b/workers/dotnet/projects/harness/tests/WorkerTests.cs @@ -65,7 +65,7 @@ public async Task RunAppliesResourceBasedWorkerProfile() var sharedClient = HarnessTestSupport.CreateStrictTemporalClientProbe(); TemporalWorkerOptions? capturedOptions = null; var options = WorkerHarness.ParseWorkerOptions( - WorkerHarness.CreateWorkerCommand().Parse(["--log-level", "panic"])); + WorkerHarness.CreateWorkerCommand().Parse()); await WorkerHarness.RunCoreAsync( workerFactory: (_, context) => @@ -89,7 +89,7 @@ public async Task RunIgnoresWorkerOptionFlagsWhenProfileIsSelected() TemporalWorkerOptions? capturedOptions = null; var options = WorkerHarness.ParseWorkerOptions( WorkerHarness.CreateWorkerCommand().Parse( - ["--log-level", "panic", "--max-concurrent-activities", "50"])); + ["--max-concurrent-activities", "50"])); await WorkerHarness.RunCoreAsync( workerFactory: (_, context) => @@ -107,30 +107,6 @@ await WorkerHarness.RunCoreAsync( Assert.NotEqual(50, capturedOptions.MaxConcurrentActivities); } - [Fact] - public async Task RunAppliesWorkerOptionFlagsWithoutProfile() - { - var sharedClient = HarnessTestSupport.CreateStrictTemporalClientProbe(); - TemporalWorkerOptions? capturedOptions = null; - var options = WorkerHarness.ParseWorkerOptions( - WorkerHarness.CreateWorkerCommand().Parse( - ["--log-level", "panic", "--max-concurrent-activities", "50"])); - - await WorkerHarness.RunCoreAsync( - workerFactory: (_, context) => - { - capturedOptions = context.WorkerOptions; - return context.TaskQueue; - }, - clientFactory: _ => Task.FromResult(sharedClient), - options: options, - runWorkersAsync: _ => Task.CompletedTask, - workerProfile: null); - - Assert.NotNull(capturedOptions); - Assert.Equal(50, capturedOptions!.MaxConcurrentActivities); - } - [Fact] public async Task RunRejectsUnknownWorkerProfile() { diff --git a/workers/go/harness/worker_test.go b/workers/go/harness/worker_test.go index 651de85a..7b18d2f0 100644 --- a/workers/go/harness/worker_test.go +++ b/workers/go/harness/worker_test.go @@ -102,21 +102,6 @@ func TestBuildWorkerOptionsIgnoresWorkerOptionFlagsWhenProfileIsSelected(t *test } } -func TestBuildWorkerOptionsAppliesWorkerOptionFlagsWithoutProfile(t *testing.T) { - options := newWorkerCLIOptions() - if err := options.parse([]string{"--max-concurrent-activities", "50"}); err != nil { - t.Fatal(err) - } - - workerOptions, err := buildWorkerOptions(options.flags, options.workerOptions, "") - if err != nil { - t.Fatal(err) - } - if workerOptions.MaxConcurrentActivityExecutionSize != 50 { - t.Fatalf("expected max concurrent activities 50, got %d", workerOptions.MaxConcurrentActivityExecutionSize) - } -} - func TestBuildWorkerOptionsRejectsUnknownProfile(t *testing.T) { options := newWorkerCLIOptions() if err := options.parse(nil); err != nil { diff --git a/workers/python/harness/tests/test_harness_worker.py b/workers/python/harness/tests/test_harness_worker.py index 4e732000..3fcf1886 100644 --- a/workers/python/harness/tests/test_harness_worker.py +++ b/workers/python/harness/tests/test_harness_worker.py @@ -55,7 +55,6 @@ async def test_run_passes_shared_client_and_context_to_each_worker_factory( "--task-queue-suffix-index-end", "2", ], - None, ) build_client_config.assert_called_once_with( @@ -114,15 +113,6 @@ def test_build_worker_kwargs_ignores_worker_option_flags_when_profile_is_selecte self.assertIsInstance(worker_kwargs["tuner"], WorkerTuner) self.assertNotIn("max_concurrent_activities", worker_kwargs) - def test_build_worker_kwargs_applies_worker_option_flags_without_profile( - self, - ) -> None: - args = worker._build_parser().parse_args(["--max-concurrent-activities", "50"]) - - worker_kwargs = worker._build_worker_kwargs(args, None) - - self.assertEqual(worker_kwargs["max_concurrent_activities"], 50) - def test_build_worker_kwargs_rejects_unknown_profile(self) -> None: args = worker._build_parser().parse_args([]) diff --git a/workers/ruby/harness/tests/test_worker.rb b/workers/ruby/harness/tests/test_worker.rb index f8d43f1c..3cc3f955 100644 --- a/workers/ruby/harness/tests/test_worker.rb +++ b/workers/ruby/harness/tests/test_worker.rb @@ -28,8 +28,7 @@ def test_run_passes_shared_client_and_context_to_each_worker_factory '--task-queue', 'omes', '--task-queue-suffix-index-start', '1', '--task-queue-suffix-index-end', '2' - ], - worker_profile: nil + ] ) end @@ -72,15 +71,6 @@ def test_build_worker_kwargs_ignores_worker_option_flags_when_profile_is_selecte refute_includes worker_kwargs, :max_concurrent_activities end - def test_build_worker_kwargs_applies_worker_option_flags_without_profile - options = Harness::WorkerCLI.default_options - options[:max_concurrent_activities] = 50 - - worker_kwargs = Harness::WorkerCLI.build_worker_kwargs(options, nil) - - assert_equal 50, worker_kwargs[:max_concurrent_activities] - end - def test_build_worker_kwargs_rejects_unknown_worker_profile error = assert_raises(ArgumentError) do Harness::WorkerCLI.build_worker_kwargs(Harness::WorkerCLI.default_options, 'nope') diff --git a/workers/typescript/harness/tests/worker.test.ts b/workers/typescript/harness/tests/worker.test.ts index ba4db8e0..119eee89 100644 --- a/workers/typescript/harness/tests/worker.test.ts +++ b/workers/typescript/harness/tests/worker.test.ts @@ -171,30 +171,6 @@ void test('runWorker ignores worker option flags when profile is selected', asyn }); }); -void test('runWorker applies worker option flags when profile is not selected', async () => { - const client = makeClient(); - const workerFactoryCalls: WorkerFactoryCall[] = []; - const workerFactory: WorkerFactory = async (receivedClient, context) => { - workerFactoryCalls.push({ - client: receivedClient, - context, - }); - return makeTestWorker(async () => undefined) as unknown as Worker; - }; - const clientFactory = async (_config: ClientConfig): Promise => client; - - await runWorker( - workerFactory, - clientFactory, - neverInterrupt(), - ['--max-concurrent-activities', '50'], - ); - - assert.deepEqual(workerFactoryCalls[0]?.context.workerOptions, { - maxConcurrentActivityTaskExecutions: 50, - }); -}); - void test('runWorker rejects unknown worker profile', async () => { const client = makeClient(); const workerFactory: WorkerFactory = async () => From 7e301406b349503efd2750120012fe5aa6cf57fd Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 15 May 2026 09:04:23 -0400 Subject: [PATCH 04/10] Java worker profiles, clear ambient env vars, small change to .NET worker profile --- .gitignore | 1 + workers/dotnet/projects/harness/Profiles.cs | 9 +-- workers/dotnet/projects/harness/Worker.cs | 2 +- .../temporal/omes/harness/WorkerHarness.java | 10 ++- .../temporal/omes/harness/WorkerProfiles.java | 40 +++++++++++ .../omes/harness/WorkerHarnessTest.java | 34 ++++++++++ workers/run.go | 19 +++++- workers/run_test.go | 67 +++++++++++++++++++ 8 files changed, 173 insertions(+), 9 deletions(-) create mode 100644 workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java create mode 100644 workers/run_test.go diff --git a/.gitignore b/.gitignore index c49951c1..2eaaf44a 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ omes.sln /last_fuzz_run.proto workers/dotnet/Temporalio.Omes.temp.csproj +workers/java/harness/bin/ workers/python/**/__pycache__/ workers/typescript/harness/dist/ workers/typescript/harness/dist-test/ diff --git a/workers/dotnet/projects/harness/Profiles.cs b/workers/dotnet/projects/harness/Profiles.cs index a2a5e1d6..e00ee20f 100644 --- a/workers/dotnet/projects/harness/Profiles.cs +++ b/workers/dotnet/projects/harness/Profiles.cs @@ -1,5 +1,6 @@ using Temporalio.Worker; using Temporalio.Worker.Tuning; +using WorkerProfile = Temporalio.Worker.TemporalWorkerOptions; namespace Temporalio.Omes.Projects.Harness; @@ -8,8 +9,8 @@ internal static class WorkerProfiles public const string EnvVarName = "OMES_WORKER_PROFILE"; public const string ResourceBasedDefaultProfile = "resource-based-default"; - private static readonly IReadOnlyDictionary Profiles = - new Dictionary + private static readonly IReadOnlyDictionary Profiles = + new Dictionary { [ResourceBasedDefaultProfile] = new TemporalWorkerOptions { @@ -19,13 +20,13 @@ internal static class WorkerProfiles }, }; - public static TemporalWorkerOptions Lookup(string name) + public static WorkerProfile LookupWorkerProfile(string name) { if (!Profiles.TryGetValue(name, out var profile)) { throw new ArgumentException($"Unknown worker profile \"{name}\""); } - return (TemporalWorkerOptions)profile.Clone(); + return (WorkerProfile)profile.Clone(); } } diff --git a/workers/dotnet/projects/harness/Worker.cs b/workers/dotnet/projects/harness/Worker.cs index 76bcc075..d08f11fa 100644 --- a/workers/dotnet/projects/harness/Worker.cs +++ b/workers/dotnet/projects/harness/Worker.cs @@ -320,7 +320,7 @@ private static TemporalWorkerOptions BuildWorkerOptions(string taskQueue, Worker { if (!string.IsNullOrEmpty(settings.Profile)) { - var profileOptions = WorkerProfiles.Lookup(settings.Profile); + var profileOptions = WorkerProfiles.LookupWorkerProfile(settings.Profile); profileOptions.TaskQueue = taskQueue; return profileOptions; } diff --git a/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerHarness.java b/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerHarness.java index 67a948f1..1c6f4b03 100644 --- a/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerHarness.java +++ b/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerHarness.java @@ -77,7 +77,7 @@ static void runWorkerCli( buildTaskQueues( logger, args.taskQueue, args.taskQueueSuffixIndexStart, args.taskQueueSuffixIndexEnd), args.errOnUnimplemented, - buildWorkerOptions(args)); + buildWorkerOptions(args, System.getenv(WorkerProfiles.WORKER_PROFILE_ENV_VAR))); Runtime.getRuntime().addShutdownHook(shutdownHook); shutdownHookAdded = true; @@ -134,6 +134,14 @@ static List buildTaskQueues( } static WorkerOptions buildWorkerOptions(Arguments args) { + return buildWorkerOptions(args, null); + } + + static WorkerOptions buildWorkerOptions(Arguments args, String profileName) { + if (profileName != null && !profileName.isEmpty()) { + return WorkerProfiles.lookupWorkerProfile(profileName); + } + WorkerOptions.Builder workerOptions = WorkerOptions.newBuilder(); if (args.workflowPollerAutoscaleMax != null) { workerOptions.setWorkflowTaskPollersBehavior( diff --git a/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java b/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java new file mode 100644 index 00000000..d61688ad --- /dev/null +++ b/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java @@ -0,0 +1,40 @@ +package io.temporal.omes.harness; + +import io.temporal.worker.WorkerOptions; +import io.temporal.worker.tuning.ResourceBasedControllerOptions; +import io.temporal.worker.tuning.ResourceBasedTuner; +import java.util.HashMap; +import java.util.Map; + +final class WorkerProfiles { + static final String WORKER_PROFILE_ENV_VAR = "OMES_WORKER_PROFILE"; + static final String RESOURCE_BASED_DEFAULT_PROFILE = "resource-based-default"; + + private static final Map PROFILES = new HashMap<>(); + + static { + register( + RESOURCE_BASED_DEFAULT_PROFILE, + WorkerOptions.newBuilder() + .setWorkerTuner( + ResourceBasedTuner.newBuilder() + .setControllerOptions( + ResourceBasedControllerOptions.newBuilder(0.8, 0.8).build()) + .build()) + .build()); + } + + private WorkerProfiles() {} + + static WorkerOptions lookupWorkerProfile(String name) { + WorkerOptions profile = PROFILES.get(name); + if (profile == null) { + throw new IllegalArgumentException(String.format("Unknown worker profile \"%s\"", name)); + } + return profile; + } + + private static void register(String name, WorkerOptions profile) { + PROFILES.put(name, profile); + } +} diff --git a/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java b/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java index b9d03e67..589bc021 100644 --- a/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java +++ b/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java @@ -1,6 +1,8 @@ package io.temporal.omes.harness; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -10,6 +12,7 @@ import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.worker.WorkerOptions; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -21,6 +24,37 @@ import org.junit.jupiter.api.Test; class WorkerHarnessTest { + @Test + void buildWorkerOptionsAppliesProfile() { + WorkerOptions workerOptions = + WorkerHarness.buildWorkerOptions( + WorkerHarness.parseArguments(), WorkerProfiles.RESOURCE_BASED_DEFAULT_PROFILE); + + assertNotNull(workerOptions.getWorkerTuner()); + } + + @Test + void buildWorkerOptionsIgnoresWorkerOptionFlagsWhenProfileIsSelected() { + WorkerHarness.Arguments args = + WorkerHarness.parseArguments("--max-concurrent-activities", "50"); + + WorkerOptions workerOptions = + WorkerHarness.buildWorkerOptions(args, WorkerProfiles.RESOURCE_BASED_DEFAULT_PROFILE); + + assertNotNull(workerOptions.getWorkerTuner()); + assertNotEquals(50, workerOptions.getMaxConcurrentActivityExecutionSize()); + } + + @Test + void buildWorkerOptionsRejectsUnknownProfile() { + IllegalArgumentException error = + assertThrows( + IllegalArgumentException.class, + () -> WorkerHarness.buildWorkerOptions(WorkerHarness.parseArguments(), "nope")); + + assertEquals("Unknown worker profile \"nope\"", error.getMessage()); + } + @Test void runWorkerFactoryShutsDownAllWorkersWhenStartFails() { LifecyclePlugin lifecycle = new LifecyclePlugin("omes-1", 2); diff --git a/workers/run.go b/workers/run.go index c67a342e..4ed666ab 100644 --- a/workers/run.go +++ b/workers/run.go @@ -164,9 +164,7 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error { if err != nil { return fmt.Errorf("failed creating command: %w", err) } - if r.WorkerProfile != "" { - cmd.Env = append(cmd.Environ(), WorkerProfileEnvVar+"="+r.WorkerProfile) - } + cmd.Env = withEnv(cmd.Environ(), WorkerProfileEnvVar, r.WorkerProfile) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} // set process group ID for shutdown // Direct logging output to provided logger, if available. @@ -238,6 +236,21 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error { } } +func withEnv(environ []string, name string, value string) []string { + prefix := name + "=" + nextEnv := make([]string, 0, len(environ)+1) + for _, item := range environ { + if strings.HasPrefix(item, prefix) { + continue + } + nextEnv = append(nextEnv, item) + } + if value != "" { + nextEnv = append(nextEnv, prefix+value) + } + return nextEnv +} + func passthrough(fs *pflag.FlagSet, prefix string) (flags []string) { fs.VisitAll(func(f *pflag.Flag) { if !f.Changed { diff --git a/workers/run_test.go b/workers/run_test.go new file mode 100644 index 00000000..6dca07aa --- /dev/null +++ b/workers/run_test.go @@ -0,0 +1,67 @@ +package workers + +import ( + "slices" + "testing" +) + +func TestWithEnvClearsInheritedEnvVar(t *testing.T) { + const name = "TEST_ENV_VAR" + + environ := []string{ + "PATH=/bin", + name + "=ambient", + "HOME=/tmp", + } + + got := withEnv(environ, name, "") + + want := []string{ + "PATH=/bin", + "HOME=/tmp", + } + if !slices.Equal(got, want) { + t.Fatalf("expected %v, got %v", want, got) + } +} + +func TestWithEnvSetsExplicitEnvVar(t *testing.T) { + const name = "TEST_ENV_VAR" + + environ := []string{ + "PATH=/bin", + "HOME=/tmp", + } + + got := withEnv(environ, name, "explicit") + + want := []string{ + "PATH=/bin", + "HOME=/tmp", + name + "=explicit", + } + if !slices.Equal(got, want) { + t.Fatalf("expected %v, got %v", want, got) + } +} + +func TestWithEnvExplicitEnvVarOverridesInheritedEnvVar(t *testing.T) { + const name = "TEST_ENV_VAR" + + environ := []string{ + "PATH=/bin", + name + "=ambient", + "HOME=/tmp", + } + + got := withEnv(environ, name, "explicit") + + want := []string{ + "PATH=/bin", + "HOME=/tmp", + name + "=explicit", + } + if !slices.Equal(got, want) { + t.Fatalf("expected %v, got %v", want, got) + } +} From ff6e0d94ab4585c356a2e5e7e072788b17f8eab4 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 15 May 2026 10:43:56 -0400 Subject: [PATCH 05/10] move env var helper to envvars.go --- workers/env.go | 17 +++++++++++++++++ workers/{run_test.go => env_test.go} | 0 workers/run.go | 15 --------------- 3 files changed, 17 insertions(+), 15 deletions(-) create mode 100644 workers/env.go rename workers/{run_test.go => env_test.go} (100%) diff --git a/workers/env.go b/workers/env.go new file mode 100644 index 00000000..69f73937 --- /dev/null +++ b/workers/env.go @@ -0,0 +1,17 @@ +package workers + +import ( + "slices" + "strings" +) + +func withEnv(environ []string, name string, value string) []string { + prefix := name + "=" + nextEnv := slices.DeleteFunc(slices.Clone(environ), func(item string) bool { + return strings.HasPrefix(item, prefix) + }) + if value != "" { + nextEnv = append(nextEnv, prefix+value) + } + return nextEnv +} diff --git a/workers/run_test.go b/workers/env_test.go similarity index 100% rename from workers/run_test.go rename to workers/env_test.go diff --git a/workers/run.go b/workers/run.go index 4ed666ab..58f9441d 100644 --- a/workers/run.go +++ b/workers/run.go @@ -236,21 +236,6 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error { } } -func withEnv(environ []string, name string, value string) []string { - prefix := name + "=" - nextEnv := make([]string, 0, len(environ)+1) - for _, item := range environ { - if strings.HasPrefix(item, prefix) { - continue - } - nextEnv = append(nextEnv, item) - } - if value != "" { - nextEnv = append(nextEnv, prefix+value) - } - return nextEnv -} - func passthrough(fs *pflag.FlagSet, prefix string) (flags []string) { fs.VisitAll(func(f *pflag.Flag) { if !f.Changed { From 7f317e30cd238aaafb79d41342e3ed96c6937064 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Tue, 19 May 2026 10:56:40 -0400 Subject: [PATCH 06/10] fix ruby lint --- workers/ruby/harness/lib/harness/profiles.rb | 20 ++++++++++---------- workers/ruby/harness/sig/harness.rbs | 15 +++++++++++++++ 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/workers/ruby/harness/lib/harness/profiles.rb b/workers/ruby/harness/lib/harness/profiles.rb index cc6e92a6..3a029eb7 100644 --- a/workers/ruby/harness/lib/harness/profiles.rb +++ b/workers/ruby/harness/lib/harness/profiles.rb @@ -9,19 +9,19 @@ module Profiles @registry = {} - module_function + class << self + def register(name, profile) + @registry[name] = profile + end - def register(name, profile) - @registry[name] = profile + def lookup(name) + @registry.fetch(name).dup + rescue KeyError + raise ArgumentError, "Unknown worker profile #{name.inspect}" + end end - def lookup(name) - @registry.fetch(name).dup - rescue KeyError - raise ArgumentError, "Unknown worker profile #{name.inspect}" - end - - register( + self.register( RESOURCE_BASED_DEFAULT_PROFILE, { tuner: Temporalio::Worker::Tuner.create_resource_based( diff --git a/workers/ruby/harness/sig/harness.rbs b/workers/ruby/harness/sig/harness.rbs index 01d5cc91..260bbe22 100644 --- a/workers/ruby/harness/sig/harness.rbs +++ b/workers/ruby/harness/sig/harness.rbs @@ -67,6 +67,13 @@ module Temporalio end class Worker + class Tuner + def self.create_resource_based: ( + target_memory_usage: Float, + target_cpu_usage: Float + ) -> untyped + end + class PollerBehavior class Autoscaling attr_reader maximum: Integer @@ -297,6 +304,14 @@ module Harness def self.configure_logger: (String log_level, String log_encoding) -> Logger end + module Profiles + WORKER_PROFILE_ENV_VAR: String + RESOURCE_BASED_DEFAULT_PROFILE: String + + def self.register: (String name, Hash[Symbol, untyped] profile) -> Hash[Symbol, untyped] + def self.lookup: (String name) -> Hash[Symbol, untyped] + end + module WorkerCLI def run_cli: (worker_factory, client_factory, Array[String], ?worker_profile: String?) -> void def self.run_cli: (worker_factory, client_factory, Array[String], ?worker_profile: String?) -> void From e77cb5d9b49a674b0b4fe17687e3d2d0c2c2356c Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 27 May 2026 12:12:42 -0400 Subject: [PATCH 07/10] worker profiles - wip --- README.md | 11 ++++ workers/go/harness/profiles.go | 89 +++++++++++++++++++++++++--- workers/go/harness/worker.go | 9 ++- workers/go/harness/worker_test.go | 97 +++++++++++++++++++++++++++++++ 4 files changed, 196 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index f3fea7d3..c62f70c3 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,17 @@ takes precedence over worker configuration flags such as poller counts, autoscal slot counts, and worker versioning options. Non-worker-configuration flags, such as task queue selection, client connection settings, logging, metrics, and `--worker-err-on-unimplemented`, continue to apply normally. +The shared `resource-based-default` profile uses the SDK resource-based worker tuner. The Go harness also provides +fixed ECS tuning candidates for 2 CPU / 4 GB workers that set direct worker options for workflow slots, activity slots, +workflow pollers, activity pollers, and sticky workflow cache size: + +- `ecs-2cpu-4gb-fixed-small` +- `ecs-2cpu-4gb-fixed-medium` +- `ecs-2cpu-4gb-fixed-medium-plus` +- `ecs-2cpu-4gb-fixed-large` +- `ecs-2cpu-4gb-fixed-large-plus` +- `ecs-2cpu-4gb-fixed-xlarge` + ```sh go run ./cmd run-scenario-with-worker --scenario throughput_stress --language go \ --run-id local-profile-test --worker-profile resource-based-default diff --git a/workers/go/harness/profiles.go b/workers/go/harness/profiles.go index 5fcfc054..b9c70ed3 100644 --- a/workers/go/harness/profiles.go +++ b/workers/go/harness/profiles.go @@ -12,30 +12,101 @@ const ( resourceBasedDefaultWorkerProfile = "resource-based-default" ) -var workerProfiles = map[string]sdkworker.Options{} +type workerProfile struct { + Options sdkworker.Options + StickyWorkflowCacheSize int +} -func registerWorkerProfile(name string, profile sdkworker.Options) { +func registerWorkerProfile(name string, profile workerProfile) { workerProfiles[name] = profile } -func lookupWorkerProfile(name string) (sdkworker.Options, error) { +func lookupWorkerProfile(name string) (workerProfile, error) { profile, ok := workerProfiles[name] if !ok { - return sdkworker.Options{}, fmt.Errorf("unknown worker profile %q", name) + return workerProfile{}, fmt.Errorf("unknown worker profile %q", name) } return profile, nil } +var workerProfiles = map[string]workerProfile{} + func init() { + registerWorkerProfile(resourceBasedDefaultWorkerProfile, workerProfile{ + Options: sdkworker.Options{ + Tuner: mustResourceBasedTuner(0.8, 0.8), + }, + }) + registerWorkerProfile("ecs-2cpu-4gb-fixed-small", workerProfile{ + StickyWorkflowCacheSize: 250, + Options: sdkworker.Options{ + MaxConcurrentWorkflowTaskExecutionSize: 64, + MaxConcurrentActivityExecutionSize: 256, + MaxConcurrentLocalActivityExecutionSize: 256, + MaxConcurrentWorkflowTaskPollers: 12, + MaxConcurrentActivityTaskPollers: 24, + }, + }) + registerWorkerProfile("ecs-2cpu-4gb-fixed-medium", workerProfile{ + StickyWorkflowCacheSize: 500, + Options: sdkworker.Options{ + MaxConcurrentWorkflowTaskExecutionSize: 96, + MaxConcurrentActivityExecutionSize: 384, + MaxConcurrentLocalActivityExecutionSize: 384, + MaxConcurrentWorkflowTaskPollers: 20, + MaxConcurrentActivityTaskPollers: 40, + }, + }) + registerWorkerProfile("ecs-2cpu-4gb-fixed-medium-plus", workerProfile{ + StickyWorkflowCacheSize: 750, + Options: sdkworker.Options{ + MaxConcurrentWorkflowTaskExecutionSize: 144, + MaxConcurrentActivityExecutionSize: 576, + MaxConcurrentLocalActivityExecutionSize: 576, + MaxConcurrentWorkflowTaskPollers: 30, + MaxConcurrentActivityTaskPollers: 60, + }, + }) + registerWorkerProfile("ecs-2cpu-4gb-fixed-large", workerProfile{ + StickyWorkflowCacheSize: 1000, + Options: sdkworker.Options{ + MaxConcurrentWorkflowTaskExecutionSize: 192, + MaxConcurrentActivityExecutionSize: 768, + MaxConcurrentLocalActivityExecutionSize: 768, + MaxConcurrentWorkflowTaskPollers: 40, + MaxConcurrentActivityTaskPollers: 80, + }, + }) + registerWorkerProfile("ecs-2cpu-4gb-fixed-large-plus", workerProfile{ + StickyWorkflowCacheSize: 1250, + Options: sdkworker.Options{ + MaxConcurrentWorkflowTaskExecutionSize: 256, + MaxConcurrentActivityExecutionSize: 1024, + MaxConcurrentLocalActivityExecutionSize: 1024, + MaxConcurrentWorkflowTaskPollers: 50, + MaxConcurrentActivityTaskPollers: 100, + }, + }) + registerWorkerProfile("ecs-2cpu-4gb-fixed-xlarge", workerProfile{ + StickyWorkflowCacheSize: 1500, + Options: sdkworker.Options{ + MaxConcurrentWorkflowTaskExecutionSize: 320, + MaxConcurrentActivityExecutionSize: 1280, + MaxConcurrentLocalActivityExecutionSize: 1280, + MaxConcurrentWorkflowTaskPollers: 60, + MaxConcurrentActivityTaskPollers: 120, + }, + }) +} + +func mustResourceBasedTuner(targetMem, targetCpu float64) sdkworker.WorkerTuner { tuner, err := sdkworker.NewResourceBasedTuner(sdkworker.ResourceBasedTunerOptions{ - TargetMem: 0.8, - TargetCpu: 0.8, + TargetMem: targetMem, + TargetCpu: targetCpu, InfoSupplier: sysinfo.SysInfoProvider(), }) if err != nil { panic(err) } - registerWorkerProfile(resourceBasedDefaultWorkerProfile, sdkworker.Options{ - Tuner: tuner, - }) + return tuner } diff --git a/workers/go/harness/worker.go b/workers/go/harness/worker.go index cbc57241..7ecdc3b8 100644 --- a/workers/go/harness/worker.go +++ b/workers/go/harness/worker.go @@ -130,7 +130,14 @@ func parseVersioningBehavior(value string) (workflow.VersioningBehavior, error) func buildWorkerOptions(flags *pflag.FlagSet, args clioptions.WorkerOptions, profileName string) (sdkworker.Options, error) { if profileName != "" { - return lookupWorkerProfile(profileName) + profile, err := lookupWorkerProfile(profileName) + if err != nil { + return sdkworker.Options{}, err + } + if profile.StickyWorkflowCacheSize > 0 { + sdkworker.SetStickyWorkflowCacheSize(profile.StickyWorkflowCacheSize) + } + return profile.Options, nil } options := sdkworker.Options{} diff --git a/workers/go/harness/worker_test.go b/workers/go/harness/worker_test.go index 7b18d2f0..ac63a6ae 100644 --- a/workers/go/harness/worker_test.go +++ b/workers/go/harness/worker_test.go @@ -84,6 +84,103 @@ func TestBuildWorkerOptionsAppliesResourceBasedProfile(t *testing.T) { } } +func TestLookupWorkerProfileIncludesFixedECSWorkerProfiles(t *testing.T) { + tests := []struct { + name string + profile string + stickyWorkflowCacheSize int + workflowSlots int + activitySlots int + workflowPollers int + activityPollers int + }{ + { + name: "small", + profile: "ecs-2cpu-4gb-fixed-small", + stickyWorkflowCacheSize: 250, + workflowSlots: 64, + activitySlots: 256, + workflowPollers: 12, + activityPollers: 24, + }, + { + name: "medium", + profile: "ecs-2cpu-4gb-fixed-medium", + stickyWorkflowCacheSize: 500, + workflowSlots: 96, + activitySlots: 384, + workflowPollers: 20, + activityPollers: 40, + }, + { + name: "medium plus", + profile: "ecs-2cpu-4gb-fixed-medium-plus", + stickyWorkflowCacheSize: 750, + workflowSlots: 144, + activitySlots: 576, + workflowPollers: 30, + activityPollers: 60, + }, + { + name: "large", + profile: "ecs-2cpu-4gb-fixed-large", + stickyWorkflowCacheSize: 1000, + workflowSlots: 192, + activitySlots: 768, + workflowPollers: 40, + activityPollers: 80, + }, + { + name: "large plus", + profile: "ecs-2cpu-4gb-fixed-large-plus", + stickyWorkflowCacheSize: 1250, + workflowSlots: 256, + activitySlots: 1024, + workflowPollers: 50, + activityPollers: 100, + }, + { + name: "xlarge", + profile: "ecs-2cpu-4gb-fixed-xlarge", + stickyWorkflowCacheSize: 1500, + workflowSlots: 320, + activitySlots: 1280, + workflowPollers: 60, + activityPollers: 120, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + profile, err := lookupWorkerProfile(test.profile) + if err != nil { + t.Fatal(err) + } + if profile.Options.Tuner != nil { + t.Fatal("expected fixed worker profile to use direct worker options, got tuner") + } + if profile.StickyWorkflowCacheSize != test.stickyWorkflowCacheSize { + t.Fatalf("expected sticky workflow cache size %d, got %d", test.stickyWorkflowCacheSize, profile.StickyWorkflowCacheSize) + } + if profile.Options.MaxConcurrentWorkflowTaskExecutionSize != test.workflowSlots { + t.Fatalf("expected workflow slots %d, got %d", test.workflowSlots, profile.Options.MaxConcurrentWorkflowTaskExecutionSize) + } + if profile.Options.MaxConcurrentActivityExecutionSize != test.activitySlots { + t.Fatalf("expected activity slots %d, got %d", test.activitySlots, profile.Options.MaxConcurrentActivityExecutionSize) + } + if profile.Options.MaxConcurrentLocalActivityExecutionSize != test.activitySlots { + t.Fatalf("expected local activity slots %d, got %d", test.activitySlots, profile.Options.MaxConcurrentLocalActivityExecutionSize) + } + if profile.Options.MaxConcurrentWorkflowTaskPollers != test.workflowPollers { + t.Fatalf("expected workflow pollers %d, got %d", test.workflowPollers, profile.Options.MaxConcurrentWorkflowTaskPollers) + } + if profile.Options.MaxConcurrentActivityTaskPollers != test.activityPollers { + t.Fatalf("expected activity pollers %d, got %d", test.activityPollers, profile.Options.MaxConcurrentActivityTaskPollers) + } + }) + } +} + func TestBuildWorkerOptionsIgnoresWorkerOptionFlagsWhenProfileIsSelected(t *testing.T) { options := newWorkerCLIOptions() if err := options.parse([]string{"--max-concurrent-activities", "50"}); err != nil { From 80b330883acdf91ef8c88f9fca4064afacb8a03b Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 29 May 2026 12:50:04 -0400 Subject: [PATCH 08/10] add equivalent ecs profiles for other workers (i.e. non-Go) --- README.md | 6 +- workers/dotnet/projects/harness/Profiles.cs | 52 ++++++++++++++ .../projects/harness/tests/WorkerTests.cs | 35 +++++++++ .../temporal/omes/harness/WorkerHarness.java | 18 ++++- .../temporal/omes/harness/WorkerProfiles.java | 54 +++++++++++++- .../omes/harness/WorkerHarnessTest.java | 53 ++++++++++++++ .../python/harness/src/harness/profiles.py | 32 +++++++++ .../harness/tests/test_harness_worker.py | 32 +++++++++ workers/ruby/harness/lib/harness/profiles.rb | 72 +++++++++++++++++++ workers/ruby/harness/sig/harness.rbs | 16 +++++ workers/ruby/harness/tests/test_worker.rb | 26 +++++++ workers/typescript/harness/profiles.ts | 27 +++++++ .../typescript/harness/tests/worker.test.ts | 31 +++++++- 13 files changed, 445 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index c62f70c3..570c8f94 100644 --- a/README.md +++ b/README.md @@ -153,9 +153,9 @@ takes precedence over worker configuration flags such as poller counts, autoscal slot counts, and worker versioning options. Non-worker-configuration flags, such as task queue selection, client connection settings, logging, metrics, and `--worker-err-on-unimplemented`, continue to apply normally. -The shared `resource-based-default` profile uses the SDK resource-based worker tuner. The Go harness also provides -fixed ECS tuning candidates for 2 CPU / 4 GB workers that set direct worker options for workflow slots, activity slots, -workflow pollers, activity pollers, and sticky workflow cache size: +The shared `resource-based-default` profile uses the SDK resource-based worker tuner. The worker harnesses also provide +fixed ECS tuning candidates for 2 CPU / 4 GB workers that set equivalent fixed slot, poller, and workflow-cache +settings in each SDK: - `ecs-2cpu-4gb-fixed-small` - `ecs-2cpu-4gb-fixed-medium` diff --git a/workers/dotnet/projects/harness/Profiles.cs b/workers/dotnet/projects/harness/Profiles.cs index e00ee20f..fca1d767 100644 --- a/workers/dotnet/projects/harness/Profiles.cs +++ b/workers/dotnet/projects/harness/Profiles.cs @@ -18,6 +18,42 @@ internal static class WorkerProfiles targetMemoryUsage: 0.8, targetCpuUsage: 0.8), }, + ["ecs-2cpu-4gb-fixed-small"] = CreateFixedEcsProfile( + maxCachedWorkflows: 250, + workflowSlots: 64, + activitySlots: 256, + workflowPollers: 12, + activityPollers: 24), + ["ecs-2cpu-4gb-fixed-medium"] = CreateFixedEcsProfile( + maxCachedWorkflows: 500, + workflowSlots: 96, + activitySlots: 384, + workflowPollers: 20, + activityPollers: 40), + ["ecs-2cpu-4gb-fixed-medium-plus"] = CreateFixedEcsProfile( + maxCachedWorkflows: 750, + workflowSlots: 144, + activitySlots: 576, + workflowPollers: 30, + activityPollers: 60), + ["ecs-2cpu-4gb-fixed-large"] = CreateFixedEcsProfile( + maxCachedWorkflows: 1000, + workflowSlots: 192, + activitySlots: 768, + workflowPollers: 40, + activityPollers: 80), + ["ecs-2cpu-4gb-fixed-large-plus"] = CreateFixedEcsProfile( + maxCachedWorkflows: 1250, + workflowSlots: 256, + activitySlots: 1024, + workflowPollers: 50, + activityPollers: 100), + ["ecs-2cpu-4gb-fixed-xlarge"] = CreateFixedEcsProfile( + maxCachedWorkflows: 1500, + workflowSlots: 320, + activitySlots: 1280, + workflowPollers: 60, + activityPollers: 120), }; public static WorkerProfile LookupWorkerProfile(string name) @@ -29,4 +65,20 @@ public static WorkerProfile LookupWorkerProfile(string name) return (WorkerProfile)profile.Clone(); } + + private static WorkerProfile CreateFixedEcsProfile( + int maxCachedWorkflows, + int workflowSlots, + int activitySlots, + int workflowPollers, + int activityPollers) => + new TemporalWorkerOptions + { + MaxCachedWorkflows = maxCachedWorkflows, + MaxConcurrentWorkflowTasks = workflowSlots, + MaxConcurrentActivities = activitySlots, + MaxConcurrentLocalActivities = activitySlots, + MaxConcurrentWorkflowTaskPolls = workflowPollers, + MaxConcurrentActivityTaskPolls = activityPollers, + }; } diff --git a/workers/dotnet/projects/harness/tests/WorkerTests.cs b/workers/dotnet/projects/harness/tests/WorkerTests.cs index 7ef79a33..57c1d2ac 100644 --- a/workers/dotnet/projects/harness/tests/WorkerTests.cs +++ b/workers/dotnet/projects/harness/tests/WorkerTests.cs @@ -107,6 +107,33 @@ await WorkerHarness.RunCoreAsync( Assert.NotEqual(50, capturedOptions.MaxConcurrentActivities); } + [Fact] + public void LookupWorkerProfileIncludesFixedEcsWorkerProfiles() + { + var profiles = new[] + { + new FixedEcsProfile("ecs-2cpu-4gb-fixed-small", 250, 64, 256, 12, 24), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-medium", 500, 96, 384, 20, 40), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-medium-plus", 750, 144, 576, 30, 60), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-large", 1000, 192, 768, 40, 80), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-large-plus", 1250, 256, 1024, 50, 100), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-xlarge", 1500, 320, 1280, 60, 120), + }; + + foreach (var expected in profiles) + { + var profile = WorkerProfiles.LookupWorkerProfile(expected.Name); + + Assert.Null(profile.Tuner); + Assert.Equal(expected.MaxCachedWorkflows, profile.MaxCachedWorkflows); + Assert.Equal(expected.WorkflowSlots, profile.MaxConcurrentWorkflowTasks); + Assert.Equal(expected.ActivitySlots, profile.MaxConcurrentActivities); + Assert.Equal(expected.ActivitySlots, profile.MaxConcurrentLocalActivities); + Assert.Equal(expected.WorkflowPollers, profile.MaxConcurrentWorkflowTaskPolls); + Assert.Equal(expected.ActivityPollers, profile.MaxConcurrentActivityTaskPolls); + } + } + [Fact] public async Task RunRejectsUnknownWorkerProfile() { @@ -217,3 +244,11 @@ public async Task RunAsync(CancellationToken cancellationToken) public void Dispose() => DisposeCalls++; } + +file sealed record FixedEcsProfile( + string Name, + int MaxCachedWorkflows, + int WorkflowSlots, + int ActivitySlots, + int WorkflowPollers, + int ActivityPollers); diff --git a/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerHarness.java b/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerHarness.java index 1c6f4b03..7641b6b1 100644 --- a/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerHarness.java +++ b/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerHarness.java @@ -56,7 +56,8 @@ static void runWorkerCli( args.promHandlerPath)); WorkerFactory workerFactory = WorkerFactory.newInstance( - client, WorkerFactoryOptions.newBuilder().setMaxWorkflowThreadCount(1000).build()); + client, + buildWorkerFactoryOptions(System.getenv(WorkerProfiles.WORKER_PROFILE_ENV_VAR))); AtomicBoolean shutdown = new AtomicBoolean(false); CountDownLatch stopSignal = new CountDownLatch(1); Thread shutdownHook = @@ -139,7 +140,7 @@ static WorkerOptions buildWorkerOptions(Arguments args) { static WorkerOptions buildWorkerOptions(Arguments args, String profileName) { if (profileName != null && !profileName.isEmpty()) { - return WorkerProfiles.lookupWorkerProfile(profileName); + return WorkerProfiles.lookupWorkerProfile(profileName).workerOptions(); } WorkerOptions.Builder workerOptions = WorkerOptions.newBuilder(); @@ -169,6 +170,19 @@ static WorkerOptions buildWorkerOptions(Arguments args, String profileName) { return workerOptions.build(); } + static WorkerFactoryOptions buildWorkerFactoryOptions(String profileName) { + WorkerFactoryOptions.Builder factoryOptions = + WorkerFactoryOptions.newBuilder().setMaxWorkflowThreadCount(1000); + if (profileName != null && !profileName.isEmpty()) { + Integer workflowCacheSize = + WorkerProfiles.lookupWorkerProfile(profileName).workflowCacheSize(); + if (workflowCacheSize != null) { + factoryOptions.setWorkflowCacheSize(workflowCacheSize); + } + } + return factoryOptions.build(); + } + static void runWorkerFactory( WorkerFactory workerFactory, WorkflowClient client, CountDownLatch stopSignal) throws InterruptedException { diff --git a/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java b/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java index d61688ad..55e437c9 100644 --- a/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java +++ b/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java @@ -10,7 +10,25 @@ final class WorkerProfiles { static final String WORKER_PROFILE_ENV_VAR = "OMES_WORKER_PROFILE"; static final String RESOURCE_BASED_DEFAULT_PROFILE = "resource-based-default"; - private static final Map PROFILES = new HashMap<>(); + static final class WorkerProfile { + private final WorkerOptions workerOptions; + private final Integer workflowCacheSize; + + private WorkerProfile(WorkerOptions workerOptions, Integer workflowCacheSize) { + this.workerOptions = workerOptions; + this.workflowCacheSize = workflowCacheSize; + } + + WorkerOptions workerOptions() { + return workerOptions; + } + + Integer workflowCacheSize() { + return workflowCacheSize; + } + } + + private static final Map PROFILES = new HashMap<>(); static { register( @@ -22,12 +40,18 @@ final class WorkerProfiles { ResourceBasedControllerOptions.newBuilder(0.8, 0.8).build()) .build()) .build()); + registerFixedEcsProfile("ecs-2cpu-4gb-fixed-small", 250, 64, 256, 12, 24); + registerFixedEcsProfile("ecs-2cpu-4gb-fixed-medium", 500, 96, 384, 20, 40); + registerFixedEcsProfile("ecs-2cpu-4gb-fixed-medium-plus", 750, 144, 576, 30, 60); + registerFixedEcsProfile("ecs-2cpu-4gb-fixed-large", 1000, 192, 768, 40, 80); + registerFixedEcsProfile("ecs-2cpu-4gb-fixed-large-plus", 1250, 256, 1024, 50, 100); + registerFixedEcsProfile("ecs-2cpu-4gb-fixed-xlarge", 1500, 320, 1280, 60, 120); } private WorkerProfiles() {} - static WorkerOptions lookupWorkerProfile(String name) { - WorkerOptions profile = PROFILES.get(name); + static WorkerProfile lookupWorkerProfile(String name) { + WorkerProfile profile = PROFILES.get(name); if (profile == null) { throw new IllegalArgumentException(String.format("Unknown worker profile \"%s\"", name)); } @@ -35,6 +59,30 @@ static WorkerOptions lookupWorkerProfile(String name) { } private static void register(String name, WorkerOptions profile) { + PROFILES.put(name, new WorkerProfile(profile, null)); + } + + private static void register(String name, WorkerProfile profile) { PROFILES.put(name, profile); } + + private static void registerFixedEcsProfile( + String name, + int workflowCacheSize, + int workflowSlots, + int activitySlots, + int workflowPollers, + int activityPollers) { + register( + name, + new WorkerProfile( + WorkerOptions.newBuilder() + .setMaxConcurrentWorkflowTaskExecutionSize(workflowSlots) + .setMaxConcurrentActivityExecutionSize(activitySlots) + .setMaxConcurrentLocalActivityExecutionSize(activitySlots) + .setMaxConcurrentWorkflowTaskPollers(workflowPollers) + .setMaxConcurrentActivityTaskPollers(activityPollers) + .build(), + workflowCacheSize)); + } } diff --git a/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java b/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java index 589bc021..bc699f2d 100644 --- a/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java +++ b/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -45,6 +46,34 @@ void buildWorkerOptionsIgnoresWorkerOptionFlagsWhenProfileIsSelected() { assertNotEquals(50, workerOptions.getMaxConcurrentActivityExecutionSize()); } + @Test + void buildWorkerOptionsIncludesFixedEcsWorkerProfiles() { + List profiles = + List.of( + new FixedEcsProfile("ecs-2cpu-4gb-fixed-small", 250, 64, 256, 12, 24), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-medium", 500, 96, 384, 20, 40), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-medium-plus", 750, 144, 576, 30, 60), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-large", 1000, 192, 768, 40, 80), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-large-plus", 1250, 256, 1024, 50, 100), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-xlarge", 1500, 320, 1280, 60, 120)); + + for (FixedEcsProfile expected : profiles) { + WorkerOptions workerOptions = + WorkerHarness.buildWorkerOptions(WorkerHarness.parseArguments(), expected.name); + WorkerFactoryOptions factoryOptions = WorkerHarness.buildWorkerFactoryOptions(expected.name); + + assertNull(workerOptions.getWorkerTuner()); + assertEquals( + expected.workflowSlots, workerOptions.getMaxConcurrentWorkflowTaskExecutionSize()); + assertEquals(expected.activitySlots, workerOptions.getMaxConcurrentActivityExecutionSize()); + assertEquals( + expected.activitySlots, workerOptions.getMaxConcurrentLocalActivityExecutionSize()); + assertEquals(expected.workflowPollers, workerOptions.getMaxConcurrentWorkflowTaskPollers()); + assertEquals(expected.activityPollers, workerOptions.getMaxConcurrentActivityTaskPollers()); + assertEquals(expected.workflowCacheSize, factoryOptions.getWorkflowCacheSize()); + } + } + @Test void buildWorkerOptionsRejectsUnknownProfile() { IllegalArgumentException error = @@ -166,4 +195,28 @@ private synchronized Set shutdownTaskQueues() { return new HashSet<>(shutdownTaskQueues); } } + + private static final class FixedEcsProfile { + private final String name; + private final int workflowCacheSize; + private final int workflowSlots; + private final int activitySlots; + private final int workflowPollers; + private final int activityPollers; + + private FixedEcsProfile( + String name, + int workflowCacheSize, + int workflowSlots, + int activitySlots, + int workflowPollers, + int activityPollers) { + this.name = name; + this.workflowCacheSize = workflowCacheSize; + this.workflowSlots = workflowSlots; + this.activitySlots = activitySlots; + this.workflowPollers = workflowPollers; + this.activityPollers = activityPollers; + } + } } diff --git a/workers/python/harness/src/harness/profiles.py b/workers/python/harness/src/harness/profiles.py index 019cb692..99e3fd3a 100644 --- a/workers/python/harness/src/harness/profiles.py +++ b/workers/python/harness/src/harness/profiles.py @@ -33,3 +33,35 @@ def lookup_profile(name: str) -> dict[str, Any]: ) }, ) + + +def _register_fixed_ecs_profile( + name: str, + max_cached_workflows: int, + workflow_slots: int, + activity_slots: int, + workflow_pollers: int, + activity_pollers: int, +) -> None: + _register_profile( + name, + { + "max_cached_workflows": max_cached_workflows, + "max_concurrent_workflow_tasks": workflow_slots, + "max_concurrent_activities": activity_slots, + "max_concurrent_local_activities": activity_slots, + "max_concurrent_workflow_task_polls": workflow_pollers, + "max_concurrent_activity_task_polls": activity_pollers, + }, + ) + + +for _profile in ( + ("ecs-2cpu-4gb-fixed-small", 250, 64, 256, 12, 24), + ("ecs-2cpu-4gb-fixed-medium", 500, 96, 384, 20, 40), + ("ecs-2cpu-4gb-fixed-medium-plus", 750, 144, 576, 30, 60), + ("ecs-2cpu-4gb-fixed-large", 1000, 192, 768, 40, 80), + ("ecs-2cpu-4gb-fixed-large-plus", 1250, 256, 1024, 50, 100), + ("ecs-2cpu-4gb-fixed-xlarge", 1500, 320, 1280, 60, 120), +): + _register_fixed_ecs_profile(*_profile) diff --git a/workers/python/harness/tests/test_harness_worker.py b/workers/python/harness/tests/test_harness_worker.py index 3fcf1886..14b324cf 100644 --- a/workers/python/harness/tests/test_harness_worker.py +++ b/workers/python/harness/tests/test_harness_worker.py @@ -10,6 +10,7 @@ from temporalio.worker import Worker, WorkerTuner from harness import worker +from harness.profiles import lookup_profile class FakeWorker: @@ -113,6 +114,37 @@ def test_build_worker_kwargs_ignores_worker_option_flags_when_profile_is_selecte self.assertIsInstance(worker_kwargs["tuner"], WorkerTuner) self.assertNotIn("max_concurrent_activities", worker_kwargs) + def test_lookup_profile_includes_fixed_ecs_worker_profiles(self) -> None: + profiles = ( + ("ecs-2cpu-4gb-fixed-small", 250, 64, 256, 12, 24), + ("ecs-2cpu-4gb-fixed-medium", 500, 96, 384, 20, 40), + ("ecs-2cpu-4gb-fixed-medium-plus", 750, 144, 576, 30, 60), + ("ecs-2cpu-4gb-fixed-large", 1000, 192, 768, 40, 80), + ("ecs-2cpu-4gb-fixed-large-plus", 1250, 256, 1024, 50, 100), + ("ecs-2cpu-4gb-fixed-xlarge", 1500, 320, 1280, 60, 120), + ) + + for ( + name, + max_cached_workflows, + workflow_slots, + activity_slots, + workflow_pollers, + activity_pollers, + ) in profiles: + with self.subTest(name=name): + self.assertEqual( + lookup_profile(name), + { + "max_cached_workflows": max_cached_workflows, + "max_concurrent_workflow_tasks": workflow_slots, + "max_concurrent_activities": activity_slots, + "max_concurrent_local_activities": activity_slots, + "max_concurrent_workflow_task_polls": workflow_pollers, + "max_concurrent_activity_task_polls": activity_pollers, + }, + ) + def test_build_worker_kwargs_rejects_unknown_profile(self) -> None: args = worker._build_parser().parse_args([]) diff --git a/workers/ruby/harness/lib/harness/profiles.rb b/workers/ruby/harness/lib/harness/profiles.rb index 3a029eb7..d8cdeda2 100644 --- a/workers/ruby/harness/lib/harness/profiles.rb +++ b/workers/ruby/harness/lib/harness/profiles.rb @@ -19,6 +19,29 @@ def lookup(name) rescue KeyError raise ArgumentError, "Unknown worker profile #{name.inspect}" end + + def register_fixed_ecs_profile( + name, + max_cached_workflows:, + workflow_slots:, + activity_slots:, + workflow_pollers:, + activity_pollers: + ) + register( + name, + { + tuner: Temporalio::Worker::Tuner.create_fixed( + workflow_slots:, + activity_slots:, + local_activity_slots: activity_slots + ), + max_cached_workflows:, + max_concurrent_workflow_task_polls: workflow_pollers, + max_concurrent_activity_task_polls: activity_pollers + } + ) + end end self.register( @@ -30,5 +53,54 @@ def lookup(name) ) } ) + + register_fixed_ecs_profile( + 'ecs-2cpu-4gb-fixed-small', + max_cached_workflows: 250, + workflow_slots: 64, + activity_slots: 256, + workflow_pollers: 12, + activity_pollers: 24 + ) + register_fixed_ecs_profile( + 'ecs-2cpu-4gb-fixed-medium', + max_cached_workflows: 500, + workflow_slots: 96, + activity_slots: 384, + workflow_pollers: 20, + activity_pollers: 40 + ) + register_fixed_ecs_profile( + 'ecs-2cpu-4gb-fixed-medium-plus', + max_cached_workflows: 750, + workflow_slots: 144, + activity_slots: 576, + workflow_pollers: 30, + activity_pollers: 60 + ) + register_fixed_ecs_profile( + 'ecs-2cpu-4gb-fixed-large', + max_cached_workflows: 1000, + workflow_slots: 192, + activity_slots: 768, + workflow_pollers: 40, + activity_pollers: 80 + ) + register_fixed_ecs_profile( + 'ecs-2cpu-4gb-fixed-large-plus', + max_cached_workflows: 1250, + workflow_slots: 256, + activity_slots: 1024, + workflow_pollers: 50, + activity_pollers: 100 + ) + register_fixed_ecs_profile( + 'ecs-2cpu-4gb-fixed-xlarge', + max_cached_workflows: 1500, + workflow_slots: 320, + activity_slots: 1280, + workflow_pollers: 60, + activity_pollers: 120 + ) end end diff --git a/workers/ruby/harness/sig/harness.rbs b/workers/ruby/harness/sig/harness.rbs index 260bbe22..67cd32da 100644 --- a/workers/ruby/harness/sig/harness.rbs +++ b/workers/ruby/harness/sig/harness.rbs @@ -68,6 +68,22 @@ module Temporalio class Worker class Tuner + class SlotSupplier + class Fixed < SlotSupplier + attr_reader slots: Integer + end + end + + attr_reader workflow_slot_supplier: SlotSupplier + attr_reader activity_slot_supplier: SlotSupplier + attr_reader local_activity_slot_supplier: SlotSupplier + + def self.create_fixed: ( + ?workflow_slots: Integer, + ?activity_slots: Integer, + ?local_activity_slots: Integer + ) -> Tuner + def self.create_resource_based: ( target_memory_usage: Float, target_cpu_usage: Float diff --git a/workers/ruby/harness/tests/test_worker.rb b/workers/ruby/harness/tests/test_worker.rb index 3cc3f955..2d362703 100644 --- a/workers/ruby/harness/tests/test_worker.rb +++ b/workers/ruby/harness/tests/test_worker.rb @@ -71,6 +71,27 @@ def test_build_worker_kwargs_ignores_worker_option_flags_when_profile_is_selecte refute_includes worker_kwargs, :max_concurrent_activities end + def test_lookup_profile_includes_fixed_ecs_worker_profiles + [ + ['ecs-2cpu-4gb-fixed-small', 250, 64, 256, 12, 24], + ['ecs-2cpu-4gb-fixed-medium', 500, 96, 384, 20, 40], + ['ecs-2cpu-4gb-fixed-medium-plus', 750, 144, 576, 30, 60], + ['ecs-2cpu-4gb-fixed-large', 1000, 192, 768, 40, 80], + ['ecs-2cpu-4gb-fixed-large-plus', 1250, 256, 1024, 50, 100], + ['ecs-2cpu-4gb-fixed-xlarge', 1500, 320, 1280, 60, 120] + ].each do |name, max_cached_workflows, workflow_slots, activity_slots, workflow_pollers, activity_pollers| + profile = Harness::Profiles.lookup(name) + tuner = profile.fetch(:tuner) + + assert_fixed_slot_supplier tuner.workflow_slot_supplier, workflow_slots + assert_fixed_slot_supplier tuner.activity_slot_supplier, activity_slots + assert_fixed_slot_supplier tuner.local_activity_slot_supplier, activity_slots + assert_equal max_cached_workflows, profile.fetch(:max_cached_workflows) + assert_equal workflow_pollers, profile.fetch(:max_concurrent_workflow_task_polls) + assert_equal activity_pollers, profile.fetch(:max_concurrent_activity_task_polls) + end + end + def test_build_worker_kwargs_rejects_unknown_worker_profile error = assert_raises(ArgumentError) do Harness::WorkerCLI.build_worker_kwargs(Harness::WorkerCLI.default_options, 'nope') @@ -80,6 +101,11 @@ def test_build_worker_kwargs_rejects_unknown_worker_profile private + def assert_fixed_slot_supplier(slot_supplier, slots) + assert_instance_of Temporalio::Worker::Tuner::SlotSupplier::Fixed, slot_supplier + assert_equal slots, slot_supplier.slots + end + def with_stubbed_run_all(stub_implementation) singleton = Temporalio::Worker.singleton_class singleton.send(:alias_method, :__original_run_all_for_test, :run_all) diff --git a/workers/typescript/harness/profiles.ts b/workers/typescript/harness/profiles.ts index 1b1c1342..294a232e 100644 --- a/workers/typescript/harness/profiles.ts +++ b/workers/typescript/harness/profiles.ts @@ -27,3 +27,30 @@ registerWorkerProfile(RESOURCE_BASED_DEFAULT_PROFILE, { }, }, }); + +const ecsFixedProfiles = [ + ['ecs-2cpu-4gb-fixed-small', 250, 64, 256, 12, 24], + ['ecs-2cpu-4gb-fixed-medium', 500, 96, 384, 20, 40], + ['ecs-2cpu-4gb-fixed-medium-plus', 750, 144, 576, 30, 60], + ['ecs-2cpu-4gb-fixed-large', 1000, 192, 768, 40, 80], + ['ecs-2cpu-4gb-fixed-large-plus', 1250, 256, 1024, 50, 100], + ['ecs-2cpu-4gb-fixed-xlarge', 1500, 320, 1280, 60, 120], +] as const; + +for (const [ + name, + maxCachedWorkflows, + workflowSlots, + activitySlots, + workflowPollers, + activityPollers, +] of ecsFixedProfiles) { + registerWorkerProfile(name, { + maxCachedWorkflows, + maxConcurrentWorkflowTaskExecutions: workflowSlots, + maxConcurrentActivityTaskExecutions: activitySlots, + maxConcurrentLocalActivityExecutions: activitySlots, + maxConcurrentWorkflowTaskPolls: workflowPollers, + maxConcurrentActivityTaskPolls: activityPollers, + }); +} diff --git a/workers/typescript/harness/tests/worker.test.ts b/workers/typescript/harness/tests/worker.test.ts index 119eee89..7f10244f 100644 --- a/workers/typescript/harness/tests/worker.test.ts +++ b/workers/typescript/harness/tests/worker.test.ts @@ -5,7 +5,7 @@ import type { ClientConfig } from '../client.js'; import type { Worker } from '@temporalio/worker'; import { runWorker, runWorkers, type WorkerContext, type WorkerFactory } from '../worker.js'; import { makeClient } from './test-helpers.js'; -import { RESOURCE_BASED_DEFAULT_PROFILE } from '../profiles.js'; +import { lookupWorkerProfile, RESOURCE_BASED_DEFAULT_PROFILE } from '../profiles.js'; interface WorkerFactoryCall { client: Client; @@ -141,6 +141,35 @@ void test('runWorker applies resource based worker profile', async () => { }); }); +void test('lookupWorkerProfile includes fixed ECS worker profiles', () => { + const profiles = [ + ['ecs-2cpu-4gb-fixed-small', 250, 64, 256, 12, 24], + ['ecs-2cpu-4gb-fixed-medium', 500, 96, 384, 20, 40], + ['ecs-2cpu-4gb-fixed-medium-plus', 750, 144, 576, 30, 60], + ['ecs-2cpu-4gb-fixed-large', 1000, 192, 768, 40, 80], + ['ecs-2cpu-4gb-fixed-large-plus', 1250, 256, 1024, 50, 100], + ['ecs-2cpu-4gb-fixed-xlarge', 1500, 320, 1280, 60, 120], + ] as const; + + for (const [ + name, + maxCachedWorkflows, + workflowSlots, + activitySlots, + workflowPollers, + activityPollers, + ] of profiles) { + assert.deepEqual(lookupWorkerProfile(name), { + maxCachedWorkflows, + maxConcurrentWorkflowTaskExecutions: workflowSlots, + maxConcurrentActivityTaskExecutions: activitySlots, + maxConcurrentLocalActivityExecutions: activitySlots, + maxConcurrentWorkflowTaskPolls: workflowPollers, + maxConcurrentActivityTaskPolls: activityPollers, + }); + } +}); + void test('runWorker ignores worker option flags when profile is selected', async () => { const client = makeClient(); const workerFactoryCalls: WorkerFactoryCall[] = []; From a8a718b7e6163470ef098825ef6c86a45ae31677 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 29 May 2026 14:57:19 -0400 Subject: [PATCH 09/10] add temp typescript fix when building from source from dockerfile --- .github/workflows/ci.yml | 14 ++++++++++++++ dockerfiles/typescript.Dockerfile | 7 +++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5ceca017..905ee3c6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -175,6 +175,20 @@ jobs: --option continue-as-new-after-iterations=1 \ --option sleep-time=1ms \ --option visibility-count-timeout=2m + - name: Checkout TypeScript SDK for local SDK path image build + if: matrix.sdk == 'typescript' + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + with: + repository: temporalio/sdk-typescript + submodules: recursive + path: checked-out-sdk + - name: Build TypeScript worker image from local SDK path + if: matrix.sdk == 'typescript' + run: | + go run ./cmd/dev build-worker-image \ + --language typescript \ + --version checked-out-sdk/ \ + --image-tag typescript-local-sdk-path-test build-project: runs-on: ubuntu-latest diff --git a/dockerfiles/typescript.Dockerfile b/dockerfiles/typescript.Dockerfile index 022e30ae..6fd6ade5 100644 --- a/dockerfiles/typescript.Dockerfile +++ b/dockerfiles/typescript.Dockerfile @@ -50,11 +50,14 @@ ENV BUILD_CORE_RELEASE=${BUILD_CORE_RELEASE} COPY workers/proto ./workers/proto COPY workers/typescript ./workers/typescript -# Pin pnpm through Corepack because sdkbuild invokes `corepack pnpm`. +# Pin pnpm through Corepack because sdkbuild invokes `corepack pnpm` and +# TypeScript SDK repo scripts invoke bare `pnpm`. RUN . ./versions.env \ && test -n "${PNPM_VERSION}" \ + && corepack enable \ && corepack prepare "pnpm@${PNPM_VERSION}" --activate \ - && corepack pnpm --version + && corepack pnpm --version \ + && pnpm --version # prepare-worker builds the TypeScript workspace itself: it installs npm deps, # runs the root build, and generates the prepared sdkbuild package. From 119bec4dd8a961ea8f735efd0e06a4aa4d4f0e27 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Sun, 31 May 2026 07:40:17 -0400 Subject: [PATCH 10/10] modified profile sizes --- README.md | 3 +- workers/dotnet/projects/harness/Profiles.cs | 46 +++++------- .../projects/harness/tests/WorkerTests.cs | 11 ++- workers/go/harness/profiles.go | 74 ++++++++----------- workers/go/harness/worker_test.go | 67 ++++++++--------- .../temporal/omes/harness/WorkerProfiles.java | 11 ++- .../omes/harness/WorkerHarnessTest.java | 11 ++- .../python/harness/src/harness/profiles.py | 11 ++- .../harness/tests/test_harness_worker.py | 11 ++- workers/ruby/harness/lib/harness/profiles.rb | 60 +++++++-------- workers/ruby/harness/tests/test_worker.rb | 11 ++- workers/typescript/harness/profiles.ts | 11 ++- .../typescript/harness/tests/worker.test.ts | 11 ++- 13 files changed, 148 insertions(+), 190 deletions(-) diff --git a/README.md b/README.md index 570c8f94..920f127c 100644 --- a/README.md +++ b/README.md @@ -157,11 +157,10 @@ The shared `resource-based-default` profile uses the SDK resource-based worker t fixed ECS tuning candidates for 2 CPU / 4 GB workers that set equivalent fixed slot, poller, and workflow-cache settings in each SDK: +- `ecs-2cpu-4gb-fixed-xsmall` - `ecs-2cpu-4gb-fixed-small` - `ecs-2cpu-4gb-fixed-medium` -- `ecs-2cpu-4gb-fixed-medium-plus` - `ecs-2cpu-4gb-fixed-large` -- `ecs-2cpu-4gb-fixed-large-plus` - `ecs-2cpu-4gb-fixed-xlarge` ```sh diff --git a/workers/dotnet/projects/harness/Profiles.cs b/workers/dotnet/projects/harness/Profiles.cs index fca1d767..81a22e8a 100644 --- a/workers/dotnet/projects/harness/Profiles.cs +++ b/workers/dotnet/projects/harness/Profiles.cs @@ -18,42 +18,36 @@ internal static class WorkerProfiles targetMemoryUsage: 0.8, targetCpuUsage: 0.8), }, + ["ecs-2cpu-4gb-fixed-xsmall"] = CreateFixedEcsProfile( + maxCachedWorkflows: 50, + workflowSlots: 8, + activitySlots: 32, + workflowPollers: 2, + activityPollers: 4), ["ecs-2cpu-4gb-fixed-small"] = CreateFixedEcsProfile( + maxCachedWorkflows: 100, + workflowSlots: 16, + activitySlots: 64, + workflowPollers: 4, + activityPollers: 8), + ["ecs-2cpu-4gb-fixed-medium"] = CreateFixedEcsProfile( maxCachedWorkflows: 250, + workflowSlots: 32, + activitySlots: 128, + workflowPollers: 8, + activityPollers: 16), + ["ecs-2cpu-4gb-fixed-large"] = CreateFixedEcsProfile( + maxCachedWorkflows: 500, workflowSlots: 64, activitySlots: 256, workflowPollers: 12, activityPollers: 24), - ["ecs-2cpu-4gb-fixed-medium"] = CreateFixedEcsProfile( - maxCachedWorkflows: 500, + ["ecs-2cpu-4gb-fixed-xlarge"] = CreateFixedEcsProfile( + maxCachedWorkflows: 750, workflowSlots: 96, activitySlots: 384, workflowPollers: 20, activityPollers: 40), - ["ecs-2cpu-4gb-fixed-medium-plus"] = CreateFixedEcsProfile( - maxCachedWorkflows: 750, - workflowSlots: 144, - activitySlots: 576, - workflowPollers: 30, - activityPollers: 60), - ["ecs-2cpu-4gb-fixed-large"] = CreateFixedEcsProfile( - maxCachedWorkflows: 1000, - workflowSlots: 192, - activitySlots: 768, - workflowPollers: 40, - activityPollers: 80), - ["ecs-2cpu-4gb-fixed-large-plus"] = CreateFixedEcsProfile( - maxCachedWorkflows: 1250, - workflowSlots: 256, - activitySlots: 1024, - workflowPollers: 50, - activityPollers: 100), - ["ecs-2cpu-4gb-fixed-xlarge"] = CreateFixedEcsProfile( - maxCachedWorkflows: 1500, - workflowSlots: 320, - activitySlots: 1280, - workflowPollers: 60, - activityPollers: 120), }; public static WorkerProfile LookupWorkerProfile(string name) diff --git a/workers/dotnet/projects/harness/tests/WorkerTests.cs b/workers/dotnet/projects/harness/tests/WorkerTests.cs index 57c1d2ac..b6a712a4 100644 --- a/workers/dotnet/projects/harness/tests/WorkerTests.cs +++ b/workers/dotnet/projects/harness/tests/WorkerTests.cs @@ -112,12 +112,11 @@ public void LookupWorkerProfileIncludesFixedEcsWorkerProfiles() { var profiles = new[] { - new FixedEcsProfile("ecs-2cpu-4gb-fixed-small", 250, 64, 256, 12, 24), - new FixedEcsProfile("ecs-2cpu-4gb-fixed-medium", 500, 96, 384, 20, 40), - new FixedEcsProfile("ecs-2cpu-4gb-fixed-medium-plus", 750, 144, 576, 30, 60), - new FixedEcsProfile("ecs-2cpu-4gb-fixed-large", 1000, 192, 768, 40, 80), - new FixedEcsProfile("ecs-2cpu-4gb-fixed-large-plus", 1250, 256, 1024, 50, 100), - new FixedEcsProfile("ecs-2cpu-4gb-fixed-xlarge", 1500, 320, 1280, 60, 120), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-xsmall", 50, 8, 32, 2, 4), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-small", 100, 16, 64, 4, 8), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-medium", 250, 32, 128, 8, 16), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-large", 500, 64, 256, 12, 24), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-xlarge", 750, 96, 384, 20, 40), }; foreach (var expected in profiles) diff --git a/workers/go/harness/profiles.go b/workers/go/harness/profiles.go index b9c70ed3..da618536 100644 --- a/workers/go/harness/profiles.go +++ b/workers/go/harness/profiles.go @@ -37,8 +37,38 @@ func init() { Tuner: mustResourceBasedTuner(0.8, 0.8), }, }) + registerWorkerProfile("ecs-2cpu-4gb-fixed-xsmall", workerProfile{ + StickyWorkflowCacheSize: 50, + Options: sdkworker.Options{ + MaxConcurrentWorkflowTaskExecutionSize: 8, + MaxConcurrentActivityExecutionSize: 32, + MaxConcurrentLocalActivityExecutionSize: 32, + MaxConcurrentWorkflowTaskPollers: 2, + MaxConcurrentActivityTaskPollers: 4, + }, + }) registerWorkerProfile("ecs-2cpu-4gb-fixed-small", workerProfile{ + StickyWorkflowCacheSize: 100, + Options: sdkworker.Options{ + MaxConcurrentWorkflowTaskExecutionSize: 16, + MaxConcurrentActivityExecutionSize: 64, + MaxConcurrentLocalActivityExecutionSize: 64, + MaxConcurrentWorkflowTaskPollers: 4, + MaxConcurrentActivityTaskPollers: 8, + }, + }) + registerWorkerProfile("ecs-2cpu-4gb-fixed-medium", workerProfile{ StickyWorkflowCacheSize: 250, + Options: sdkworker.Options{ + MaxConcurrentWorkflowTaskExecutionSize: 32, + MaxConcurrentActivityExecutionSize: 128, + MaxConcurrentLocalActivityExecutionSize: 128, + MaxConcurrentWorkflowTaskPollers: 8, + MaxConcurrentActivityTaskPollers: 16, + }, + }) + registerWorkerProfile("ecs-2cpu-4gb-fixed-large", workerProfile{ + StickyWorkflowCacheSize: 500, Options: sdkworker.Options{ MaxConcurrentWorkflowTaskExecutionSize: 64, MaxConcurrentActivityExecutionSize: 256, @@ -47,8 +77,8 @@ func init() { MaxConcurrentActivityTaskPollers: 24, }, }) - registerWorkerProfile("ecs-2cpu-4gb-fixed-medium", workerProfile{ - StickyWorkflowCacheSize: 500, + registerWorkerProfile("ecs-2cpu-4gb-fixed-xlarge", workerProfile{ + StickyWorkflowCacheSize: 750, Options: sdkworker.Options{ MaxConcurrentWorkflowTaskExecutionSize: 96, MaxConcurrentActivityExecutionSize: 384, @@ -57,46 +87,6 @@ func init() { MaxConcurrentActivityTaskPollers: 40, }, }) - registerWorkerProfile("ecs-2cpu-4gb-fixed-medium-plus", workerProfile{ - StickyWorkflowCacheSize: 750, - Options: sdkworker.Options{ - MaxConcurrentWorkflowTaskExecutionSize: 144, - MaxConcurrentActivityExecutionSize: 576, - MaxConcurrentLocalActivityExecutionSize: 576, - MaxConcurrentWorkflowTaskPollers: 30, - MaxConcurrentActivityTaskPollers: 60, - }, - }) - registerWorkerProfile("ecs-2cpu-4gb-fixed-large", workerProfile{ - StickyWorkflowCacheSize: 1000, - Options: sdkworker.Options{ - MaxConcurrentWorkflowTaskExecutionSize: 192, - MaxConcurrentActivityExecutionSize: 768, - MaxConcurrentLocalActivityExecutionSize: 768, - MaxConcurrentWorkflowTaskPollers: 40, - MaxConcurrentActivityTaskPollers: 80, - }, - }) - registerWorkerProfile("ecs-2cpu-4gb-fixed-large-plus", workerProfile{ - StickyWorkflowCacheSize: 1250, - Options: sdkworker.Options{ - MaxConcurrentWorkflowTaskExecutionSize: 256, - MaxConcurrentActivityExecutionSize: 1024, - MaxConcurrentLocalActivityExecutionSize: 1024, - MaxConcurrentWorkflowTaskPollers: 50, - MaxConcurrentActivityTaskPollers: 100, - }, - }) - registerWorkerProfile("ecs-2cpu-4gb-fixed-xlarge", workerProfile{ - StickyWorkflowCacheSize: 1500, - Options: sdkworker.Options{ - MaxConcurrentWorkflowTaskExecutionSize: 320, - MaxConcurrentActivityExecutionSize: 1280, - MaxConcurrentLocalActivityExecutionSize: 1280, - MaxConcurrentWorkflowTaskPollers: 60, - MaxConcurrentActivityTaskPollers: 120, - }, - }) } func mustResourceBasedTuner(targetMem, targetCpu float64) sdkworker.WorkerTuner { diff --git a/workers/go/harness/worker_test.go b/workers/go/harness/worker_test.go index ac63a6ae..40d19457 100644 --- a/workers/go/harness/worker_test.go +++ b/workers/go/harness/worker_test.go @@ -94,59 +94,50 @@ func TestLookupWorkerProfileIncludesFixedECSWorkerProfiles(t *testing.T) { workflowPollers int activityPollers int }{ + { + name: "xsmall", + profile: "ecs-2cpu-4gb-fixed-xsmall", + stickyWorkflowCacheSize: 50, + workflowSlots: 8, + activitySlots: 32, + workflowPollers: 2, + activityPollers: 4, + }, { name: "small", profile: "ecs-2cpu-4gb-fixed-small", - stickyWorkflowCacheSize: 250, - workflowSlots: 64, - activitySlots: 256, - workflowPollers: 12, - activityPollers: 24, + stickyWorkflowCacheSize: 100, + workflowSlots: 16, + activitySlots: 64, + workflowPollers: 4, + activityPollers: 8, }, { name: "medium", profile: "ecs-2cpu-4gb-fixed-medium", - stickyWorkflowCacheSize: 500, - workflowSlots: 96, - activitySlots: 384, - workflowPollers: 20, - activityPollers: 40, - }, - { - name: "medium plus", - profile: "ecs-2cpu-4gb-fixed-medium-plus", - stickyWorkflowCacheSize: 750, - workflowSlots: 144, - activitySlots: 576, - workflowPollers: 30, - activityPollers: 60, + stickyWorkflowCacheSize: 250, + workflowSlots: 32, + activitySlots: 128, + workflowPollers: 8, + activityPollers: 16, }, { name: "large", profile: "ecs-2cpu-4gb-fixed-large", - stickyWorkflowCacheSize: 1000, - workflowSlots: 192, - activitySlots: 768, - workflowPollers: 40, - activityPollers: 80, - }, - { - name: "large plus", - profile: "ecs-2cpu-4gb-fixed-large-plus", - stickyWorkflowCacheSize: 1250, - workflowSlots: 256, - activitySlots: 1024, - workflowPollers: 50, - activityPollers: 100, + stickyWorkflowCacheSize: 500, + workflowSlots: 64, + activitySlots: 256, + workflowPollers: 12, + activityPollers: 24, }, { name: "xlarge", profile: "ecs-2cpu-4gb-fixed-xlarge", - stickyWorkflowCacheSize: 1500, - workflowSlots: 320, - activitySlots: 1280, - workflowPollers: 60, - activityPollers: 120, + stickyWorkflowCacheSize: 750, + workflowSlots: 96, + activitySlots: 384, + workflowPollers: 20, + activityPollers: 40, }, } diff --git a/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java b/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java index 55e437c9..35c8a8e9 100644 --- a/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java +++ b/workers/java/harness/src/main/java/io/temporal/omes/harness/WorkerProfiles.java @@ -40,12 +40,11 @@ Integer workflowCacheSize() { ResourceBasedControllerOptions.newBuilder(0.8, 0.8).build()) .build()) .build()); - registerFixedEcsProfile("ecs-2cpu-4gb-fixed-small", 250, 64, 256, 12, 24); - registerFixedEcsProfile("ecs-2cpu-4gb-fixed-medium", 500, 96, 384, 20, 40); - registerFixedEcsProfile("ecs-2cpu-4gb-fixed-medium-plus", 750, 144, 576, 30, 60); - registerFixedEcsProfile("ecs-2cpu-4gb-fixed-large", 1000, 192, 768, 40, 80); - registerFixedEcsProfile("ecs-2cpu-4gb-fixed-large-plus", 1250, 256, 1024, 50, 100); - registerFixedEcsProfile("ecs-2cpu-4gb-fixed-xlarge", 1500, 320, 1280, 60, 120); + registerFixedEcsProfile("ecs-2cpu-4gb-fixed-xsmall", 50, 8, 32, 2, 4); + registerFixedEcsProfile("ecs-2cpu-4gb-fixed-small", 100, 16, 64, 4, 8); + registerFixedEcsProfile("ecs-2cpu-4gb-fixed-medium", 250, 32, 128, 8, 16); + registerFixedEcsProfile("ecs-2cpu-4gb-fixed-large", 500, 64, 256, 12, 24); + registerFixedEcsProfile("ecs-2cpu-4gb-fixed-xlarge", 750, 96, 384, 20, 40); } private WorkerProfiles() {} diff --git a/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java b/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java index bc699f2d..1f785954 100644 --- a/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java +++ b/workers/java/harness/src/test/java/io/temporal/omes/harness/WorkerHarnessTest.java @@ -50,12 +50,11 @@ void buildWorkerOptionsIgnoresWorkerOptionFlagsWhenProfileIsSelected() { void buildWorkerOptionsIncludesFixedEcsWorkerProfiles() { List profiles = List.of( - new FixedEcsProfile("ecs-2cpu-4gb-fixed-small", 250, 64, 256, 12, 24), - new FixedEcsProfile("ecs-2cpu-4gb-fixed-medium", 500, 96, 384, 20, 40), - new FixedEcsProfile("ecs-2cpu-4gb-fixed-medium-plus", 750, 144, 576, 30, 60), - new FixedEcsProfile("ecs-2cpu-4gb-fixed-large", 1000, 192, 768, 40, 80), - new FixedEcsProfile("ecs-2cpu-4gb-fixed-large-plus", 1250, 256, 1024, 50, 100), - new FixedEcsProfile("ecs-2cpu-4gb-fixed-xlarge", 1500, 320, 1280, 60, 120)); + new FixedEcsProfile("ecs-2cpu-4gb-fixed-xsmall", 50, 8, 32, 2, 4), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-small", 100, 16, 64, 4, 8), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-medium", 250, 32, 128, 8, 16), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-large", 500, 64, 256, 12, 24), + new FixedEcsProfile("ecs-2cpu-4gb-fixed-xlarge", 750, 96, 384, 20, 40)); for (FixedEcsProfile expected : profiles) { WorkerOptions workerOptions = diff --git a/workers/python/harness/src/harness/profiles.py b/workers/python/harness/src/harness/profiles.py index 99e3fd3a..accba50c 100644 --- a/workers/python/harness/src/harness/profiles.py +++ b/workers/python/harness/src/harness/profiles.py @@ -57,11 +57,10 @@ def _register_fixed_ecs_profile( for _profile in ( - ("ecs-2cpu-4gb-fixed-small", 250, 64, 256, 12, 24), - ("ecs-2cpu-4gb-fixed-medium", 500, 96, 384, 20, 40), - ("ecs-2cpu-4gb-fixed-medium-plus", 750, 144, 576, 30, 60), - ("ecs-2cpu-4gb-fixed-large", 1000, 192, 768, 40, 80), - ("ecs-2cpu-4gb-fixed-large-plus", 1250, 256, 1024, 50, 100), - ("ecs-2cpu-4gb-fixed-xlarge", 1500, 320, 1280, 60, 120), + ("ecs-2cpu-4gb-fixed-xsmall", 50, 8, 32, 2, 4), + ("ecs-2cpu-4gb-fixed-small", 100, 16, 64, 4, 8), + ("ecs-2cpu-4gb-fixed-medium", 250, 32, 128, 8, 16), + ("ecs-2cpu-4gb-fixed-large", 500, 64, 256, 12, 24), + ("ecs-2cpu-4gb-fixed-xlarge", 750, 96, 384, 20, 40), ): _register_fixed_ecs_profile(*_profile) diff --git a/workers/python/harness/tests/test_harness_worker.py b/workers/python/harness/tests/test_harness_worker.py index 14b324cf..a32de23c 100644 --- a/workers/python/harness/tests/test_harness_worker.py +++ b/workers/python/harness/tests/test_harness_worker.py @@ -116,12 +116,11 @@ def test_build_worker_kwargs_ignores_worker_option_flags_when_profile_is_selecte def test_lookup_profile_includes_fixed_ecs_worker_profiles(self) -> None: profiles = ( - ("ecs-2cpu-4gb-fixed-small", 250, 64, 256, 12, 24), - ("ecs-2cpu-4gb-fixed-medium", 500, 96, 384, 20, 40), - ("ecs-2cpu-4gb-fixed-medium-plus", 750, 144, 576, 30, 60), - ("ecs-2cpu-4gb-fixed-large", 1000, 192, 768, 40, 80), - ("ecs-2cpu-4gb-fixed-large-plus", 1250, 256, 1024, 50, 100), - ("ecs-2cpu-4gb-fixed-xlarge", 1500, 320, 1280, 60, 120), + ("ecs-2cpu-4gb-fixed-xsmall", 50, 8, 32, 2, 4), + ("ecs-2cpu-4gb-fixed-small", 100, 16, 64, 4, 8), + ("ecs-2cpu-4gb-fixed-medium", 250, 32, 128, 8, 16), + ("ecs-2cpu-4gb-fixed-large", 500, 64, 256, 12, 24), + ("ecs-2cpu-4gb-fixed-xlarge", 750, 96, 384, 20, 40), ) for ( diff --git a/workers/ruby/harness/lib/harness/profiles.rb b/workers/ruby/harness/lib/harness/profiles.rb index d8cdeda2..4874f5f8 100644 --- a/workers/ruby/harness/lib/harness/profiles.rb +++ b/workers/ruby/harness/lib/harness/profiles.rb @@ -54,53 +54,45 @@ def register_fixed_ecs_profile( } ) + register_fixed_ecs_profile( + 'ecs-2cpu-4gb-fixed-xsmall', + max_cached_workflows: 50, + workflow_slots: 8, + activity_slots: 32, + workflow_pollers: 2, + activity_pollers: 4 + ) register_fixed_ecs_profile( 'ecs-2cpu-4gb-fixed-small', + max_cached_workflows: 100, + workflow_slots: 16, + activity_slots: 64, + workflow_pollers: 4, + activity_pollers: 8 + ) + register_fixed_ecs_profile( + 'ecs-2cpu-4gb-fixed-medium', max_cached_workflows: 250, + workflow_slots: 32, + activity_slots: 128, + workflow_pollers: 8, + activity_pollers: 16 + ) + register_fixed_ecs_profile( + 'ecs-2cpu-4gb-fixed-large', + max_cached_workflows: 500, workflow_slots: 64, activity_slots: 256, workflow_pollers: 12, activity_pollers: 24 ) register_fixed_ecs_profile( - 'ecs-2cpu-4gb-fixed-medium', - max_cached_workflows: 500, + 'ecs-2cpu-4gb-fixed-xlarge', + max_cached_workflows: 750, workflow_slots: 96, activity_slots: 384, workflow_pollers: 20, activity_pollers: 40 ) - register_fixed_ecs_profile( - 'ecs-2cpu-4gb-fixed-medium-plus', - max_cached_workflows: 750, - workflow_slots: 144, - activity_slots: 576, - workflow_pollers: 30, - activity_pollers: 60 - ) - register_fixed_ecs_profile( - 'ecs-2cpu-4gb-fixed-large', - max_cached_workflows: 1000, - workflow_slots: 192, - activity_slots: 768, - workflow_pollers: 40, - activity_pollers: 80 - ) - register_fixed_ecs_profile( - 'ecs-2cpu-4gb-fixed-large-plus', - max_cached_workflows: 1250, - workflow_slots: 256, - activity_slots: 1024, - workflow_pollers: 50, - activity_pollers: 100 - ) - register_fixed_ecs_profile( - 'ecs-2cpu-4gb-fixed-xlarge', - max_cached_workflows: 1500, - workflow_slots: 320, - activity_slots: 1280, - workflow_pollers: 60, - activity_pollers: 120 - ) end end diff --git a/workers/ruby/harness/tests/test_worker.rb b/workers/ruby/harness/tests/test_worker.rb index 2d362703..12a6a9d3 100644 --- a/workers/ruby/harness/tests/test_worker.rb +++ b/workers/ruby/harness/tests/test_worker.rb @@ -73,12 +73,11 @@ def test_build_worker_kwargs_ignores_worker_option_flags_when_profile_is_selecte def test_lookup_profile_includes_fixed_ecs_worker_profiles [ - ['ecs-2cpu-4gb-fixed-small', 250, 64, 256, 12, 24], - ['ecs-2cpu-4gb-fixed-medium', 500, 96, 384, 20, 40], - ['ecs-2cpu-4gb-fixed-medium-plus', 750, 144, 576, 30, 60], - ['ecs-2cpu-4gb-fixed-large', 1000, 192, 768, 40, 80], - ['ecs-2cpu-4gb-fixed-large-plus', 1250, 256, 1024, 50, 100], - ['ecs-2cpu-4gb-fixed-xlarge', 1500, 320, 1280, 60, 120] + ['ecs-2cpu-4gb-fixed-xsmall', 50, 8, 32, 2, 4], + ['ecs-2cpu-4gb-fixed-small', 100, 16, 64, 4, 8], + ['ecs-2cpu-4gb-fixed-medium', 250, 32, 128, 8, 16], + ['ecs-2cpu-4gb-fixed-large', 500, 64, 256, 12, 24], + ['ecs-2cpu-4gb-fixed-xlarge', 750, 96, 384, 20, 40] ].each do |name, max_cached_workflows, workflow_slots, activity_slots, workflow_pollers, activity_pollers| profile = Harness::Profiles.lookup(name) tuner = profile.fetch(:tuner) diff --git a/workers/typescript/harness/profiles.ts b/workers/typescript/harness/profiles.ts index 294a232e..24e3e067 100644 --- a/workers/typescript/harness/profiles.ts +++ b/workers/typescript/harness/profiles.ts @@ -29,12 +29,11 @@ registerWorkerProfile(RESOURCE_BASED_DEFAULT_PROFILE, { }); const ecsFixedProfiles = [ - ['ecs-2cpu-4gb-fixed-small', 250, 64, 256, 12, 24], - ['ecs-2cpu-4gb-fixed-medium', 500, 96, 384, 20, 40], - ['ecs-2cpu-4gb-fixed-medium-plus', 750, 144, 576, 30, 60], - ['ecs-2cpu-4gb-fixed-large', 1000, 192, 768, 40, 80], - ['ecs-2cpu-4gb-fixed-large-plus', 1250, 256, 1024, 50, 100], - ['ecs-2cpu-4gb-fixed-xlarge', 1500, 320, 1280, 60, 120], + ['ecs-2cpu-4gb-fixed-xsmall', 50, 8, 32, 2, 4], + ['ecs-2cpu-4gb-fixed-small', 100, 16, 64, 4, 8], + ['ecs-2cpu-4gb-fixed-medium', 250, 32, 128, 8, 16], + ['ecs-2cpu-4gb-fixed-large', 500, 64, 256, 12, 24], + ['ecs-2cpu-4gb-fixed-xlarge', 750, 96, 384, 20, 40], ] as const; for (const [ diff --git a/workers/typescript/harness/tests/worker.test.ts b/workers/typescript/harness/tests/worker.test.ts index 7f10244f..150ae0ab 100644 --- a/workers/typescript/harness/tests/worker.test.ts +++ b/workers/typescript/harness/tests/worker.test.ts @@ -143,12 +143,11 @@ void test('runWorker applies resource based worker profile', async () => { void test('lookupWorkerProfile includes fixed ECS worker profiles', () => { const profiles = [ - ['ecs-2cpu-4gb-fixed-small', 250, 64, 256, 12, 24], - ['ecs-2cpu-4gb-fixed-medium', 500, 96, 384, 20, 40], - ['ecs-2cpu-4gb-fixed-medium-plus', 750, 144, 576, 30, 60], - ['ecs-2cpu-4gb-fixed-large', 1000, 192, 768, 40, 80], - ['ecs-2cpu-4gb-fixed-large-plus', 1250, 256, 1024, 50, 100], - ['ecs-2cpu-4gb-fixed-xlarge', 1500, 320, 1280, 60, 120], + ['ecs-2cpu-4gb-fixed-xsmall', 50, 8, 32, 2, 4], + ['ecs-2cpu-4gb-fixed-small', 100, 16, 64, 4, 8], + ['ecs-2cpu-4gb-fixed-medium', 250, 32, 128, 8, 16], + ['ecs-2cpu-4gb-fixed-large', 500, 64, 256, 12, 24], + ['ecs-2cpu-4gb-fixed-xlarge', 750, 96, 384, 20, 40], ] as const; for (const [