diff --git a/cmd/index.go b/cmd/index.go
index 334b64f..4d6592d 100644
--- a/cmd/index.go
+++ b/cmd/index.go
@@ -251,6 +251,11 @@ func runIndexer(cmd *cobra.Command, cfg *config.ConfigService, emb *embedder.Fai
 	}
 	defer lock.Release()
 
+	// Seed before opening the DB, under the index lock acquired above so only
+	// one indexer seeds. Doing it here (not just in the MCP getOrCreate path)
+	// covers the background indexer that lumen index runs at SessionStart.
+	seedFromDonorIfNew(dbPath, projectPath, emb.ModelName(), logger)
+
 	// Cancel context on SIGTERM or SIGINT so the indexer stops cleanly and
 	// the deferred lock.Release() runs before exit.
 	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
diff --git a/cmd/seed.go b/cmd/seed.go
new file mode 100644
index 0000000..2c599d2
--- /dev/null
+++ b/cmd/seed.go
@@ -0,0 +1,69 @@
+// Copyright 2026 Aeneas Rekkas
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cmd
+
+import (
+	"log/slog"
+	"os"
+
+	"github.com/ory/lumen/internal/config"
+	"github.com/ory/lumen/internal/index"
+)
+
+// seedFromDonorIfNew copies a sibling git-worktree's index to dbPath when no
+// index exists there yet, so indexing a fresh worktree reuses the parent's
+// embeddings and only re-indexes changed files instead of re-embedding the
+// whole tree from scratch. It is best-effort: any failure is logged and
+// indexing proceeds from scratch.
+//
+// dbPath is the index DB to create, projectPath and model are passed to the
+// donor lookup, and logger receives the outcome. Nothing is returned: callers
+// continue and open (or create) dbPath regardless of what happened here.
+//
+// This mirrors the seeding the MCP server performs in
+// indexerCache.getOrCreate. The background indexer that lumen index runs at
+// SessionStart also creates a fresh DB, and it normally wins the race against
+// the MCP path. Without seeding here, that background indexer leaves the
+// worktree fully re-indexed from scratch and the getOrCreate seed is then
+// skipped because the DB already exists.
+func seedFromDonorIfNew(dbPath, projectPath, model string, logger *slog.Logger) {
+	_, statErr := os.Stat(dbPath)
+	if statErr == nil {
+		// The DB already exists — nothing to seed.
+		return
+	}
+	if !os.IsNotExist(statErr) {
+		// The stat failed for a reason other than "missing" (for example a
+		// permission error). Do not seed over a path we cannot inspect, but
+		// log the cause so the skipped seed is not silent.
+		logger.Warn("seed from donor worktree skipped: cannot stat index DB",
+			"project", projectPath, "db_path", dbPath, "error", statErr)
+		return
+	}
+	donorPath := config.FindDonorIndex(projectPath, model)
+	if donorPath == "" {
+		return
+	}
+	seeded, err := index.SeedFromDonor(donorPath, dbPath)
+	if err != nil {
+		logger.Warn("seed from donor worktree failed",
+			"project", projectPath, "donor_path", donorPath, "error", err)
+		return
+	}
+	if seeded {
+		logger.Info("seeded index from donor worktree",
+			"project", projectPath, "donor_path", donorPath)
+	}
+}
diff --git a/cmd/seed_test.go b/cmd/seed_test.go
new file mode 100644
index 0000000..937ccac
--- /dev/null
+++ b/cmd/seed_test.go
@@ -0,0 +1,106 @@
+// Copyright 2026 Aeneas Rekkas
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cmd
+
+import (
+	"io"
+	"log/slog"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"testing"
+
+	"github.com/ory/lumen/internal/config"
+	"github.com/ory/lumen/internal/store"
+)
+
+// TestSeedFromDonorIfNew_SeedsWorktreeFromSibling reproduces the bug where
+// `lumen index` (the background indexer spawned by the SessionStart hook) does
+// not copy an existing sibling-worktree index before indexing a fresh
+// worktree, causing a full re-index from scratch instead of an incremental
+// merkle update. A worktree whose sibling is already indexed must be seeded.
+func TestSeedFromDonorIfNew_SeedsWorktreeFromSibling(t *testing.T) {
+	if _, err := exec.LookPath("git"); err != nil {
+		t.Skip("git not on PATH")
+	}
+
+	t.Setenv("XDG_DATA_HOME", t.TempDir()) // NOTE(review): presumably the base dir for index DBs, keeping the test off real user data — confirm
+	const model = "test-model"
+
+	// Main repo with a worktree, mirroring `git worktree add`.
+	main := t.TempDir()
+	runGit(t, main, "init")
+	runGit(t, main, "config", "user.email", "test@test.com")
+	runGit(t, main, "config", "user.name", "test")
+	runGit(t, main, "commit", "--allow-empty", "-m", "init")
+
+	wt := filepath.Join(t.TempDir(), "wt")
+	runGit(t, main, "worktree", "add", wt)
+
+	mainResolved, err := filepath.EvalSymlinks(main) // the resolved paths are what the DB-path and seed calls below receive
+	if err != nil {
+		t.Fatal(err)
+	}
+	wtResolved, err := filepath.EvalSymlinks(wt)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Build a minimal but valid donor index for the main worktree: a real
+	// SQLite store carrying a non-empty root_hash, which is what SeedFromDonor
+	// requires to treat the donor as a completed index.
+	donorDB := config.DBPathForProject(mainResolved, model)
+	if err := os.MkdirAll(filepath.Dir(donorDB), 0o755); err != nil {
+		t.Fatal(err)
+	}
+	donor, err := store.New(donorDB, 4) // NOTE(review): 4 is presumably the vector dimension — confirm against store.New
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := donor.SetMeta("root_hash", "donor-root-hash"); err != nil {
+		t.Fatal(err)
+	}
+	if err := donor.SetMeta("project_path", mainResolved); err != nil {
+		t.Fatal(err)
+	}
+	if err := donor.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	dstDB := config.DBPathForProject(wtResolved, model)
+	if _, err := os.Stat(dstDB); !os.IsNotExist(err) {
+		t.Fatalf("precondition failed: worktree DB should not exist yet (stat err=%v)", err)
+	}
+
+	logger := slog.New(slog.NewTextHandler(io.Discard, nil))
+	seedFromDonorIfNew(dstDB, wtResolved, model, logger)
+
+	if _, err := os.Stat(dstDB); err != nil {
+		t.Fatalf("worktree index was NOT seeded from sibling donor: %v", err)
+	}
+
+	seeded, err := store.New(dstDB, 4)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() { _ = seeded.Close() }() // the close error does not affect the assertion below
+	got, err := seeded.GetMeta("root_hash")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got != "donor-root-hash" {
+		t.Fatalf("expected seeded DB to carry donor root_hash, got %q", got)
+	}
+}
diff --git a/cmd/status.go b/cmd/status.go
new file mode 100644
index 0000000..8839598
--- /dev/null
+++ b/cmd/status.go
@@ -0,0 +1,230 @@
+// Copyright 2026 Aeneas Rekkas
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cmd
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"strings"
+	"sync"
+
+	"github.com/ory/lumen/internal/config"
+	"github.com/ory/lumen/internal/embedder"
+	"github.com/ory/lumen/internal/index"
+	"github.com/ory/lumen/internal/merkle"
+	"github.com/spf13/cobra"
+)
+
+// serverStatus is the reachability outcome for a single configured embedding
+// server.
+type serverStatus struct {
+	server    config.ServerConfig // the configured server that was probed
+	reachable bool                // outcome of probeEmbeddingService for this server
+	message   string              // human-readable probe result; shown by formatStatus when unreachable
+}
+
+// statusResult is the combined outcome of the two status checks: embedding
+// service reachability (one entry per configured server) and index
+// presence/freshness.
+type statusResult struct {
+	servers []serverStatus // one entry per configured server, in config order
+
+	projectPath   string // root whose index is being reported
+	indexed       bool   // false when the root is unindexable or the DB is missing or unreadable
+	totalFiles    int    // info.TotalFiles from the indexer's Status
+	indexedFiles  int    // info.IndexedFiles
+	totalChunks   int    // info.TotalChunks
+	model         string // info.EmbeddingModel
+	lastIndexedAt string // info.LastIndexedAt; empty renders as "never"
+	stale         bool   // true when IsFresh reports not fresh or fails
+}
+
+// anyServerReachable reports whether at least one configured embedding server
+// responded. Lumen uses failover, so the service is usable as long as one
+// server is up.
+func anyServerReachable(r statusResult) bool {
+	for _, s := range r.servers {
+		if s.reachable {
+			return true
+		}
+	}
+	return false // also the result for an empty server list
+}
+
+// statusExitCode returns 0 only when at least one server is reachable, the
+// index exists, and the index is fresh; otherwise 1.
+func statusExitCode(r statusResult) int {
+	if anyServerReachable(r) && r.indexed && !r.stale {
+		return 0
+	}
+	return 1
+}
+
+// formatStatus renders a human-readable status report: one line per configured
+// embedding server, followed by the index block (or a "not indexed" line).
+func formatStatus(r statusResult) string {
+	var b strings.Builder
+
+	if len(r.servers) == 0 {
+		fmt.Fprintf(&b, "Embedding service: ERROR — no servers configured\n")
+	}
+	for _, s := range r.servers {
+		if s.reachable {
+			fmt.Fprintf(&b, "Embedding service: OK (%s, %s, %s)\n", s.server.Backend, s.server.Host, s.server.Model)
+		} else {
+			fmt.Fprintf(&b, "Embedding service: ERROR (%s, %s) — %s\n", s.server.Backend, s.server.Host, s.message)
+		}
+	}
+
+	if !r.indexed {
+		fmt.Fprintf(&b, "Index: %s — not indexed (run `lumen index .`, or it will seed on first search)", r.projectPath) // no trailing newline: runStatus prints with Fprintln
+		return b.String()
+	}
+
+	stale := "no"
+	if r.stale {
+		stale = "yes"
+	}
+	lastIndexed := r.lastIndexedAt
+	if lastIndexed == "" {
+		lastIndexed = "never"
+	}
+	fmt.Fprintf(&b, "Index: %s\n", r.projectPath)
+	fmt.Fprintf(&b, " Files: %d | Indexed: %d | Chunks: %d | Model: %s\n", r.totalFiles, r.indexedFiles, r.totalChunks, r.model)
+	fmt.Fprintf(&b, " Last indexed: %s | Stale: %s", lastIndexed, stale)
+	return b.String()
+}
+
+// errStatusUnhealthy signals a non-zero exit without printing a cobra error
+// banner; the human-readable report has already been written to stdout.
+var errStatusUnhealthy = errors.New("status: unhealthy")
+
+func init() {
+	statusCmd.Flags().StringP("model", "m", "", "embedding model (default: $LUMEN_EMBED_MODEL or "+embedder.DefaultModel+")")
+	statusCmd.Flags().StringP("backend", "b", "", "embedding backend to select (\"ollama\" or \"lmstudio\"); disambiguates when --model is configured on multiple backends")
+	// runStatus prints the report itself; suppress cobra's error/usage output so
+	// a non-zero exit on stale/unreachable does not double-print.
+	statusCmd.SilenceErrors = true // NOTE(review): this also hides config/path errors returned by runStatus unless the caller prints them — confirm
+	statusCmd.SilenceUsage = true
+	rootCmd.AddCommand(statusCmd)
+}
+
+var statusCmd = &cobra.Command{
+	Use:   "status [path]",
+	Short: "Report embedding-service health and index freshness",
+	Long: `Reports whether the configured embedding server(s) are reachable and
+whether the project's index exists and is fresh. Every configured server is
+reported on its own line.
+
+With no argument, inspects the current directory. The path is normalized to the
+git repository root (or an existing ancestor index) so it reports on the same
+index that searches use.
+
+status performs no indexing and never creates an index database. Exit code is 0
+only when at least one server is reachable and the index exists and is fresh;
+otherwise 1.`,
+	Args: cobra.MaximumNArgs(1),
+	RunE: runStatus,
+}
+
+func runStatus(cmd *cobra.Command, args []string) error {
+	target := "." // default to the current directory when no path argument is given
+	if len(args) == 1 {
+		target = args[0]
+	}
+	cfg, err := loadConfigWithFlags(cmd)
+	if err != nil {
+		return err
+	}
+	emb := newEmbedder(cfg)
+
+	indexRoot, _, err := resolveIndexRoot(target, "", emb.ModelName())
+	if err != nil {
+		return err
+	}
+
+	r := collectStatus(cmd.Context(), cfg, emb, indexRoot)
+	if _, err := fmt.Fprintln(cmd.OutOrStdout(), formatStatus(r)); err != nil {
+		return err
+	}
+
+	if statusExitCode(r) != 0 {
+		return errStatusUnhealthy // report already printed; the error only carries the exit status
+	}
+	return nil
+}
+
+func collectStatus(ctx context.Context, cfg *config.ConfigService, emb *embedder.FailoverEmbedder, projectPath string) statusResult {
+	r := statusResult{projectPath: projectPath}
+
+	// Probe every configured server concurrently, preserving config order in
+	// the result.
+	servers := cfg.Servers()
+	r.servers = make([]serverStatus, len(servers))
+	var wg sync.WaitGroup
+	for i := range servers {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			reachable, message := probeEmbeddingService(ctx, servers[i])
+			r.servers[i] = serverStatus{server: servers[i], reachable: reachable, message: message} // each goroutine writes only its own slot
+		}(i)
+	}
+	wg.Wait()
+
+	modelName := emb.ModelName()
+
+	if unindexable, _ := merkle.IsRootUnindexable(projectPath); unindexable { // the error is ignored: best-effort check
+		r.indexed = false
+		return r
+	}
+
+	// Detect a missing index without creating one: stat the DB file first.
+	dbPath := config.DBPathForProject(projectPath, modelName)
+	if _, statErr := os.Stat(dbPath); statErr != nil {
+		r.indexed = false
+		return r
+	}
+
+	idx, openErr := index.NewIndexer(dbPath, emb, cfg.MaxChunkTokens())
+	if openErr != nil {
+		r.indexed = false
+		return r
+	}
+	defer func() { _ = idx.Close() }()
+
+	info, infoErr := idx.Status(projectPath)
+	if infoErr != nil {
+		r.indexed = false
+		return r
+	}
+	r.indexed = true
+	r.totalFiles = info.TotalFiles
+	r.indexedFiles = info.IndexedFiles
+	r.totalChunks = info.TotalChunks
+	r.model = info.EmbeddingModel
+	r.lastIndexedAt = info.LastIndexedAt
+
+	fresh, freshErr := idx.IsFresh(projectPath)
+	if freshErr != nil {
+		// Treat an un-checkable index as stale rather than silently fresh.
+		r.stale = true
+	} else {
+		r.stale = !fresh
+	}
+	return r
+}
diff --git a/cmd/status_test.go b/cmd/status_test.go
new file mode 100644
index 0000000..2d78751
--- /dev/null
+++ b/cmd/status_test.go
@@ -0,0 +1,201 @@
+// Copyright 2026 Aeneas Rekkas
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cmd
+
+import (
+	"context"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/ory/lumen/internal/config"
+)
+
+func TestStatusExitCode(t *testing.T) {
+	// reach builds a single-server slice with the given reachability.
+	reach := func(ok bool) []serverStatus { return []serverStatus{{reachable: ok}} }
+	tests := []struct {
+		name string
+		r    statusResult
+		want int
+	}{
+		{"healthy and fresh", statusResult{servers: reach(true), indexed: true, stale: false}, 0},
+		{"service unreachable", statusResult{servers: reach(false), indexed: true, stale: false}, 1},
+		{"index missing", statusResult{servers: reach(true), indexed: false}, 1},
+		{"index stale", statusResult{servers: reach(true), indexed: true, stale: true}, 1},
+		{"all bad", statusResult{servers: reach(false), indexed: false, stale: true}, 1},
+		// Failover semantics: at least one reachable server means the service
+		// is usable, so this is healthy.
+		{"one of two reachable", statusResult{servers: []serverStatus{{reachable: false}, {reachable: true}}, indexed: true, stale: false}, 0},
+		{"all servers unreachable", statusResult{servers: []serverStatus{{reachable: false}, {reachable: false}}, indexed: true, stale: false}, 1},
+		{"no servers configured", statusResult{servers: nil, indexed: true, stale: false}, 1},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := statusExitCode(tt.r); got != tt.want {
+				t.Errorf("statusExitCode(%+v) = %d, want %d", tt.r, got, tt.want)
+			}
+		})
+	}
+}
+
+func TestFormatStatus(t *testing.T) {
+	okServer := func() serverStatus { // a reachable ollama server shared by the subtests below
+		return serverStatus{
+			server:    config.ServerConfig{Backend: "ollama", Host: "http://localhost:11434", Model: "jina"},
+			reachable: true,
+			message:   "service is healthy",
+		}
+	}
+
+	t.Run("healthy and fresh", func(t *testing.T) {
+		r := statusResult{
+			servers:       []serverStatus{okServer()},
+			projectPath:   "/repo",
+			indexed:       true,
+			totalFiles:    158,
+			indexedFiles:  158,
+			totalChunks:   1646,
+			model:         "jina",
+			lastIndexedAt: "2026-05-30T12:00:00Z",
+			stale:         false,
+		}
+		out := formatStatus(r)
+		if !strings.Contains(out, "Embedding service: OK (ollama, http://localhost:11434, jina)") {
+			t.Errorf("missing healthy service line:\n%s", out)
+		}
+		if !strings.Contains(out, "Files: 158 | Indexed: 158 | Chunks: 1646 | Model: jina") {
+			t.Errorf("missing index stats line:\n%s", out)
+		}
+		if !strings.Contains(out, "Stale: no") {
+			t.Errorf("expected Stale: no:\n%s", out)
+		}
+	})
+
+	t.Run("service unreachable", func(t *testing.T) {
+		r := statusResult{
+			servers: []serverStatus{{
+				server:    config.ServerConfig{Backend: "ollama", Host: "http://localhost:11434"},
+				reachable: false,
+				message:   "service unreachable: connection refused",
+			}},
+		}
+		out := formatStatus(r)
+		if !strings.Contains(out, "Embedding service: ERROR (ollama, http://localhost:11434) — service unreachable: connection refused") {
+			t.Errorf("missing error service line:\n%s", out)
+		}
+	})
+
+	t.Run("multiple servers each reported", func(t *testing.T) {
+		r := statusResult{
+			servers: []serverStatus{
+				okServer(),
+				{
+					server:    config.ServerConfig{Backend: "lmstudio", Host: "http://localhost:1234"},
+					reachable: false,
+					message:   "service unreachable: nope",
+				},
+			},
+			projectPath: "/repo",
+			indexed:     false,
+		}
+		out := formatStatus(r)
+		if !strings.Contains(out, "Embedding service: OK (ollama, http://localhost:11434, jina)") {
+			t.Errorf("missing ollama OK line:\n%s", out)
+		}
+		if !strings.Contains(out, "Embedding service: ERROR (lmstudio, http://localhost:1234) — service unreachable: nope") {
+			t.Errorf("missing lmstudio ERROR line:\n%s", out)
+		}
+	})
+
+	t.Run("no servers configured", func(t *testing.T) {
+		r := statusResult{servers: nil, projectPath: "/repo", indexed: false}
+		out := formatStatus(r)
+		if !strings.Contains(out, "Embedding service: ERROR — no servers configured") {
+			t.Errorf("missing no-servers line:\n%s", out)
+		}
+	})
+
+	t.Run("not indexed", func(t *testing.T) {
+		r := statusResult{
+			servers:     []serverStatus{okServer()},
+			projectPath: "/repo",
+			indexed:     false,
+		}
+		out := formatStatus(r)
+		if !strings.Contains(out, "/repo — not indexed") {
+			t.Errorf("missing not-indexed line:\n%s", out)
+		}
+	})
+
+	t.Run("indexed but never indexed shows never", func(t *testing.T) {
+		r := statusResult{
+			servers:       []serverStatus{okServer()},
+			projectPath:   "/repo",
+			indexed:       true,
+			lastIndexedAt: "",
+			stale:         true,
+		}
+		out := formatStatus(r)
+		if !strings.Contains(out, "Last indexed: never") {
+			t.Errorf("expected 'Last indexed: never' when lastIndexedAt empty:\n%s", out)
+		}
+	})
+
+	t.Run("stale shows yes", func(t *testing.T) {
+		r := statusResult{
+			servers:       []serverStatus{okServer()},
+			projectPath:   "/repo",
+			indexed:       true,
+			lastIndexedAt: "2026-05-30T12:00:00Z",
+			stale:         true,
+		}
+		out := formatStatus(r)
+		if !strings.Contains(out, "Stale: yes") {
+			t.Errorf("expected Stale: yes:\n%s", out)
+		}
+	})
+}
+
+func TestRunStatusMissingIndexNoSideEffect(t *testing.T) {
+	// A temp dir with no git repo and no existing index: collectStatus must
+	// report not-indexed and must NOT create a DB file.
+	tmp := t.TempDir()
+
+	// Isolate XDG_DATA_HOME so we observe DB creation cleanly.
+	t.Setenv("XDG_DATA_HOME", t.TempDir())
+
+	cfg, err := config.NewConfigService(config.DefaultConfigPath()) // NOTE(review): loads the default config path, so collectStatus probes whatever servers it lists — confirm this is hermetic in CI
+	if err != nil {
+		t.Fatalf("config: %v", err)
+	}
+	emb := newEmbedder(cfg)
+
+	r := collectStatus(context.Background(), cfg, emb, tmp)
+
+	if r.indexed {
+		t.Error("expected not indexed for empty temp dir, got indexed=true")
+	}
+
+	// collectStatus must probe every configured server.
+	if len(r.servers) != len(cfg.Servers()) {
+		t.Errorf("expected %d server results, got %d", len(cfg.Servers()), len(r.servers))
+	}
+
+	dbPath := config.DBPathForProject(tmp, emb.ModelName())
+	if _, statErr := os.Stat(dbPath); statErr == nil {
+		t.Errorf("collectStatus created a DB file at %s; it must be read-only", dbPath)
+	}
+}
diff --git a/cmd/stdio.go b/cmd/stdio.go
index 551840d..8730ab0 100644
--- a/cmd/stdio.go
+++ b/cmd/stdio.go
@@ -139,6 +139,18 @@ const defaultStaleEmbedTimeout = 3 * time.Second
 
 const backgroundReindexMaxDuration = 10 * time.Minute
 
+// defaultCreateWaitTimeout bounds how long getOrCreate waits for a peer
+// process (the background lumen index spawned at SessionStart) to publish a
+// new index DB it is creating + seeding under the exclusive index lock. The
+// peer publishes the file early (a donor copy or an empty DB) before the slow
+// embedding pass, so this only needs to cover that brief window. On timeout
+// getOrCreate falls back to creating the DB itself.
+const defaultCreateWaitTimeout = 3 * time.Second
+
+// createWaitPollInterval is how often getOrCreate re-checks for the peer's DB
+// file while waiting up to defaultCreateWaitTimeout.
+const createWaitPollInterval = 25 * time.Millisecond
+
 // staleIndexWarning is returned to the caller whenever ensureIndexed cannot
 // produce a fresh index synchronously (background indexer holds the flock,
 // in-process goroutine is already running, or reindex timed out). The text
@@ -199,6 +211,7 @@ type indexerCache struct {
 	reindexTimeout    time.Duration // override for tests; 0 reads from cfg, then defaultReindexTimeout
 	embedTimeout      time.Duration // override for tests; 0 means defaultEmbedTimeout
 	staleEmbedTimeout time.Duration // override for tests; 0 means defaultStaleEmbedTimeout
+	createWaitTimeout time.Duration // override for tests; 0 means defaultCreateWaitTimeout
 	findDonorFunc     func(string, string) string // nil uses config.FindDonorIndex
 	seedFunc          func(string, string) (bool, error) // nil uses index.SeedFromDonor
 	ensureFreshFunc   func(ctx context.Context, idx *index.Indexer, projectDir string, progress index.ProgressFunc) (bool, index.Stats, error) // nil uses idx.EnsureFresh
@@ -256,6 +269,15 @@ func (ic *indexerCache) getStaleEmbedTimeout() time.Duration {
 	return defaultStaleEmbedTimeout
 }
 
+// getCreateWaitTimeout returns how long getOrCreate waits for a peer process
+// to publish a new index DB: ic.createWaitTimeout if non-zero, else the default.
+func (ic *indexerCache) getCreateWaitTimeout() time.Duration {
+	if ic.createWaitTimeout != 0 {
+		return ic.createWaitTimeout
+	}
+	return defaultCreateWaitTimeout
+}
+
 // logger returns ic.log, falling back to a discarding logger when the field
 // is nil (e.g. in unit tests that construct indexerCache directly).
 func (ic *indexerCache) logger() *slog.Logger {
@@ -494,27 +516,34 @@ func (ic *indexerCache) getOrCreate(projectPath string, preferredRoot string, mo
 			"model", modelName,
 			"index_version", config.IndexVersion,
 		)
-		findDonor := ic.findDonorFunc
-		if findDonor == nil {
-			findDonor = config.FindDonorIndex
-		}
-		if donorPath := findDonor(effectiveRoot, modelName); donorPath != "" {
-			ic.logger().Info("seeding index from donor worktree",
-				"effective_root", effectiveRoot,
-				"donor_path", donorPath,
-			)
-			seedFn := ic.seedFunc
-			if seedFn == nil {
-				seedFn = index.SeedFromDonor
-			}
-			if _, seedErr := seedFn(donorPath, dbPath); seedErr != nil {
-				ic.logger().Warn("seed from donor worktree failed",
-					"effective_root", effectiveRoot,
-					"donor_path", donorPath,
-					"error", seedErr,
-				)
-				seedWarning = fmt.Sprintf("index seeded from scratch (sibling copy failed: %v)", seedErr)
-			}
+
+		// Serialize creation + seeding with the background indexer
+		// (lumen index, spawned by the SessionStart hook) via the same
+		// exclusive index lock it holds. Without this, both processes can run
+		// SeedFromDonor against the same dbPath concurrently — corrupting the
+		// SQLite file — or one can create an empty DB that makes the other skip
+		// seeding and re-index the worktree from scratch.
+		lockPath := indexlock.LockPathForDB(dbPath)
+		lk, lockErr := indexlock.TryAcquire(lockPath)
+		switch {
+		case lockErr == nil && lk != nil:
+			// We own creation. Defer release so the lock is freed even if
+			// seedFromDonor panics; the IIFE keeps it scoped to the seed block
+			// rather than the whole function.
+			func() {
+				defer lk.Release()
+				// Re-check under the lock — a peer may have created the DB
+				// between our stat above and acquiring the lock.
+				if _, st := os.Stat(dbPath); os.IsNotExist(st) {
+					seedWarning = ic.seedFromDonor(effectiveRoot, modelName, dbPath)
+				}
+			}()
+		default: // NOTE(review): also reached when TryAcquire returned an error, not only when a peer holds the lock
+			// A background indexer holds the lock and is creating + seeding the
+			// DB. Wait briefly for it to publish the file so NewIndexer opens
+			// the seeded copy instead of creating an empty DB that would clobber
+			// the seed.
+			ic.waitForDB(dbPath)
 		}
 	}
 
@@ -551,6 +580,56 @@ func (ic *indexerCache) getOrCreate(projectPath string, preferredRoot string, mo
 	return idx, effectiveRoot, seedWarning, nil
 }
 
+// seedFromDonor copies a sibling worktree's index into dbPath, reusing the
+// parent's embeddings instead of indexing from scratch. The caller must hold
+// the exclusive index lock for dbPath. Returns a non-empty seedWarning when a
+// donor was found but the copy failed (indexing will proceed from scratch).
+func (ic *indexerCache) seedFromDonor(effectiveRoot, modelName, dbPath string) (seedWarning string) {
+	findDonor := ic.findDonorFunc // test seam; nil falls back to config.FindDonorIndex
+	if findDonor == nil {
+		findDonor = config.FindDonorIndex
+	}
+	donorPath := findDonor(effectiveRoot, modelName)
+	if donorPath == "" {
+		return "" // no donor found: nothing to copy and nothing to warn about
+	}
+	ic.logger().Info("seeding index from donor worktree",
+		"effective_root", effectiveRoot,
+		"donor_path", donorPath,
+	)
+	seedFn := ic.seedFunc // test seam; nil falls back to index.SeedFromDonor
+	if seedFn == nil {
+		seedFn = index.SeedFromDonor
+	}
+	if _, seedErr := seedFn(donorPath, dbPath); seedErr != nil {
+		ic.logger().Warn("seed from donor worktree failed",
+			"effective_root", effectiveRoot,
+			"donor_path", donorPath,
+			"error", seedErr,
+		)
+		return fmt.Sprintf("index seeded from scratch (sibling copy failed: %v)", seedErr)
+	}
+	return ""
+}
+
+// waitForDB polls for dbPath to appear, up to getCreateWaitTimeout. It is used
+// when another process holds the index lock and is creating + seeding the DB:
+// waiting for it to publish the file lets the subsequent NewIndexer open the
+// seeded copy rather than creating an empty DB that clobbers the seed.
+func (ic *indexerCache) waitForDB(dbPath string) {
+	deadline := time.Now().Add(ic.getCreateWaitTimeout())
+	for {
+		if _, err := os.Stat(dbPath); err == nil { // the file exists: stop waiting
+			return
+		}
+		if time.Now().After(deadline) {
+			ic.logger().Debug("timed out waiting for peer indexer to publish DB", "db_path", dbPath)
+			return
+		}
+		time.Sleep(createWaitPollInterval) // blocking poll; this function takes no context to cancel on
+	}
+}
+
 // handleSemanticSearch is the tool handler for the semantic_search tool.
 // Uses Out=any so the SDK does not set StructuredContent — the LLM sees
 // only the plaintext in Content.
@@ -1047,23 +1126,13 @@ func (ic *indexerCache) handleIndexStatus(_ context.Context, _ *mcp.CallToolRequ
 	}, nil, nil
 }
 
-// handleHealthCheck pings the configured embedding service and reports status.
-// It checks the currently active server (after failover), not always server[0].
-func (ic *indexerCache) handleHealthCheck(ctx context.Context, _ *mcp.CallToolRequest, _ HealthCheckInput) (*mcp.CallToolResult, any, error) {
-	servers := ic.cfg.Servers()
-	idx := 0
-	if fe, ok := ic.embedder.(*embedder.FailoverEmbedder); ok {
-		if active := fe.ActiveServerIndex(); active >= 0 && active < len(servers) {
-			idx = active
-		}
-	}
-	srv := servers[idx]
-	backend := srv.Backend
-	host := srv.Host
-	model := srv.Model
-	probeURL := host + "/api/tags"
-	if backend == config.BackendLMStudio {
-		probeURL = host + "/v1/models"
+// probeEmbeddingService pings srv's model-listing endpoint and reports whether
+// it is reachable along with a human-readable message. Ollama is probed at
+// /api/tags, LM Studio at /v1/models. The probe is bounded to 5 seconds.
+func probeEmbeddingService(ctx context.Context, srv config.ServerConfig) (bool, string) {
+	probeURL := srv.Host + "/api/tags" // default endpoint; replaced below for LM Studio
+	if srv.Backend == config.BackendLMStudio {
+		probeURL = srv.Host + "/v1/models"
 	}
 
 	probeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
@@ -1071,23 +1140,42 @@ func (ic *indexerCache) handleHealthCheck(ctx context.Context, _ *mcp.CallToolRe
 
 	req, err := http.NewRequestWithContext(probeCtx, http.MethodGet, probeURL, nil)
 	if err != nil {
-		return healthResult(backend, host, model, false,
-			fmt.Sprintf("failed to create request: %v", err)), nil, nil
+		return false, fmt.Sprintf("failed to create request: %v", err)
 	}
 
 	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
-		return healthResult(backend, host, model, false,
-			fmt.Sprintf("service unreachable: %v", err)), nil, nil
+		return false, fmt.Sprintf("service unreachable: %v", err)
 	}
 	_ = resp.Body.Close()
 
 	if resp.StatusCode >= 500 {
-		return healthResult(backend, host, model, false,
-			fmt.Sprintf("service returned HTTP %d", resp.StatusCode)), nil, nil
+		return false, fmt.Sprintf("service returned HTTP %d", resp.StatusCode)
 	}
 
-	return healthResult(backend, host, model, true, "service is healthy"), nil, nil
+	return true, "service is healthy" // any status below 500 counts as reachable
+}
+
+// handleHealthCheck pings the configured embedding service and reports status.
+// It checks the currently active server (after failover), not always server[0].
+// When no servers are configured it reports an unreachable service instead of
+// indexing into the empty server list.
+func (ic *indexerCache) handleHealthCheck(ctx context.Context, _ *mcp.CallToolRequest, _ HealthCheckInput) (*mcp.CallToolResult, any, error) {
+	servers := ic.cfg.Servers()
+	if len(servers) == 0 {
+		// servers[idx] below would panic on an empty configuration.
+		return healthResult("", "", "", false, "no servers configured"), nil, nil
+	}
+	idx := 0
+	if fe, ok := ic.embedder.(*embedder.FailoverEmbedder); ok {
+		if active := fe.ActiveServerIndex(); active >= 0 && active < len(servers) {
+			idx = active
+		}
+	}
+	srv := servers[idx]
+
+	reachable, message := probeEmbeddingService(ctx, srv)
+	return healthResult(srv.Backend, srv.Host, srv.Model, reachable, message), nil, nil
 }
 
 func healthResult(backend, host, model string, reachable bool, message string) *mcp.CallToolResult {
diff --git a/cmd/stdio_probe_test.go b/cmd/stdio_probe_test.go
new file mode 100644
index 0000000..39ace4c
--- /dev/null
+++ b/cmd/stdio_probe_test.go
@@ -0,0 +1,95 @@
+// Copyright 2026 Aeneas Rekkas
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cmd
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+
+	"github.com/ory/lumen/internal/config"
+)
+
+func TestProbeEmbeddingService(t *testing.T) { // covers 200 on both backends, 5xx, a refused connection, and 4xx
+	t.Run("reachable when ollama returns 200", func(t *testing.T) {
+		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			if r.URL.Path != "/api/tags" {
+				t.Errorf("expected /api/tags, got %s", r.URL.Path)
+			}
+			w.WriteHeader(http.StatusOK)
+		}))
+		defer srv.Close()
+		reachable, msg := probeEmbeddingService(context.Background(),
+			config.ServerConfig{Backend: config.BackendOllama, Host: srv.URL, Model: "m"})
+		if !reachable {
+			t.Fatalf("expected reachable, got message %q", msg)
+		}
+	})
+	t.Run("reachable when lmstudio returns 200 on /v1/models", func(t *testing.T) {
+		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			if r.URL.Path != "/v1/models" {
+				t.Errorf("expected /v1/models, got %s", r.URL.Path)
+			}
+			w.WriteHeader(http.StatusOK)
+		}))
+		defer srv.Close()
+		reachable, _ := probeEmbeddingService(context.Background(),
+			config.ServerConfig{Backend: config.BackendLMStudio, Host: srv.URL, Model: "m"})
+		if !reachable {
+			t.Fatal("expected reachable for lmstudio 200")
+		}
+	})
+	t.Run("not reachable on 5xx", func(t *testing.T) {
+		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+			w.WriteHeader(http.StatusInternalServerError)
+		}))
+		defer srv.Close()
+		reachable, msg := probeEmbeddingService(context.Background(),
+			config.ServerConfig{Backend: config.BackendOllama, Host: srv.URL, Model: "m"})
+		if reachable {
+			t.Fatal("expected not reachable on 500")
+		}
+		if !strings.Contains(msg, "500") {
+			t.Errorf("expected message to mention 500, got %q", msg)
+		}
+	})
+	t.Run("not reachable when connection refused", func(t *testing.T) {
+		// Start a server and close it immediately so srv.URL is a refused
+		// address — hermetic, no reliance on a well-known closed port.
+		srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
+		srv.Close()
+		reachable, msg := probeEmbeddingService(context.Background(),
+			config.ServerConfig{Backend: config.BackendOllama, Host: srv.URL, Model: "m"})
+		if reachable {
+			t.Fatal("expected not reachable for refused connection")
+		}
+		if msg == "" {
+			t.Error("expected non-empty failure message")
+		}
+	})
+	t.Run("reachable on 4xx (only 5xx and transport errors are unhealthy)", func(t *testing.T) {
+		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+			w.WriteHeader(http.StatusNotFound)
+		}))
+		defer srv.Close()
+		reachable, _ := probeEmbeddingService(context.Background(),
+			config.ServerConfig{Backend: config.BackendOllama, Host: srv.URL, Model: "m"})
+		if !reachable {
+			t.Error("expected 404 to be treated as reachable")
+		}
+	})
+}
diff --git a/cmd/stdio_seedrace_test.go b/cmd/stdio_seedrace_test.go
new file mode 100644
index 0000000..f1f532a
--- /dev/null
+++ b/cmd/stdio_seedrace_test.go
@@ -0,0 +1,83 @@
+// Copyright 2026 Aeneas Rekkas
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cmd
+
+import (
+	"os"
+	"path/filepath"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/ory/lumen/internal/config"
+	"github.com/ory/lumen/internal/indexlock"
+)
+
+// TestGetOrCreate_SkipsSeedWhenIndexLockHeld covers the cross-process race
+// between the MCP server (getOrCreate) and the background indexer (`lumen
+// index`, started by the SessionStart hook) on one fresh worktree DB. While
+// the indexer owns the exclusive index flock to create and seed that DB, a
+// second SeedFromDonor against the same dbPath would corrupt the SQLite file
+// (SeedFromDonor copies through a shared temp path and renames over the DB).
+//
+// getOrCreate therefore has to yield to the lock holder rather than seed.
+func TestGetOrCreate_SkipsSeedWhenIndexLockHeld(t *testing.T) {
+	const model = "test-model"
+	root := t.TempDir()
+	t.Setenv("XDG_DATA_HOME", root)
+
+	worktree := filepath.Join(root, "project")
+	if err := os.MkdirAll(worktree, 0o755); err != nil {
+		t.Fatal(err)
+	}
+	dbFile := config.DBPathForProject(worktree, model)
+	if err := os.MkdirAll(filepath.Dir(dbFile), 0o755); err != nil {
+		t.Fatal(err)
+	}
+
+	// Stand in for the background indexer: take the exclusive index lock and
+	// keep it until the test function returns, as if a create + seed were running.
+	held, lockErr := indexlock.TryAcquire(indexlock.LockPathForDB(dbFile))
+	if lockErr != nil || held == nil {
+		t.Fatalf("acquire index lock: err=%v lk=%v", lockErr, held)
+	}
+	defer held.Release()
+
+	// seeds counts how often the injected seed hook fires; it must stay at zero.
+	var seeds atomic.Int32
+	countingSeed := func(_, _ string) (bool, error) {
+		seeds.Add(1)
+		return true, nil
+	}
+	ic := &indexerCache{
+		embedder:      &stubEmbedder{},
+		cfg:           newTestConfigService(t, 512),
+		findDonorFunc: func(_, _ string) string { return "/fake/donor.db" },
+		seedFunc:      countingSeed,
+		// A short wait-for-peer timeout keeps the test quick.
+		createWaitTimeout: 50 * time.Millisecond,
+	}
+
+	idx, _, _, err := ic.getOrCreate(worktree, "", model)
+	if err != nil {
+		t.Fatalf("getOrCreate: %v", err)
+	}
+	t.Cleanup(func() { _ = idx.Close() })
+
+	if n := seeds.Load(); n != 0 {
+		t.Fatalf("getOrCreate ran SeedFromDonor (%d times) while another process held "+
+			"the index lock — this races the background indexer and can corrupt the DB", n)
+	}
+}