Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,20 @@ jobs:
--option continue-as-new-after-iterations=1 \
--option sleep-time=1ms \
--option visibility-count-timeout=2m
- name: Checkout TypeScript SDK for local SDK path image build
if: matrix.sdk == 'typescript'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
repository: temporalio/sdk-typescript
submodules: recursive
path: checked-out-sdk
- name: Build TypeScript worker image from local SDK path
if: matrix.sdk == 'typescript'
run: |
go run ./cmd/dev build-worker-image \
--language typescript \
--version checked-out-sdk/ \
--image-tag typescript-local-sdk-path-test

build-project:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ omes.sln
/last_fuzz_run.proto
workers/dotnet/Temporalio.Omes.temp.csproj

workers/java/harness/bin/
workers/python/**/__pycache__/
workers/typescript/harness/dist/
workers/typescript/harness/dist-test/
Expand Down
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,28 @@ Notes:
- `--task-queue-suffix-index-start` and `--task-queue-suffix-index-end` represent an inclusive range for running the
worker on multiple task queues. The process will create a worker for every task queue from `<task-queue>-<start>`
through `<task-queue>-end`. This only applies to multi-task-queue scenarios.
- `--worker-profile` selects a code-defined worker configuration profile in the language harness. Omes forwards the
selected profile to the worker process via `OMES_WORKER_PROFILE`; the profile is not a worker CLI flag.

Worker profiles provide a named worker configuration selected with `--worker-profile`. When a profile is selected, it
takes precedence over worker configuration flags such as poller counts, autoscale poller settings, activity rate limits,
slot counts, and worker versioning options. Non-worker-configuration flags, such as task queue selection, client
connection settings, logging, metrics, and `--worker-err-on-unimplemented`, continue to apply normally.

The shared `resource-based-default` profile uses the SDK resource-based worker tuner. The worker harnesses also provide
fixed ECS tuning candidates for 2 CPU / 4 GB workers that set equivalent fixed slot, poller, and workflow-cache
settings in each SDK:

- `ecs-2cpu-4gb-fixed-xsmall`
- `ecs-2cpu-4gb-fixed-small`
- `ecs-2cpu-4gb-fixed-medium`
- `ecs-2cpu-4gb-fixed-large`
- `ecs-2cpu-4gb-fixed-xlarge`

```sh
go run ./cmd run-scenario-with-worker --scenario throughput_stress --language go \
--run-id local-profile-test --worker-profile resource-based-default
```

### Run a test scenario

Expand Down
1 change: 1 addition & 0 deletions cmd/cli/run_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (r *workerRunner) addCLIFlags(fs *pflag.FlagSet) {
fs.StringVar(&r.EmbeddedServerAddress, "embedded-server-address", "", "Address to bind local embedded server to")
fs.IntVar(&r.TaskQueueIndexSuffixStart, "task-queue-suffix-index-start", 0, "Inclusive start for task queue suffix range")
fs.IntVar(&r.TaskQueueIndexSuffixEnd, "task-queue-suffix-index-end", 0, "Inclusive end for task queue suffix range")
fs.StringVar(&r.WorkerProfile, "worker-profile", "", "Worker configuration profile to apply in the worker harness")
fs.AddFlagSet(r.ClientOptions.FlagSet())
fs.AddFlagSet(r.MetricsOptions.FlagSet("worker-"))
fs.AddFlagSet(r.WorkerOptions.FlagSet())
Expand Down
7 changes: 5 additions & 2 deletions dockerfiles/typescript.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@ ENV BUILD_CORE_RELEASE=${BUILD_CORE_RELEASE}
COPY workers/proto ./workers/proto
COPY workers/typescript ./workers/typescript

# Pin pnpm through Corepack because sdkbuild invokes `corepack pnpm`.
# Pin pnpm through Corepack because sdkbuild invokes `corepack pnpm` and
# TypeScript SDK repo scripts invoke bare `pnpm`.
RUN . ./versions.env \
&& test -n "${PNPM_VERSION}" \
&& corepack enable \
&& corepack prepare "pnpm@${PNPM_VERSION}" --activate \
&& corepack pnpm --version
&& corepack pnpm --version \
&& pnpm --version

# prepare-worker builds the TypeScript workspace itself: it installs npm deps,
# runs the root build, and generates the prepared sdkbuild package.
Expand Down
78 changes: 78 additions & 0 deletions workers/dotnet/projects/harness/Profiles.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using Temporalio.Worker;
using Temporalio.Worker.Tuning;
using WorkerProfile = Temporalio.Worker.TemporalWorkerOptions;

namespace Temporalio.Omes.Projects.Harness;

internal static class WorkerProfiles
{
public const string EnvVarName = "OMES_WORKER_PROFILE";
public const string ResourceBasedDefaultProfile = "resource-based-default";

private static readonly IReadOnlyDictionary<string, WorkerProfile> Profiles =
new Dictionary<string, WorkerProfile>
{
[ResourceBasedDefaultProfile] = new TemporalWorkerOptions
{
Tuner = WorkerTuner.CreateResourceBased(
targetMemoryUsage: 0.8,
targetCpuUsage: 0.8),
},
["ecs-2cpu-4gb-fixed-xsmall"] = CreateFixedEcsProfile(
maxCachedWorkflows: 50,
workflowSlots: 8,
activitySlots: 32,
workflowPollers: 2,
activityPollers: 4),
["ecs-2cpu-4gb-fixed-small"] = CreateFixedEcsProfile(
maxCachedWorkflows: 100,
workflowSlots: 16,
activitySlots: 64,
workflowPollers: 4,
activityPollers: 8),
["ecs-2cpu-4gb-fixed-medium"] = CreateFixedEcsProfile(
maxCachedWorkflows: 250,
workflowSlots: 32,
activitySlots: 128,
workflowPollers: 8,
activityPollers: 16),
["ecs-2cpu-4gb-fixed-large"] = CreateFixedEcsProfile(
maxCachedWorkflows: 500,
workflowSlots: 64,
activitySlots: 256,
workflowPollers: 12,
activityPollers: 24),
["ecs-2cpu-4gb-fixed-xlarge"] = CreateFixedEcsProfile(
maxCachedWorkflows: 750,
workflowSlots: 96,
activitySlots: 384,
workflowPollers: 20,
activityPollers: 40),
};

public static WorkerProfile LookupWorkerProfile(string name)
{
if (!Profiles.TryGetValue(name, out var profile))
{
throw new ArgumentException($"Unknown worker profile \"{name}\"");
}

return (WorkerProfile)profile.Clone();
}

private static WorkerProfile CreateFixedEcsProfile(
int maxCachedWorkflows,
int workflowSlots,
int activitySlots,
int workflowPollers,
int activityPollers) =>
new TemporalWorkerOptions
{
MaxCachedWorkflows = maxCachedWorkflows,
MaxConcurrentWorkflowTasks = workflowSlots,
MaxConcurrentActivities = activitySlots,
MaxConcurrentLocalActivities = activitySlots,
MaxConcurrentWorkflowTaskPolls = workflowPollers,
MaxConcurrentActivityTaskPolls = activityPollers,
};
}
19 changes: 15 additions & 4 deletions workers/dotnet/projects/harness/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ await RunCoreAsync(
workerFactory: (client, workerContext) => app.Worker(client, workerContext),
clientFactory: app.ClientFactory,
options: options,
runWorkersAsync: RunWorkersAsync);
runWorkersAsync: RunWorkersAsync,
workerProfile: Environment.GetEnvironmentVariable(WorkerProfiles.EnvVarName));
context.ExitCode = 0;
}
catch (ArgumentException err)
Expand All @@ -154,7 +155,8 @@ internal static async Task RunCoreAsync<TWorker>(
WorkerFactoryCore<TWorker> workerFactory,
ClientFactory clientFactory,
WorkerCliOptions options,
Func<IReadOnlyList<TWorker>, Task> runWorkersAsync)
Func<IReadOnlyList<TWorker>, Task> runWorkersAsync,
string? workerProfile = null)
{
if (options.TaskQueueSuffixIndexStart > options.TaskQueueSuffixIndexEnd)
{
Expand All @@ -173,7 +175,7 @@ internal static async Task RunCoreAsync<TWorker>(
promListenAddress: options.PromListenAddress,
loggerFactory: loggerFactory);

var workerSettings = BuildWorkerSettings(options);
var workerSettings = BuildWorkerSettings(options, workerProfile);
var client = await clientFactory(clientConfig);
var taskQueues = BuildTaskQueues(
logger,
Expand Down Expand Up @@ -303,8 +305,9 @@ private static List<string> BuildTaskQueues(
return taskQueues;
}

private static WorkerSettings BuildWorkerSettings(WorkerCliOptions options) =>
private static WorkerSettings BuildWorkerSettings(WorkerCliOptions options, string? profile) =>
new(
Profile: profile,
MaxConcurrentActivities: options.MaxConcurrentActivities,
MaxConcurrentWorkflowTasks: options.MaxConcurrentWorkflowTasks,
ActivitiesPerSecond: options.ActivitiesPerSecond,
Expand All @@ -315,6 +318,13 @@ private static WorkerSettings BuildWorkerSettings(WorkerCliOptions options) =>

private static TemporalWorkerOptions BuildWorkerOptions(string taskQueue, WorkerSettings settings)
{
if (!string.IsNullOrEmpty(settings.Profile))
{
var profileOptions = WorkerProfiles.LookupWorkerProfile(settings.Profile);
profileOptions.TaskQueue = taskQueue;
return profileOptions;
}

var workerOptions = new TemporalWorkerOptions(taskQueue);
if (settings.MaxConcurrentWorkflowTasks is { } maxConcurrentWorkflowTasks)
{
Expand Down Expand Up @@ -406,6 +416,7 @@ internal static RootCommand CreateWorkerCommand()
}

private sealed record WorkerSettings(
string? Profile,
int? MaxConcurrentActivities,
int? MaxConcurrentWorkflowTasks,
double? ActivitiesPerSecond,
Expand Down
105 changes: 104 additions & 1 deletion workers/dotnet/projects/harness/tests/WorkerTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.CommandLine;
using Temporalio.Client;
using Temporalio.Omes.Projects.Harness;
using Temporalio.Worker;
using Xunit;

namespace Temporalio.Omes.Projects.Tests.HarnessTests;
Expand Down Expand Up @@ -45,7 +46,8 @@ await WorkerHarness.RunCoreAsync(
{
runWorkersInput = workers;
return Task.CompletedTask;
});
},
workerProfile: null);

Assert.All(seenClients, client => Assert.Same(sharedClient, client));
Assert.Equal(["omes-1", "omes-2"], createdWorkers);
Expand All @@ -57,6 +59,99 @@ await WorkerHarness.RunCoreAsync(
Assert.Null(capturedConfig.Tls);
}

[Fact]
public async Task RunAppliesResourceBasedWorkerProfile()
{
var sharedClient = HarnessTestSupport.CreateStrictTemporalClientProbe();
TemporalWorkerOptions? capturedOptions = null;
var options = WorkerHarness.ParseWorkerOptions(
WorkerHarness.CreateWorkerCommand().Parse());

await WorkerHarness.RunCoreAsync(
workerFactory: (_, context) =>
{
capturedOptions = context.WorkerOptions;
return context.TaskQueue;
},
clientFactory: _ => Task.FromResult(sharedClient),
options: options,
runWorkersAsync: _ => Task.CompletedTask,
workerProfile: WorkerProfiles.ResourceBasedDefaultProfile);

Assert.NotNull(capturedOptions);
Assert.NotNull(capturedOptions!.Tuner);
}

[Fact]
public async Task RunIgnoresWorkerOptionFlagsWhenProfileIsSelected()
{
var sharedClient = HarnessTestSupport.CreateStrictTemporalClientProbe();
TemporalWorkerOptions? capturedOptions = null;
var options = WorkerHarness.ParseWorkerOptions(
WorkerHarness.CreateWorkerCommand().Parse(
["--max-concurrent-activities", "50"]));

await WorkerHarness.RunCoreAsync(
workerFactory: (_, context) =>
{
capturedOptions = context.WorkerOptions;
return context.TaskQueue;
},
clientFactory: _ => Task.FromResult(sharedClient),
options: options,
runWorkersAsync: _ => Task.CompletedTask,
workerProfile: WorkerProfiles.ResourceBasedDefaultProfile);

Assert.NotNull(capturedOptions);
Assert.NotNull(capturedOptions!.Tuner);
Assert.NotEqual(50, capturedOptions.MaxConcurrentActivities);
}

[Fact]
public void LookupWorkerProfileIncludesFixedEcsWorkerProfiles()
{
var profiles = new[]
{
new FixedEcsProfile("ecs-2cpu-4gb-fixed-xsmall", 50, 8, 32, 2, 4),
new FixedEcsProfile("ecs-2cpu-4gb-fixed-small", 100, 16, 64, 4, 8),
new FixedEcsProfile("ecs-2cpu-4gb-fixed-medium", 250, 32, 128, 8, 16),
new FixedEcsProfile("ecs-2cpu-4gb-fixed-large", 500, 64, 256, 12, 24),
new FixedEcsProfile("ecs-2cpu-4gb-fixed-xlarge", 750, 96, 384, 20, 40),
};

foreach (var expected in profiles)
{
var profile = WorkerProfiles.LookupWorkerProfile(expected.Name);

Assert.Null(profile.Tuner);
Assert.Equal(expected.MaxCachedWorkflows, profile.MaxCachedWorkflows);
Assert.Equal(expected.WorkflowSlots, profile.MaxConcurrentWorkflowTasks);
Assert.Equal(expected.ActivitySlots, profile.MaxConcurrentActivities);
Assert.Equal(expected.ActivitySlots, profile.MaxConcurrentLocalActivities);
Assert.Equal(expected.WorkflowPollers, profile.MaxConcurrentWorkflowTaskPolls);
Assert.Equal(expected.ActivityPollers, profile.MaxConcurrentActivityTaskPolls);
}
}

[Fact]
public async Task RunRejectsUnknownWorkerProfile()
{
var sharedClient = HarnessTestSupport.CreateStrictTemporalClientProbe();
var options = WorkerHarness.ParseWorkerOptions(
WorkerHarness.CreateWorkerCommand().Parse(["--log-level", "panic"]));

var error = await Assert.ThrowsAsync<ArgumentException>(
() =>
WorkerHarness.RunCoreAsync(
workerFactory: (_, context) => context.TaskQueue,
clientFactory: _ => Task.FromResult(sharedClient),
options: options,
runWorkersAsync: _ => Task.CompletedTask,
workerProfile: "nope"));

Assert.Contains("Unknown worker profile \"nope\"", error.Message);
}

[Fact]
public async Task WorkerModeCancelsRemainingWorkersWhenOneFails()
{
Expand Down Expand Up @@ -148,3 +243,11 @@ public async Task RunAsync(CancellationToken cancellationToken)

public void Dispose() => DisposeCalls++;
}

file sealed record FixedEcsProfile(
string Name,
int MaxCachedWorkflows,
int WorkflowSlots,
int ActivitySlots,
int WorkflowPollers,
int ActivityPollers);
17 changes: 17 additions & 0 deletions workers/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package workers

import (
"slices"
"strings"
)

func withEnv(environ []string, name string, value string) []string {
prefix := name + "="
nextEnv := slices.DeleteFunc(slices.Clone(environ), func(item string) bool {
return strings.HasPrefix(item, prefix)
})
if value != "" {
nextEnv = append(nextEnv, prefix+value)
}
return nextEnv
}
Loading
Loading