diff --git a/dockerfiles/go.Dockerfile b/dockerfiles/go.Dockerfile index d07f1d0de..5d0ac2f57 100644 --- a/dockerfiles/go.Dockerfile +++ b/dockerfiles/go.Dockerfile @@ -12,6 +12,7 @@ COPY scenarios ./scenarios COPY metrics ./metrics COPY devserver ./devserver COPY versions ./versions +COPY internal ./internal COPY workers/*.go ./workers/ COPY workers/go/harness/api ./workers/go/harness/api COPY go.mod go.sum ./ diff --git a/workers/test_env.go b/internal/workertest/env.go similarity index 98% rename from workers/test_env.go rename to internal/workertest/env.go index ab3fbadd3..5d0a1bc53 100644 --- a/workers/test_env.go +++ b/internal/workertest/env.go @@ -1,4 +1,4 @@ -package workers +package workertest import ( "context" @@ -15,6 +15,7 @@ import ( "github.com/temporalio/omes/devserver" "github.com/temporalio/omes/loadgen" "github.com/temporalio/omes/versions" + "github.com/temporalio/omes/workers" "go.temporal.io/api/nexus/v1" "go.temporal.io/api/operatorservice/v1" "go.temporal.io/sdk/client" @@ -107,7 +108,7 @@ func SetupTestEnvironment(t *testing.T, opts ...TestEnvOption) *TestEnvironment Ref: serverRef, Namespace: testNamespace, DynamicConfigValues: cfg.dynamicConfig, - Output: &logWriter{logger: serverLogger}, + Output: workers.NewLogWriter(serverLogger), Logger: serverLogger, }) require.NoError(t, err, "Failed to start dev server") diff --git a/workers/test_historyrequire.go b/internal/workertest/historyrequire.go similarity index 99% rename from workers/test_historyrequire.go rename to internal/workertest/historyrequire.go index 6a359bd0b..e5a64ddee 100644 --- a/workers/test_historyrequire.go +++ b/internal/workertest/historyrequire.go @@ -1,4 +1,4 @@ -package workers +package workertest import ( "encoding/json" diff --git a/workers/test_workers.go b/internal/workertest/workerpool.go similarity index 91% rename from workers/test_workers.go rename to internal/workertest/workerpool.go index 11c1e45db..f27a087c1 100644 --- a/workers/test_workers.go +++ b/internal/workertest/workerpool.go @@ -1,4 +1,4 @@ -package workers +package workertest import ( "context" @@ -10,6 +10,7 @@ import ( "github.com/temporalio/omes/clioptions" "github.com/temporalio/omes/loadgen" + "github.com/temporalio/omes/workers" "go.uber.org/zap" ) @@ -52,7 +53,7 @@ func (w *workerPool) ensureWorkerBuilt( w.mutex.Unlock() once.Do(func() { - baseDir := BaseDir(w.env.repoDir, sdk) + baseDir := workers.BaseDir(w.env.repoDir, sdk) buildDir := filepath.Join(baseDir, w.env.buildDirName()) w.mutex.Lock() @@ -63,7 +64,7 @@ func (w *workerPool) ensureWorkerBuilt( }) w.mutex.Unlock() - builder := Builder{ + builder := workers.Builder{ DirName: w.env.buildDirName(), SdkOptions: clioptions.SdkOptions{Language: sdk}, Logger: logger.Named(fmt.Sprintf("%s-builder", sdk)), @@ -97,9 +98,9 @@ func (w *workerPool) startWorker( go func() { defer close(workerDone) - baseDir := BaseDir(w.env.repoDir, sdk) - runner := &Runner{ - Builder: Builder{ + baseDir := workers.BaseDir(w.env.repoDir, sdk) + runner := &workers.Runner{ + Builder: workers.Builder{ DirName: w.env.buildDirName(), SdkOptions: clioptions.SdkOptions{Language: sdk}, Logger: logger.Named(fmt.Sprintf("%s-worker-builder", sdk)), diff --git a/loadgen/kitchen_sink_executor_test.go b/loadgen/kitchen_sink_executor_test.go index 99347e00c..56b0967c3 100644 --- a/loadgen/kitchen_sink_executor_test.go +++ b/loadgen/kitchen_sink_executor_test.go @@ -11,9 +11,9 @@ import ( "github.com/stretchr/testify/require" "github.com/temporalio/omes/clioptions" + . "github.com/temporalio/omes/internal/workertest" . "github.com/temporalio/omes/loadgen" . "github.com/temporalio/omes/loadgen/kitchensink" - . "github.com/temporalio/omes/workers" "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" "go.temporal.io/api/history/v1" diff --git a/scenarios/ebb_and_flow_test.go b/scenarios/ebb_and_flow_test.go index b7977842f..06aa10178 100644 --- a/scenarios/ebb_and_flow_test.go +++ b/scenarios/ebb_and_flow_test.go @@ -7,15 +7,15 @@ import ( "github.com/stretchr/testify/require" "github.com/temporalio/omes/clioptions" + "github.com/temporalio/omes/internal/workertest" "github.com/temporalio/omes/loadgen" - "github.com/temporalio/omes/workers" ) func TestEbbAndFlow(t *testing.T) { t.Parallel() - env := workers.SetupTestEnvironment(t, - workers.WithExecutorTimeout(2*time.Minute)) + env := workertest.SetupTestEnvironment(t, + workertest.WithExecutorTimeout(2*time.Minute)) sleepActivityJson := `{ "count": { diff --git a/scenarios/long_idle_workflow_test.go b/scenarios/long_idle_workflow_test.go index 7abade771..c2abb5877 100644 --- a/scenarios/long_idle_workflow_test.go +++ b/scenarios/long_idle_workflow_test.go @@ -7,8 +7,8 @@ import ( "github.com/stretchr/testify/require" "github.com/temporalio/omes/clioptions" + "github.com/temporalio/omes/internal/workertest" "github.com/temporalio/omes/loadgen" - "github.com/temporalio/omes/workers" ) func TestLongIdleWorkflow_ParseConfig(t *testing.T) { @@ -103,8 +103,8 @@ func TestLongIdleWorkflow_BuildActions(t *testing.T) { func TestLongIdleWorkflow(t *testing.T) { t.Parallel() - env := workers.SetupTestEnvironment(t, - workers.WithExecutorTimeout(1*time.Minute)) + env := workertest.SetupTestEnvironment(t, + workertest.WithExecutorTimeout(1*time.Minute)) baseRunID := fmt.Sprintf("liw-%d", time.Now().Unix()) diff --git a/scenarios/out_of_order_signals_test.go b/scenarios/out_of_order_signals_test.go index b7ca54379..a29870d2c 100644 --- a/scenarios/out_of_order_signals_test.go +++ b/scenarios/out_of_order_signals_test.go @@ -9,9 +9,9 @@ import ( "google.golang.org/protobuf/proto" "github.com/temporalio/omes/clioptions" + "github.com/temporalio/omes/internal/workertest" "github.com/temporalio/omes/loadgen" "github.com/temporalio/omes/loadgen/kitchensink" - "github.com/temporalio/omes/workers" ) // oooSignalKvs extracts the SetWorkflowState key/value map carried by a single @@ -132,8 +132,8 @@ func TestOutOfOrderSignals_BuildOrderedAwaitActions(t *testing.T) { func TestOutOfOrderSignals(t *testing.T) { t.Parallel() - env := workers.SetupTestEnvironment(t, - workers.WithExecutorTimeout(1*time.Minute)) + env := workertest.SetupTestEnvironment(t, + workertest.WithExecutorTimeout(1*time.Minute)) baseRunID := fmt.Sprintf("ooo-%d", time.Now().Unix()) diff --git a/scenarios/serverless_burst_test.go b/scenarios/serverless_burst_test.go index 55ebe8c22..73ece2d3c 100644 --- a/scenarios/serverless_burst_test.go +++ b/scenarios/serverless_burst_test.go @@ -9,15 +9,15 @@ import ( "github.com/stretchr/testify/require" "github.com/temporalio/omes/clioptions" + "github.com/temporalio/omes/internal/workertest" "github.com/temporalio/omes/loadgen" - "github.com/temporalio/omes/workers" ) func TestServerlessBurst(t *testing.T) { t.Parallel() - env := workers.SetupTestEnvironment(t, - workers.WithExecutorTimeout(1*time.Minute)) + env := workertest.SetupTestEnvironment(t, + workertest.WithExecutorTimeout(1*time.Minute)) baseRunID := fmt.Sprintf("sb-%d", time.Now().Unix()) diff --git a/scenarios/throughput_stress_test.go b/scenarios/throughput_stress_test.go index 3c5c3440b..a19561c0d 100644 --- a/scenarios/throughput_stress_test.go +++ b/scenarios/throughput_stress_test.go @@ -7,8 +7,8 @@ import ( "github.com/stretchr/testify/require" "github.com/temporalio/omes/clioptions" + "github.com/temporalio/omes/internal/workertest" "github.com/temporalio/omes/loadgen" - "github.com/temporalio/omes/workers" ) func TestThroughputStress(t *testing.T) { @@ -16,9 +16,9 @@ func TestThroughputStress(t *testing.T) { runID := fmt.Sprintf("tps-%d", time.Now().Unix()) - env := workers.SetupTestEnvironment(t, - workers.WithExecutorTimeout(1*time.Minute), - workers.WithNexusEndpoint(runID)) + env := workertest.SetupTestEnvironment(t, + workertest.WithExecutorTimeout(1*time.Minute), + workertest.WithNexusEndpoint(runID)) scenarioInfo := loadgen.ScenarioInfo{ RunID: runID, diff --git a/scenarios/workflow_with_many_timers_test.go b/scenarios/workflow_with_many_timers_test.go index 76d41649c..ebfc39a64 100644 --- a/scenarios/workflow_with_many_timers_test.go +++ b/scenarios/workflow_with_many_timers_test.go @@ -7,8 +7,8 @@ import ( "github.com/stretchr/testify/require" "github.com/temporalio/omes/clioptions" + "github.com/temporalio/omes/internal/workertest" "github.com/temporalio/omes/loadgen" - "github.com/temporalio/omes/workers" ) func TestWorkflowWithManyTimers_ParseConfig(t *testing.T) { @@ -125,8 +125,8 @@ func TestWorkflowWithManyTimers_BuildActions(t *testing.T) { func TestWorkflowWithManyTimers(t *testing.T) { t.Parallel() - env := workers.SetupTestEnvironment(t, - workers.WithExecutorTimeout(1*time.Minute)) + env := workertest.SetupTestEnvironment(t, + workertest.WithExecutorTimeout(1*time.Minute)) baseRunID := fmt.Sprintf("wmt-%d", time.Now().Unix()) diff --git a/workers/build.go b/workers/build.go index 6d091e988..26c58792b 100644 --- a/workers/build.go +++ b/workers/build.go @@ -42,10 +42,10 @@ func (b *Builder) Build(ctx context.Context, baseDir string) (sdkbuild.Program, } if b.stdout == nil { - b.stdout = &logWriter{logger: b.Logger} + b.stdout = NewLogWriter(b.Logger) } if b.stderr == nil { - b.stderr = &logWriter{logger: b.Logger} + b.stderr = NewLogWriter(b.Logger) } switch b.SdkOptions.Language { diff --git a/workers/log.go b/workers/log.go index 109fe32c7..7feea5d3b 100644 --- a/workers/log.go +++ b/workers/log.go @@ -6,13 +6,18 @@ import ( "go.uber.org/zap" ) -// logWriter implements io.Writer and streams output line by line to a logger -type logWriter struct { +// LogWriter implements io.Writer and streams output line by line to a logger +type LogWriter struct { logger *zap.SugaredLogger buffer bytes.Buffer } -func (w *logWriter) Write(p []byte) (n int, err error) { +// NewLogWriter returns a LogWriter that streams output line by line to logger. +func NewLogWriter(logger *zap.SugaredLogger) *LogWriter { + return &LogWriter{logger: logger} +} + +func (w *LogWriter) Write(p []byte) (n int, err error) { w.buffer.Write(p) for { diff --git a/workers/run.go b/workers/run.go index 517c778a8..4d523da12 100644 --- a/workers/run.go +++ b/workers/run.go @@ -157,8 +157,8 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error { // Direct logging output to provided logger, if available. if r.LoggingOptions.PreparedLogger != nil { - cmd.Stdout = &logWriter{logger: r.LoggingOptions.PreparedLogger} - cmd.Stderr = &logWriter{logger: r.LoggingOptions.PreparedLogger} + cmd.Stdout = NewLogWriter(r.LoggingOptions.PreparedLogger) + cmd.Stderr = NewLogWriter(r.LoggingOptions.PreparedLogger) } // Start the command. Do not use the context so we can send interrupt.