diff --git a/adapter/sqs.go b/adapter/sqs.go index acff4ff0..aaf15047 100644 --- a/adapter/sqs.go +++ b/adapter/sqs.go @@ -326,6 +326,18 @@ func (s *SQSServer) Run() error { // request hot path never pays the O(N) sweep cost. Cleaned up by // the same reaperCtx cancellation that stops the message reaper. go s.throttle.runSweepLoop(s.reaperCtx) + if s.listen == nil { + // Listenless mode: the SQS adapter was constructed without a + // public HTTP listener (--sqsAddress empty). Admin endpoints + // in adapter/sqs_admin.go still work because they go through + // the coordinator/store — only the SigV4 wire surface is + // suppressed. Block until Stop() cancels reaperCtx so the + // errgroup task lifetime matches the listening branch + // (callers wait on Run() returning to know it's safe to tear + // down the underlying coordinator/store). + <-s.reaperCtx.Done() + return nil + } if err := s.httpServer.Serve(s.listen); err != nil && !errors.Is(err, http.ErrServerClosed) { return errors.WithStack(err) } diff --git a/adapter/sqs_test.go b/adapter/sqs_test.go index 3083119a..21ddd776 100644 --- a/adapter/sqs_test.go +++ b/adapter/sqs_test.go @@ -205,3 +205,33 @@ func TestSQSServer_StopShutsDown(t *testing.T) { t.Fatal("Run did not return within timeout after Stop") } } + +// TestSQSServer_ListenlessRunStopRoundTrip pins the listenless +// construction path used by startSQSServer when --sqsAddress is +// empty: NewSQSServer must accept a nil net.Listener, Run() must +// return cleanly when Stop() cancels the reaper context, and the +// reaper / throttle-sweep goroutines must wind down with the same +// cancellation. Regression guard for the admin-only deployment +// shape — without this branch the SQS admin endpoints would be +// dark on a build whose operator deliberately closed the public +// SigV4 surface. +func TestSQSServer_ListenlessRunStopRoundTrip(t *testing.T) { + t.Parallel() + srv := NewSQSServer(nil, nil, nil) + done := make(chan error, 1) + go func() { done <- srv.Run() }() + // Give Run() a tick to enter the reaperCtx.Done() block. Unlike + // the listening path there's no socket Accept loop to enter, so + // a shorter pause is enough. + time.Sleep(20 * time.Millisecond) + srv.Stop() + + select { + case err := <-done: + if err != nil { + t.Fatalf("run: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("listenless Run did not return within timeout after Stop") + } +} diff --git a/main.go b/main.go index f94f6089..eae3ece1 100644 --- a/main.go +++ b/main.go @@ -1469,9 +1469,11 @@ type runtimeServerRunner struct { s3Server *adapter.S3Server // sqsServer plays the same role for the SQS admin entrypoints - // (adapter/sqs_admin.go). Nil when --sqsAddress is empty; the - // admin listener then leaves /admin/api/v1/sqs/* off the wire - // (the mux 404s those paths). + // (adapter/sqs_admin.go). Always non-nil after startup — + // startSQSServer constructs a listenless SQSServer when + // --sqsAddress is empty (the public SigV4 listener is + // suppressed but the admin bridge stays wired since the admin + // handlers only need the coordinator/store, not the listener). sqsServer *adapter.SQSServer // sqsPartitionResolver is the concrete pointer to the same diff --git a/main_sqs.go b/main_sqs.go index 23ad1603..a47c17b7 100644 --- a/main_sqs.go +++ b/main_sqs.go @@ -11,11 +11,17 @@ import ( "golang.org/x/sync/errgroup" ) -// startSQSServer stands up the SQS adapter on sqsAddr and returns the -// running *adapter.SQSServer so the admin listener can call SigV4-bypass -// admin entrypoints against it (see adapter/sqs_admin.go). Returns -// (nil, nil) when sqsAddr is empty — that is the "SQS disabled" branch -// and the admin listener leaves /admin/api/v1/sqs/* off the wire. +// startSQSServer constructs the SQS adapter and returns the running +// *adapter.SQSServer. The admin listener calls SigV4-bypass admin +// entrypoints against this server (see adapter/sqs_admin.go); those +// admin methods only need the coordinator/store, NOT the public SQS +// HTTP listener. So when sqsAddr is empty the function still +// constructs the server (with a nil net.Listener) — Run() then skips +// httpServer.Serve while the reaper and throttle-sweep goroutines +// still run, keeping retention math behind the admin counters +// correct. The admin bridge in main_admin.go therefore wires +// /admin/api/v1/sqs/* on the wire even on builds that disabled the +// public SigV4 endpoint. func startSQSServer( ctx context.Context, lc *net.ListenConfig, @@ -30,16 +36,19 @@ func startSQSServer( partitionObserver adapter.SQSPartitionObserver, ) (*adapter.SQSServer, error) { sqsAddr = strings.TrimSpace(sqsAddr) - if sqsAddr == "" { - return nil, nil - } - sqsL, err := lc.Listen(ctx, "tcp", sqsAddr) - if err != nil { - return nil, errors.Wrapf(err, "failed to listen on %s", sqsAddr) + var sqsL net.Listener + if sqsAddr != "" { + var err error + sqsL, err = lc.Listen(ctx, "tcp", sqsAddr) + if err != nil { + return nil, errors.Wrapf(err, "failed to listen on %s", sqsAddr) + } } staticCreds, err := loadSigV4StaticCredentialsFile(credentialsFile, "sqs") if err != nil { - _ = sqsL.Close() + if sqsL != nil { + _ = sqsL.Close() + } return nil, err } sqsServer := adapter.NewSQSServer(