Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions workers/test_env.go → internal/workertest/env.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package workers
package workertest

import (
"context"
Expand All @@ -15,6 +15,7 @@ import (
"github.com/temporalio/omes/devserver"
"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/versions"
"github.com/temporalio/omes/workers"
"go.temporal.io/api/nexus/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/sdk/client"
Expand Down Expand Up @@ -107,7 +108,7 @@ func SetupTestEnvironment(t *testing.T, opts ...TestEnvOption) *TestEnvironment
Ref: serverRef,
Namespace: testNamespace,
DynamicConfigValues: cfg.dynamicConfig,
Output: &logWriter{logger: serverLogger},
Output: workers.NewLogWriter(serverLogger),
Logger: serverLogger,
})
require.NoError(t, err, "Failed to start dev server")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package workers
package workertest

import (
"encoding/json"
Expand Down
13 changes: 7 additions & 6 deletions workers/test_workers.go → internal/workertest/workerpool.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package workers
package workertest

import (
"context"
Expand All @@ -10,6 +10,7 @@ import (

"github.com/temporalio/omes/clioptions"
"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/workers"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -52,7 +53,7 @@ func (w *workerPool) ensureWorkerBuilt(
w.mutex.Unlock()

once.Do(func() {
baseDir := BaseDir(w.env.repoDir, sdk)
baseDir := workers.BaseDir(w.env.repoDir, sdk)
buildDir := filepath.Join(baseDir, w.env.buildDirName())

w.mutex.Lock()
Expand All @@ -63,7 +64,7 @@ func (w *workerPool) ensureWorkerBuilt(
})
w.mutex.Unlock()

builder := Builder{
builder := workers.Builder{
DirName: w.env.buildDirName(),
SdkOptions: clioptions.SdkOptions{Language: sdk},
Logger: logger.Named(fmt.Sprintf("%s-builder", sdk)),
Expand Down Expand Up @@ -97,9 +98,9 @@ func (w *workerPool) startWorker(

go func() {
defer close(workerDone)
baseDir := BaseDir(w.env.repoDir, sdk)
runner := &Runner{
Builder: Builder{
baseDir := workers.BaseDir(w.env.repoDir, sdk)
runner := &workers.Runner{
Builder: workers.Builder{
DirName: w.env.buildDirName(),
SdkOptions: clioptions.SdkOptions{Language: sdk},
Logger: logger.Named(fmt.Sprintf("%s-worker-builder", sdk)),
Expand Down
2 changes: 1 addition & 1 deletion loadgen/kitchen_sink_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

"github.com/stretchr/testify/require"
"github.com/temporalio/omes/clioptions"
. "github.com/temporalio/omes/internal/workertest"
. "github.com/temporalio/omes/loadgen"
. "github.com/temporalio/omes/loadgen/kitchensink"
. "github.com/temporalio/omes/workers"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
Expand Down
6 changes: 3 additions & 3 deletions scenarios/ebb_and_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (

"github.com/stretchr/testify/require"
"github.com/temporalio/omes/clioptions"
"github.com/temporalio/omes/internal/workertest"
"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/workers"
)

func TestEbbAndFlow(t *testing.T) {
t.Parallel()

env := workers.SetupTestEnvironment(t,
workers.WithExecutorTimeout(2*time.Minute))
env := workertest.SetupTestEnvironment(t,
workertest.WithExecutorTimeout(2*time.Minute))

sleepActivityJson := `{
"count": {
Expand Down
6 changes: 3 additions & 3 deletions scenarios/serverless_burst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (

"github.com/stretchr/testify/require"
"github.com/temporalio/omes/clioptions"
"github.com/temporalio/omes/internal/workertest"
"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/workers"
)

func TestServerlessBurst(t *testing.T) {
t.Parallel()

env := workers.SetupTestEnvironment(t,
workers.WithExecutorTimeout(1*time.Minute))
env := workertest.SetupTestEnvironment(t,
workertest.WithExecutorTimeout(1*time.Minute))

baseRunID := fmt.Sprintf("sb-%d", time.Now().Unix())

Expand Down
8 changes: 4 additions & 4 deletions scenarios/throughput_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ import (

"github.com/stretchr/testify/require"
"github.com/temporalio/omes/clioptions"
"github.com/temporalio/omes/internal/workertest"
"github.com/temporalio/omes/loadgen"
"github.com/temporalio/omes/workers"
)

func TestThroughputStress(t *testing.T) {
t.Parallel()

runID := fmt.Sprintf("tps-%d", time.Now().Unix())

env := workers.SetupTestEnvironment(t,
workers.WithExecutorTimeout(1*time.Minute),
workers.WithNexusEndpoint(runID))
env := workertest.SetupTestEnvironment(t,
workertest.WithExecutorTimeout(1*time.Minute),
workertest.WithNexusEndpoint(runID))

scenarioInfo := loadgen.ScenarioInfo{
RunID: runID,
Expand Down
4 changes: 2 additions & 2 deletions workers/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ func (b *Builder) Build(ctx context.Context, baseDir string) (sdkbuild.Program,
}

if b.stdout == nil {
b.stdout = &logWriter{logger: b.Logger}
b.stdout = NewLogWriter(b.Logger)
}
if b.stderr == nil {
b.stderr = &logWriter{logger: b.Logger}
b.stderr = NewLogWriter(b.Logger)
}

switch b.SdkOptions.Language {
Expand Down
11 changes: 8 additions & 3 deletions workers/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ import (
"go.uber.org/zap"
)

// logWriter implements io.Writer and streams output line by line to a logger
type logWriter struct {
// LogWriter implements io.Writer and streams output line by line to a logger
type LogWriter struct {
logger *zap.SugaredLogger
buffer bytes.Buffer
}

func (w *logWriter) Write(p []byte) (n int, err error) {
// NewLogWriter returns a LogWriter that streams output line by line to logger.
func NewLogWriter(logger *zap.SugaredLogger) *LogWriter {
return &LogWriter{logger: logger}
}

func (w *LogWriter) Write(p []byte) (n int, err error) {
w.buffer.Write(p)

for {
Expand Down
4 changes: 2 additions & 2 deletions workers/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error {

// Direct logging output to provided logger, if available.
if r.LoggingOptions.PreparedLogger != nil {
cmd.Stdout = &logWriter{logger: r.LoggingOptions.PreparedLogger}
cmd.Stderr = &logWriter{logger: r.LoggingOptions.PreparedLogger}
cmd.Stdout = NewLogWriter(r.LoggingOptions.PreparedLogger)
cmd.Stderr = NewLogWriter(r.LoggingOptions.PreparedLogger)
}

// Start the command. Do not use the context so we can send interrupt.
Expand Down
Loading