Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions core/cli/agent_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,23 @@ type AgentWorkerCMD struct {
NatsServiceJWT string `env:"LOCALAI_NATS_SERVICE_JWT" help:"Fallback NATS service JWT when registration does not mint agent JWT" group:"distributed"`
NatsServiceSeed string `env:"LOCALAI_NATS_SERVICE_SEED" help:"Fallback NATS service seed paired with LOCALAI_NATS_SERVICE_JWT" group:"distributed"`
NatsRequireAuth bool `env:"LOCALAI_NATS_REQUIRE_AUTH" default:"false" help:"Require NATS JWT+seed to connect" group:"distributed"`
NatsTLSCA string `env:"LOCALAI_NATS_TLS_CA" type:"existingfile" help:"PEM file for NATS server CA (private PKI)" group:"distributed"`
NatsTLSCert string `env:"LOCALAI_NATS_TLS_CERT" type:"existingfile" help:"Client certificate for NATS mTLS" group:"distributed"`
NatsTLSKey string `env:"LOCALAI_NATS_TLS_KEY" type:"existingfile" help:"Client private key for NATS mTLS" group:"distributed"`
// DistributedRequireAuth is the umbrella switch; for the agent worker (which
// has no file-transfer server) it implies NATS auth is required.
DistributedRequireAuth bool `env:"LOCALAI_DISTRIBUTED_REQUIRE_AUTH" default:"false" help:"Umbrella switch implying --nats-require-auth (agent workers have no file-transfer server)" group:"distributed"`
NatsTLSCA string `env:"LOCALAI_NATS_TLS_CA" type:"existingfile" help:"PEM file for NATS server CA (private PKI)" group:"distributed"`
NatsTLSCert string `env:"LOCALAI_NATS_TLS_CERT" type:"existingfile" help:"Client certificate for NATS mTLS" group:"distributed"`
NatsTLSKey string `env:"LOCALAI_NATS_TLS_KEY" type:"existingfile" help:"Client private key for NATS mTLS" group:"distributed"`

// Timeouts
MCPCIJobTimeout string `env:"LOCALAI_MCP_CI_JOB_TIMEOUT" default:"10m" help:"Timeout for MCP CI job execution" group:"distributed"`
}

// natsAuthRequired reports whether NATS JWT credentials must be present — the
// granular flag or the umbrella (LOCALAI_DISTRIBUTED_REQUIRE_AUTH).
func (cmd *AgentWorkerCMD) natsAuthRequired() bool {
return cmd.NatsRequireAuth || cmd.DistributedRequireAuth
}

func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
xlog.Info("Starting agent worker", "nats", sanitize.URL(cmd.NatsURL), "register_to", cmd.RegisterTo)

Expand Down Expand Up @@ -102,7 +111,7 @@ func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
func(ctx context.Context) (*workerregistry.RegisterResponse, error) {
return regClient.RegisterFull(ctx, registrationBody)
},
cmd.NatsRequireAuth && cmd.NatsJWT == "" && cmd.NatsServiceJWT == "",
cmd.natsAuthRequired() && cmd.NatsJWT == "" && cmd.NatsServiceJWT == "",
)
res, err := credMgr.Acquire(shutdownCtx)
if err != nil {
Expand Down Expand Up @@ -149,7 +158,7 @@ func (cmd *AgentWorkerCMD) Run(ctx *cliContext.Context) error {
return fmt.Errorf("LOCALAI_NATS_SERVICE_JWT and LOCALAI_NATS_SERVICE_SEED must be set together")
}
natsOpts = append(natsOpts, messaging.WithUserJWT(cmd.NatsServiceJWT, cmd.NatsServiceSeed))
case cmd.NatsRequireAuth:
case cmd.natsAuthRequired():
return fmt.Errorf("NATS JWT+seed required: enable frontend minting or set LOCALAI_NATS_* env vars")
}
if natsTLS.Enabled() {
Expand Down
8 changes: 8 additions & 0 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ type RunCMD struct {
StorageAccessKey string `env:"LOCALAI_STORAGE_ACCESS_KEY" help:"S3 access key ID" group:"distributed"`
StorageSecretKey string `env:"LOCALAI_STORAGE_SECRET_KEY" help:"S3 secret access key" group:"distributed"`
RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token that backend nodes must provide to register (empty = no auth required)" group:"distributed"`
RegistrationRequireAuth bool `env:"LOCALAI_REGISTRATION_REQUIRE_AUTH" default:"false" help:"Fail startup when distributed mode is enabled but LOCALAI_REGISTRATION_TOKEN is empty (node endpoints and worker file-transfer server would otherwise be unauthenticated)" group:"distributed"`
DistributedRequireAuth bool `env:"LOCALAI_DISTRIBUTED_REQUIRE_AUTH" default:"false" help:"Umbrella switch: require BOTH NATS JWT credentials and a registration token when distributed mode is enabled (implies --nats-require-auth and --registration-require-auth)" group:"distributed"`
AutoApproveNodes bool `env:"LOCALAI_AUTO_APPROVE_NODES" default:"false" help:"Auto-approve new worker nodes (skip admin approval)" group:"distributed"`
DistributedPrefixCache bool `env:"LOCALAI_DISTRIBUTED_PREFIX_CACHE" default:"true" help:"Enable prefix-cache-aware routing in distributed mode (default true). When false, routing falls back to round-robin." group:"distributed"`
DistributedPrefixCacheTTL string `env:"LOCALAI_DISTRIBUTED_PREFIX_CACHE_TTL" help:"Idle-timeout for prefix-cache index entries; also drives the background eviction cadence (every TTL/2). Default 5m." group:"distributed"`
Expand Down Expand Up @@ -291,6 +293,12 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
if r.RegistrationToken != "" {
opts = append(opts, config.WithRegistrationToken(r.RegistrationToken))
}
if r.RegistrationRequireAuth {
opts = append(opts, config.EnableRegistrationRequireAuth)
}
if r.DistributedRequireAuth {
opts = append(opts, config.EnableDistributedRequireAuth)
}
if r.NatsAccountSeed != "" {
opts = append(opts, config.WithNatsAccountSeed(r.NatsAccountSeed))
}
Expand Down
50 changes: 46 additions & 4 deletions core/config/distributed_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,19 @@ type DistributedConfig struct {
NatsURL string // --nats-url / LOCALAI_NATS_URL
StorageURL string // --storage-url / LOCALAI_STORAGE_URL (S3 endpoint)
RegistrationToken string // --registration-token / LOCALAI_REGISTRATION_TOKEN (required token for node registration)
AutoApproveNodes bool // --auto-approve-nodes / LOCALAI_AUTO_APPROVE_NODES (skip admin approval for new workers)
// RegistrationRequireAuth fails startup when distributed mode is enabled but
// RegistrationToken is empty. The default (false) keeps the historical
// fail-open behavior with a loud warning; production should set it so the
// node-register endpoints and the worker file-transfer server cannot run
// unauthenticated. Mirrors NatsRequireAuth for the NATS bus.
RegistrationRequireAuth bool // LOCALAI_REGISTRATION_REQUIRE_AUTH
// RequireAuth is the umbrella switch (LOCALAI_DISTRIBUTED_REQUIRE_AUTH) for
// distributed-mode auth: when true it implies BOTH NatsRequireAuth and
// RegistrationRequireAuth, so a single knob locks down the bus and the
// registration/file-transfer layer together. The granular flags remain
// available to enforce just one layer.
RequireAuth bool // LOCALAI_DISTRIBUTED_REQUIRE_AUTH
AutoApproveNodes bool // --auto-approve-nodes / LOCALAI_AUTO_APPROVE_NODES (skip admin approval for new workers)

// NATS JWT auth (optional; see pkg/natsauth and docs/features/distributed-mode.md)
NatsAccountSeed string // LOCALAI_NATS_ACCOUNT_SEED — account signing seed to mint per-node worker JWTs
Expand Down Expand Up @@ -88,9 +100,15 @@ func (c DistributedConfig) Validate() error {
(c.StorageAccessKey == "" && c.StorageSecretKey != "") {
return fmt.Errorf("storage-access-key and storage-secret-key must both be set or both empty")
}
// Warn about missing registration token (not an error)
// The registration token guards both the node HTTP register/heartbeat
// endpoints and the worker file-transfer server (which fails open on an
// empty token). Enforce it when registration auth is required (the granular
// flag or the umbrella); otherwise warn.
if c.RegistrationToken == "" {
xlog.Warn("distributed mode running without registration token — node endpoints are unprotected")
if c.RegistrationAuthRequired() {
return fmt.Errorf("registration auth is required (LOCALAI_REGISTRATION_REQUIRE_AUTH or LOCALAI_DISTRIBUTED_REQUIRE_AUTH) but LOCALAI_REGISTRATION_TOKEN is empty")
}
xlog.Warn("distributed mode running without registration token — node endpoints and the worker file-transfer server are unprotected; set LOCALAI_REGISTRATION_TOKEN, or LOCALAI_DISTRIBUTED_REQUIRE_AUTH=true to fail closed")
}
if err := c.NatsAuthConfig().Validate(); err != nil {
return err
Expand Down Expand Up @@ -170,6 +188,30 @@ var EnableNatsRequireAuth = func(o *ApplicationConfig) {
o.Distributed.NatsRequireAuth = true
}

// EnableRegistrationRequireAuth makes an empty registration token a hard error
// in distributed mode (see DistributedConfig.RegistrationRequireAuth).
var EnableRegistrationRequireAuth = func(o *ApplicationConfig) {
o.Distributed.RegistrationRequireAuth = true
}

// EnableDistributedRequireAuth is the umbrella switch implying both
// NatsRequireAuth and RegistrationRequireAuth (see DistributedConfig.RequireAuth).
var EnableDistributedRequireAuth = func(o *ApplicationConfig) {
o.Distributed.RequireAuth = true
}

// RegistrationAuthRequired reports whether an empty registration token must be
// treated as a fatal misconfiguration — the granular flag or the umbrella.
func (c DistributedConfig) RegistrationAuthRequired() bool {
return c.RegistrationRequireAuth || c.RequireAuth
}

// NatsAuthRequired reports whether NATS JWT credentials must be present — the
// granular flag or the umbrella.
func (c DistributedConfig) NatsAuthRequired() bool {
return c.NatsRequireAuth || c.RequireAuth
}

func WithNatsTLSCA(path string) AppOption {
return func(o *ApplicationConfig) {
o.Distributed.NatsTLSCA = path
Expand Down Expand Up @@ -316,7 +358,7 @@ func (c DistributedConfig) NatsAuthConfig() natsauth.Config {
ServiceUserJWT: c.NatsServiceJWT,
ServiceUserSeed: c.NatsServiceSeed,
WorkerJWTTTL: c.NatsWorkerJWTTTL,
RequireAuth: c.NatsRequireAuth,
RequireAuth: c.NatsAuthRequired(),
}
}

Expand Down
63 changes: 63 additions & 0 deletions core/config/distributed_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,66 @@ var _ = Describe("DistributedConfig.Validate negative-duration errors", func() {
Expect(c.Validate()).To(Succeed())
})
})

var _ = Describe("DistributedConfig.Validate registration auth", func() {
It("rejects an empty registration token when RequireAuth is set", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
RegistrationRequireAuth: true,
}
err := c.Validate()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("LOCALAI_REGISTRATION_REQUIRE_AUTH"))
Expect(err.Error()).To(ContainSubstring("LOCALAI_REGISTRATION_TOKEN"))
})

It("accepts a set registration token when RequireAuth is set", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
RegistrationToken: "s3cret",
RegistrationRequireAuth: true,
}
Expect(c.Validate()).To(Succeed())
})

It("warns but succeeds with an empty token when RequireAuth is unset", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
}
Expect(c.Validate()).To(Succeed())
})

It("rejects an empty token when the umbrella RequireAuth is set", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
RequireAuth: true,
// Provide NATS creds so only the registration-token gap remains.
NatsServiceJWT: "jwt",
NatsServiceSeed: "seed",
NatsAccountSeed: "acct",
}
err := c.Validate()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("LOCALAI_DISTRIBUTED_REQUIRE_AUTH"))
Expect(err.Error()).To(ContainSubstring("LOCALAI_REGISTRATION_TOKEN"))
})

It("the umbrella implies NATS auth is required", func() {
c := config.DistributedConfig{
Enabled: true,
NatsURL: "nats://localhost:4222",
RegistrationToken: "tok", // registration layer satisfied
RequireAuth: true, // umbrella → NATS creds now required
}
Expect(c.NatsAuthRequired()).To(BeTrue())
Expect(c.RegistrationAuthRequired()).To(BeTrue())
// Missing NATS service JWT/seed must now be fatal.
err := c.Validate()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("LOCALAI_NATS_REQUIRE_AUTH"))
})
})
11 changes: 11 additions & 0 deletions core/services/nodes/file_transfer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ func StartFileTransferServerWithListener(lis net.Listener, stagingDir, modelsDir
return nil, fmt.Errorf("creating staging dir %s: %w", stagingDir, err)
}

// An empty token makes checkBearerToken fail open: every /v1/files,
// /v1/files-list and /v1/backend-logs request is served unauthenticated,
// granting read/write to the staging/models/data directories to anyone who
// can reach this port. Surface that loudly — the worker process does not
// run DistributedConfig.Validate(), so this is the only signal an operator
// gets. Set LOCALAI_REGISTRATION_TOKEN (and LOCALAI_REGISTRATION_REQUIRE_AUTH
// to fail closed) to protect it.
if token == "" {
xlog.Warn("HTTP file transfer server starting WITHOUT a registration token — read/write to models/staging/data is unauthenticated for anyone who can reach this port; set LOCALAI_REGISTRATION_TOKEN")
}

mux := http.NewServeMux()

// PUT /v1/files/{key} — upload file
Expand Down
48 changes: 48 additions & 0 deletions core/services/nodes/file_transfer_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/hex"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -893,3 +894,50 @@ func sha256Hex(data []byte) string {
h := sha256.Sum256(data)
return hex.EncodeToString(h[:])
}

var _ = Describe("StartFileTransferServerWithListener", func() {
start := func(token string) (string, func()) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
Expect(err).NotTo(HaveOccurred())
staging := GinkgoT().TempDir()
models := GinkgoT().TempDir()
data := GinkgoT().TempDir()
srv, err := StartFileTransferServerWithListener(lis, staging, models, data, token, 0)
Expect(err).NotTo(HaveOccurred())
base := "http://" + lis.Addr().String()
return base, func() { ShutdownFileTransferServer(srv) }
}

// Exercises the empty-token fail-open warning branch: the server serves
// file requests with no Authorization header at all.
It("serves unauthenticated when started without a token", func() {
base, stop := start("")
defer stop()

resp, err := http.Get(base + "/v1/files/missing.bin")
Expect(err).NotTo(HaveOccurred())
defer func() { _ = resp.Body.Close() }()
// No 401 — the empty token fails open. The file is absent so we get 404.
Expect(resp.StatusCode).To(Equal(http.StatusNotFound))
})

It("rejects requests without the bearer token when a token is set", func() {
base, stop := start("s3cret")
defer stop()

resp, err := http.Get(base + "/v1/files/missing.bin")
Expect(err).NotTo(HaveOccurred())
defer func() { _ = resp.Body.Close() }()
Expect(resp.StatusCode).To(Equal(http.StatusUnauthorized))
})

It("serves the unauthenticated health endpoints regardless of token", func() {
base, stop := start("s3cret")
defer stop()

resp, err := http.Get(base + "/healthz")
Expect(err).NotTo(HaveOccurred())
defer func() { _ = resp.Body.Close() }()
Expect(resp.StatusCode).To(Equal(http.StatusOK))
})
})
30 changes: 30 additions & 0 deletions core/services/worker/auth_required_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package worker

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Worker auth-required helpers", func() {
DescribeTable("NatsAuthRequired",
func(nats, umbrella, want bool) {
cfg := &Config{NatsRequireAuth: nats, DistributedRequireAuth: umbrella}
Expect(cfg.NatsAuthRequired()).To(Equal(want))
},
Entry("neither", false, false, false),
Entry("granular only", true, false, true),
Entry("umbrella only", false, true, true),
Entry("both", true, true, true),
)

DescribeTable("RegistrationAuthRequired",
func(reg, umbrella, want bool) {
cfg := &Config{RegistrationRequireAuth: reg, DistributedRequireAuth: umbrella}
Expect(cfg.RegistrationAuthRequired()).To(Equal(want))
},
Entry("neither", false, false, false),
Entry("granular only", true, false, true),
Entry("umbrella only", false, true, true),
Entry("both", true, true, true),
)
})
26 changes: 20 additions & 6 deletions core/services/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ type Config struct {
AdvertiseHTTPAddr string `env:"LOCALAI_ADVERTISE_HTTP_ADDR" help:"HTTP address the frontend uses to reach this node for file transfer" group:"server" hidden:""`

// Registration (required)
AdvertiseAddr string `env:"LOCALAI_ADVERTISE_ADDR" help:"Address the frontend uses to reach this node (defaults to hostname:port from Addr)" group:"registration" hidden:""`
RegisterTo string `env:"LOCALAI_REGISTER_TO" required:"" help:"Frontend URL for registration" group:"registration"`
NodeName string `env:"LOCALAI_NODE_NAME" help:"Node name for registration (defaults to hostname)" group:"registration"`
RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token for authenticating with the frontend" group:"registration"`
HeartbeatInterval string `env:"LOCALAI_HEARTBEAT_INTERVAL" default:"10s" help:"Interval between heartbeats" group:"registration"`
NodeLabels string `env:"LOCALAI_NODE_LABELS" help:"Comma-separated key=value labels for this node (e.g. tier=fast,gpu=a100)" group:"registration"`
AdvertiseAddr string `env:"LOCALAI_ADVERTISE_ADDR" help:"Address the frontend uses to reach this node (defaults to hostname:port from Addr)" group:"registration" hidden:""`
RegisterTo string `env:"LOCALAI_REGISTER_TO" required:"" help:"Frontend URL for registration" group:"registration"`
NodeName string `env:"LOCALAI_NODE_NAME" help:"Node name for registration (defaults to hostname)" group:"registration"`
RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token for authenticating with the frontend" group:"registration"`
RegistrationRequireAuth bool `env:"LOCALAI_REGISTRATION_REQUIRE_AUTH" default:"false" help:"Refuse to start the HTTP file-transfer server when no registration token is set (otherwise it fails open and serves read/write to models/staging/data unauthenticated)" group:"registration"`
DistributedRequireAuth bool `env:"LOCALAI_DISTRIBUTED_REQUIRE_AUTH" default:"false" help:"Umbrella switch implying both --nats-require-auth and --registration-require-auth" group:"distributed"`
HeartbeatInterval string `env:"LOCALAI_HEARTBEAT_INTERVAL" default:"10s" help:"Interval between heartbeats" group:"registration"`
NodeLabels string `env:"LOCALAI_NODE_LABELS" help:"Comma-separated key=value labels for this node (e.g. tier=fast,gpu=a100)" group:"registration"`
// MaxReplicasPerModel caps how many replicas of any one model can run on
// this worker concurrently. Default 1 = historical single-replica
// behavior. Set higher when a node has enough VRAM to host multiple
Expand All @@ -75,3 +77,15 @@ type Config struct {
StorageAccessKey string `env:"LOCALAI_STORAGE_ACCESS_KEY" help:"S3 access key" group:"distributed"`
StorageSecretKey string `env:"LOCALAI_STORAGE_SECRET_KEY" help:"S3 secret key" group:"distributed"`
}

// NatsAuthRequired reports whether NATS JWT credentials must be present — the
// granular flag or the umbrella (LOCALAI_DISTRIBUTED_REQUIRE_AUTH).
func (c Config) NatsAuthRequired() bool {
return c.NatsRequireAuth || c.DistributedRequireAuth
}

// RegistrationAuthRequired reports whether a registration token must be set
// before the file-transfer server may start — the granular flag or the umbrella.
func (c Config) RegistrationAuthRequired() bool {
return c.RegistrationRequireAuth || c.DistributedRequireAuth
}
Loading
Loading