From 340578ace1982866280b70b26913cc0150d92ed4 Mon Sep 17 00:00:00 2001 From: Nicolas Bouvrette Date: Sun, 10 May 2026 16:44:44 -0400 Subject: [PATCH 1/6] fix: repopulate Redis automatically after data loss (fixes #141) When Redis restarts and loses data while ld-relay's streaming connection remains open, daemon-mode SDK clients receive empty flag evaluations because no mechanism existed to detect and recover from this state. This adds three complementary resilience layers: 1. In-memory snapshot: the store wrapper now keeps a deep copy of the latest flag/segment data, updated on every Init and Upsert. 2. Circuit breaker: on Redis read errors, Get/GetAll serve from the snapshot and stop hitting Redis until the health check clears it. 3. Periodic health check: a background goroutine checks the Redis $inited sentinel key via a dedicated connection. When the key is missing, it repopulates Redis from the snapshot and clears the circuit breaker. Configuration: [Redis] HealthCheckInterval (default 30s). Co-authored-by: Cursor --- config/config.go | 20 +- docs/persistent-storage.md | 35 +++ internal/relayenv/env_context_impl.go | 61 +++++ internal/store/redis_init_checker.go | 51 ++++ internal/store/relay_feature_store.go | 203 ++++++++++++++- internal/store/relay_feature_store_test.go | 277 +++++++++++++++++++++ internal/store/store_health_check.go | 143 +++++++++++ internal/store/store_health_check_test.go | 180 +++++++++++++ 8 files changed, 961 insertions(+), 9 deletions(-) create mode 100644 internal/store/redis_init_checker.go create mode 100644 internal/store/store_health_check.go create mode 100644 internal/store/store_health_check_test.go diff --git a/config/config.go b/config/config.go index 4570e051..1a86b7ed 100644 --- a/config/config.go +++ b/config/config.go @@ -50,6 +50,11 @@ const ( // DefaultDatabaseCacheTTL is the default value for the LocalTTL parameter for databases if not specified. DefaultDatabaseCacheTTL = time.Second * 30 + // DefaultDataStoreHealthCheckInterval is the default interval for checking whether the persistent + // data store still contains its initialization data. If data loss is detected (e.g. after a Redis + // restart), the relay will automatically repopulate the store from its in-memory snapshot. + DefaultDataStoreHealthCheckInterval = time.Second * 30 + // DefaultPrometheusPort is the default value for PrometheusConfig.Port if not specified. DefaultPrometheusPort = 8031 @@ -212,13 +217,14 @@ type EventsConfig struct { // variables, individual fields are not documented here; instead, see the `README.md` section on // configuration. type RedisConfig struct { - Host string `conf:"REDIS_HOST"` - Port ct.OptIntGreaterThanZero - URL ct.OptURLAbsolute `conf:"REDIS_URL"` - LocalTTL ct.OptDuration `conf:"CACHE_TTL"` - TLS bool `conf:"REDIS_TLS"` - Username string `conf:"REDIS_USERNAME"` - Password string `conf:"REDIS_PASSWORD"` + Host string `conf:"REDIS_HOST"` + Port ct.OptIntGreaterThanZero + URL ct.OptURLAbsolute `conf:"REDIS_URL"` + LocalTTL ct.OptDuration `conf:"CACHE_TTL"` + TLS bool `conf:"REDIS_TLS"` + Username string `conf:"REDIS_USERNAME"` + Password string `conf:"REDIS_PASSWORD"` + HealthCheckInterval ct.OptDuration `conf:"REDIS_HEALTH_CHECK_INTERVAL"` } // ConsulConfig configures the optional Consul integration. diff --git a/docs/persistent-storage.md b/docs/persistent-storage.md index 6a0ce1a3..f3b6a999 100644 --- a/docs/persistent-storage.md +++ b/docs/persistent-storage.md @@ -129,6 +129,41 @@ note over Relay Proxy: TTL fresh, serve from memory Relay Proxy-->>SDK2: Streaming response ``` +## Data Store Health Check and Automatic Repopulation + +The Relay Proxy includes a health check mechanism that detects when a persistent store loses its data (for example, when Redis restarts without persistence enabled). Without this, SDKs using daemon mode (such as PHP) would receive empty flag evaluations until the Relay Proxy is manually restarted. + +### How it works + +When using Redis, the Relay Proxy periodically checks for the presence of a sentinel key (`$inited`) that the SDK writes when it first populates the store. If this key is missing but the Relay has valid data in memory, it automatically repopulates the store. + +The health check also includes a **circuit breaker**: if a read from the persistent store fails with a connection error, subsequent reads are served directly from an in-memory snapshot, avoiding connection pool exhaustion and timeout cascades. The circuit breaker is cleared automatically when the health check confirms the store is available again. + +### Configuration + +The health check interval is configurable via the `REDIS_HEALTH_CHECK_INTERVAL` environment variable or the `healthCheckInterval` option in the `[Redis]` configuration section. The default is 30 seconds. + +``` +# Configuration file +[Redis] + host = "localhost" + port = 6379 + localTtl = 30s + healthCheckInterval = 30s +``` + +``` +# Environment variable +REDIS_HEALTH_CHECK_INTERVAL=30s +``` + +### Behavior summary + +- **Store read error (e.g. connection failure):** Circuit breaker activates immediately. Proxy-mode SDKs are served from the in-memory snapshot. The health check probes for recovery at the configured interval. +- **Store data loss (e.g. Redis restart):** Detected within one health check interval. The store is automatically repopulated from the in-memory snapshot. +- **Store recovered:** Circuit breaker is cleared. Normal read path resumes. +- **No snapshot available (e.g. Relay just started):** Health check cannot repopulate. Errors pass through normally. + ## Example: Persistent Store during LaunchDarkly Outage - Cold Relay In this example, LaunchDarkly SaaS is down. Additionally, the Relay in this diagram is starting up **during** the diff --git a/internal/relayenv/env_context_impl.go b/internal/relayenv/env_context_impl.go index 1e56fa0c..8876899f 100644 --- a/internal/relayenv/env_context_impl.go +++ b/internal/relayenv/env_context_impl.go @@ -30,6 +30,8 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/subsystems" "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl" "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" + + redigo "github.com/gomodule/redigo/redis" ) // LogNameMode is used in NewEnvContext to determine whether the environment's log messages should be @@ -120,6 +122,8 @@ type envContextImpl struct { stopMonitoringCredentials chan struct{} doneMonitoringCredentials chan struct{} connectionMapper ConnectionMapper + storeHealthCheck *store.StoreHealthCheck + storeInitChecker *store.RedisInitChecker offline bool closed bool } @@ -401,6 +405,25 @@ func NewEnvContext( // Connecting may take time, so do this in parallel go envContext.startSDKClient(envConfig.SDKKey, readyCh, allConfig.Main.IgnoreConnectionErrors) + // Start the persistent store health check if using Redis + if allConfig.Redis.URL.IsDefined() { + healthCheckInterval := allConfig.Redis.HealthCheckInterval.GetOrElse(config.DefaultDataStoreHealthCheckInterval) + redisURL, prefix := sdks.GetRedisBasicProperties(allConfig.Redis, envConfig) + var dialOptions []redigo.DialOption + if allConfig.Redis.Password != "" { + dialOptions = append(dialOptions, redigo.DialPassword(allConfig.Redis.Password)) + } + if allConfig.Redis.Username != "" { + dialOptions = append(dialOptions, redigo.DialUsername(allConfig.Redis.Username)) + } + initChecker := store.NewRedisInitChecker(redisURL, prefix, dialOptions) + envContext.storeInitChecker = initChecker + thingsToCleanUp.AddFunc(func() { _ = initChecker.Close() }) + // Health check is started later after the store adapter builds the actual store. + // We defer this to startStoreHealthCheck which is called after the SDK client is ready. + envContext.deferredHealthCheckStart(initChecker, healthCheckInterval, envLoggers) + } + cleanupInterval := params.ExpiredCredentialCleanupInterval if cleanupInterval == 0 { // 0 means it wasn't specified; the config system disallows 0 as a valid value. cleanupInterval = defaultCredentialCleanupInterval @@ -412,6 +435,38 @@ func NewEnvContext( return envContext, nil } +func (c *envContextImpl) deferredHealthCheckStart( + initChecker *store.RedisInitChecker, + interval time.Duration, + loggers ldlog.Loggers, +) { + go func() { + // Wait for the store adapter to build the actual store (happens during SDK client init) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ticker.C: + ss := c.storeAdapter.GetSnapshotStore() + if ss == nil { + continue + } + hc := store.NewStoreHealthCheck(ss, initChecker, interval, loggers) + if hc != nil { + c.mu.Lock() + c.storeHealthCheck = hc + c.mu.Unlock() + hc.Start() + loggers.Info("Data store health check started") + } + return + case <-c.stopMonitoringCredentials: + return + } + } + }() +} + func (c *envContextImpl) cleanupExpiredCredentials(interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -748,6 +803,12 @@ func (c *envContextImpl) Close() error { if c.sdkBigSegments != nil { c.sdkBigSegments.Close() } + if c.storeHealthCheck != nil { + c.storeHealthCheck.Stop() + } + if c.storeInitChecker != nil { + _ = c.storeInitChecker.Close() + } return nil } diff --git a/internal/store/redis_init_checker.go b/internal/store/redis_init_checker.go new file mode 100644 index 00000000..d96bf5ef --- /dev/null +++ b/internal/store/redis_init_checker.go @@ -0,0 +1,51 @@ +package store + +import ( + "fmt" + + redigo "github.com/gomodule/redigo/redis" +) + +// RedisInitChecker implements StoreInitChecker by directly querying Redis for the +// $inited sentinel key, bypassing the SDK's caching layer. +type RedisInitChecker struct { + pool *redigo.Pool + prefix string +} + +// NewRedisInitChecker creates a checker that connects to Redis using the given URL and +// dial options. The prefix should match the store prefix used by the SDK (e.g. "launchdarkly"). +func NewRedisInitChecker(redisURL string, prefix string, dialOptions []redigo.DialOption) *RedisInitChecker { + pool := &redigo.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: func() (redigo.Conn, error) { + return redigo.DialURL(redisURL, dialOptions...) + }, + } + return &RedisInitChecker{ + pool: pool, + prefix: prefix, + } +} + +func (r *RedisInitChecker) initedKey() string { + return fmt.Sprintf("%s:$inited", r.prefix) +} + +// CheckInitialized checks if the $inited key exists in Redis. +func (r *RedisInitChecker) CheckInitialized() (available bool, initialized bool, err error) { + conn := r.pool.Get() + defer conn.Close() //nolint:errcheck + + exists, err := redigo.Bool(conn.Do("EXISTS", r.initedKey())) + if err != nil { + return false, false, err + } + return true, exists, nil +} + +// Close releases the Redis connection pool. +func (r *RedisInitChecker) Close() error { + return r.pool.Close() +} diff --git a/internal/store/relay_feature_store.go b/internal/store/relay_feature_store.go index 5e3d6a7e..798f30c6 100644 --- a/internal/store/relay_feature_store.go +++ b/internal/store/relay_feature_store.go @@ -46,6 +46,21 @@ func (a *SSERelayDataStoreAdapter) GetStore() subsystems.DataStore { return store } +// GetSnapshotStore returns the current data store as a SnapshotStore (for health check use), +// or nil if the store has not been created. +func (a *SSERelayDataStoreAdapter) GetSnapshotStore() SnapshotStore { + a.mu.RLock() + s := a.store + a.mu.RUnlock() + if s == nil { + return nil + } + if ss, ok := s.(SnapshotStore); ok { + return ss + } + return nil +} + // GetUpdates returns the EnvStreamUpdates that will receive all updates sent to this store. This is // exposed for testing so that we can simulate receiving updates from LaunchDarkly to this component. func (a *SSERelayDataStoreAdapter) GetUpdates() streams.EnvStreamUpdates { @@ -89,10 +104,19 @@ func (a *SSERelayDataStoreAdapter) Build( // A DataStore implementation that delegates to an underlying store // but also publishes stream updates when the store is modified. +// It also maintains an in-memory snapshot of the latest dataset for resilience +// against data store failures (e.g., Redis restart causing data loss). type streamUpdatesStoreWrapper struct { store subsystems.DataStore updates streams.EnvStreamUpdates loggers ldlog.Loggers + + snapshotMu sync.RWMutex + snapshot []ldstoretypes.Collection + snapshotHasData bool + + storeDownMu sync.RWMutex + storeDown bool } func newStreamUpdatesStoreWrapper( @@ -108,6 +132,101 @@ func newStreamUpdatesStoreWrapper( return relayStore } +// HasSnapshot returns true if the wrapper has a valid snapshot with data. +func (sw *streamUpdatesStoreWrapper) HasSnapshot() bool { + sw.snapshotMu.RLock() + defer sw.snapshotMu.RUnlock() + return sw.snapshotHasData +} + +// GetSnapshot returns a deep copy of the current snapshot, or nil if none exists. +func (sw *streamUpdatesStoreWrapper) GetSnapshot() []ldstoretypes.Collection { + sw.snapshotMu.RLock() + defer sw.snapshotMu.RUnlock() + if !sw.snapshotHasData { + return nil + } + return deepCopyCollections(sw.snapshot) +} + +func (sw *streamUpdatesStoreWrapper) saveSnapshot(allData []ldstoretypes.Collection) { + hasData := false + for _, coll := range allData { + if len(coll.Items) > 0 { + hasData = true + break + } + } + + sw.snapshotMu.Lock() + defer sw.snapshotMu.Unlock() + if hasData { + sw.snapshot = deepCopyCollections(allData) + sw.snapshotHasData = true + } else { + sw.snapshot = nil + sw.snapshotHasData = false + } +} + +func (sw *streamUpdatesStoreWrapper) updateSnapshotItem( + kind ldstoretypes.DataKind, + key string, + item ldstoretypes.ItemDescriptor, +) { + sw.snapshotMu.Lock() + defer sw.snapshotMu.Unlock() + if !sw.snapshotHasData { + return + } + + for i, coll := range sw.snapshot { + if coll.Kind.GetName() == kind.GetName() { + found := false + for j, existing := range coll.Items { + if existing.Key == key { + sw.snapshot[i].Items[j] = ldstoretypes.KeyedItemDescriptor{ + Key: key, + Item: item, + } + found = true + break + } + } + if !found { + sw.snapshot[i].Items = append(sw.snapshot[i].Items, ldstoretypes.KeyedItemDescriptor{ + Key: key, + Item: item, + }) + } + return + } + } + + sw.snapshot = append(sw.snapshot, ldstoretypes.Collection{ + Kind: kind, + Items: []ldstoretypes.KeyedItemDescriptor{ + {Key: key, Item: item}, + }, + }) +} + +func deepCopyCollections(src []ldstoretypes.Collection) []ldstoretypes.Collection { + if src == nil { + return nil + } + dst := make([]ldstoretypes.Collection, len(src)) + for i, coll := range src { + items := make([]ldstoretypes.KeyedItemDescriptor, len(coll.Items)) + copy(items, coll.Items) + dst[i] = ldstoretypes.Collection{ + Kind: coll.Kind, + Items: items, + } + } + return dst +} + func (sw *streamUpdatesStoreWrapper) Close() error { return sw.store.Close() } @@ -116,18 +235,96 @@ func (sw *streamUpdatesStoreWrapper) IsStatusMonitoringEnabled() bool { return sw.store.IsStatusMonitoringEnabled() } +// IsStoreDown returns true if the circuit breaker is open (store is considered unavailable). +func (sw *streamUpdatesStoreWrapper) IsStoreDown() bool { + sw.storeDownMu.RLock() + defer sw.storeDownMu.RUnlock() + return sw.storeDown +} + +// SetStoreDown sets or clears the circuit breaker state. +func (sw *streamUpdatesStoreWrapper) SetStoreDown(down bool) { + sw.storeDownMu.Lock() + defer sw.storeDownMu.Unlock() + sw.storeDown = down +} + func (sw *streamUpdatesStoreWrapper) Get(kind ldstoretypes.DataKind, key string) (ldstoretypes.ItemDescriptor, error) { - return sw.store.Get(kind, key) + if sw.IsStoreDown() && sw.HasSnapshot() { + return sw.getFromSnapshot(kind, key), nil + } + + item, err := sw.store.Get(kind, key) + if err != nil { + if sw.HasSnapshot() { + sw.openCircuitBreaker() + return sw.getFromSnapshot(kind, key), nil + } + return item, err + } + return item, nil } func (sw *streamUpdatesStoreWrapper) GetAll(kind ldstoretypes.DataKind) ([]ldstoretypes.KeyedItemDescriptor, error) { - return sw.store.GetAll(kind) + if sw.IsStoreDown() && sw.HasSnapshot() { + return sw.getAllFromSnapshot(kind), nil + } + + items, err := sw.store.GetAll(kind) + if err != nil { + if sw.HasSnapshot() { + sw.openCircuitBreaker() + return sw.getAllFromSnapshot(kind), nil + } + return nil, err + } + return items, nil +} + +func (sw *streamUpdatesStoreWrapper) openCircuitBreaker() { + sw.storeDownMu.Lock() + alreadyDown := sw.storeDown + sw.storeDown = true + sw.storeDownMu.Unlock() + if !alreadyDown { + sw.loggers.Warn("Data store read error, activating circuit breaker and serving from in-memory snapshot") + } +} + +func (sw *streamUpdatesStoreWrapper) getFromSnapshot(kind ldstoretypes.DataKind, key string) ldstoretypes.ItemDescriptor { + sw.snapshotMu.RLock() + defer sw.snapshotMu.RUnlock() + for _, coll := range sw.snapshot { + if coll.Kind.GetName() == kind.GetName() { + for _, item := range coll.Items { + if item.Key == key { + return item.Item + } + } + } + } + return ldstoretypes.ItemDescriptor{}.NotFound() +} + +func (sw *streamUpdatesStoreWrapper) getAllFromSnapshot(kind ldstoretypes.DataKind) []ldstoretypes.KeyedItemDescriptor { + sw.snapshotMu.RLock() + defer sw.snapshotMu.RUnlock() + for _, coll := range sw.snapshot { + if coll.Kind.GetName() == kind.GetName() { + items := make([]ldstoretypes.KeyedItemDescriptor, len(coll.Items)) + copy(items, coll.Items) + return items + } + } + return nil } func (sw *streamUpdatesStoreWrapper) Init(allData []ldstoretypes.Collection) error { sw.loggers.Debug("Received all feature flags") err := sw.store.Init(allData) + sw.saveSnapshot(allData) + // See comments in Upsert for why we call SendAllDataUpdate here even if Init returned an error. sw.updates.SendAllDataUpdate(allData) @@ -163,6 +360,8 @@ func (sw *streamUpdatesStoreWrapper) Upsert( // connected clients, because they may be using the stream rather than the database as their source of // truth. + sw.updateSnapshotItem(kind, key, item) + sw.updates.SendSingleItemUpdate(kind, key, item) return updated, err diff --git a/internal/store/relay_feature_store_test.go b/internal/store/relay_feature_store_test.go index 48768f80..2c8d8f57 100644 --- a/internal/store/relay_feature_store_test.go +++ b/internal/store/relay_feature_store_test.go @@ -9,6 +9,7 @@ import ( "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldbuilders" "github.com/launchdarkly/go-server-sdk/v7/subsystems" "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -285,3 +286,279 @@ func TestStoreClose(t *testing.T) { wrappedStore.Close() assert.True(t, baseStore.closed) } + +func TestSnapshotInitSavesData(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + + assert.False(t, wrappedStore.HasSnapshot(), "snapshot should not exist before Init") + + err := wrappedStore.Init(allData) + require.NoError(t, err) + + assert.True(t, wrappedStore.HasSnapshot(), "snapshot should exist after Init with data") + + snapshot := wrappedStore.GetSnapshot() + require.NotNil(t, snapshot) + assert.Equal(t, len(allData), len(snapshot)) + + for i, coll := range allData { + assert.Equal(t, coll.Kind, snapshot[i].Kind) + assert.Equal(t, len(coll.Items), len(snapshot[i].Items)) + for j, item := range coll.Items { + assert.Equal(t, item.Key, snapshot[i].Items[j].Key) + assert.Equal(t, item.Item.Version, snapshot[i].Items[j].Item.Version) + } + } +} + +func TestSnapshotIsDeepCopy(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + snapshot1 := wrappedStore.GetSnapshot() + snapshot2 := wrappedStore.GetSnapshot() + + // Modifying one snapshot should not affect the other + snapshot1[0].Items = nil + assert.NotNil(t, snapshot2[0].Items, "snapshots should be independent deep copies") +} + +func TestSnapshotEmptyInitDoesNotSetHasData(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init([]ldstoretypes.Collection{}) + require.NoError(t, err) + + assert.False(t, wrappedStore.HasSnapshot(), "snapshot should not be set for empty Init") +} + +func TestSnapshotNilInitDoesNotSetHasData(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(nil) + require.NoError(t, err) + + assert.False(t, wrappedStore.HasSnapshot(), "snapshot should not be set for nil Init") +} + +func TestSnapshotInitWithEmptyCollectionsDoesNotSetHasData(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + emptyCollections := []ldstoretypes.Collection{ + {Kind: ldstoreimpl.Features(), Items: []ldstoretypes.KeyedItemDescriptor{}}, + {Kind: ldstoreimpl.Segments(), Items: []ldstoretypes.KeyedItemDescriptor{}}, + } + err := wrappedStore.Init(emptyCollections) + require.NoError(t, err) + + assert.False(t, wrappedStore.HasSnapshot(), "snapshot should not be set when all collections are empty") +} + +func TestSnapshotUpsertUpdatesExistingItem(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + testFlag1v2 := ldbuilders.NewFlagBuilder(testFlag1.Key).Version(testFlag1.Version + 1).On(false).Build() + _, _ = sharedtest.UpsertFlag(wrappedStore, testFlag1v2) + + snapshot := wrappedStore.GetSnapshot() + require.NotNil(t, snapshot) + + var found bool + for _, coll := range snapshot { + if coll.Kind == ldstoreimpl.Features() { + for _, item := range coll.Items { + if item.Key == testFlag1.Key { + assert.Equal(t, testFlag1v2.Version, item.Item.Version) + found = true + } + } + } + } + assert.True(t, found, "updated flag should be in snapshot") +} + +func TestSnapshotUpsertAddsNewItem(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + _, _ = sharedtest.UpsertFlag(wrappedStore, testFlag2) + + snapshot := wrappedStore.GetSnapshot() + require.NotNil(t, snapshot) + + var foundFlag1, foundFlag2 bool + for _, coll := range snapshot { + if coll.Kind == ldstoreimpl.Features() { + for _, item := range coll.Items { + if item.Key == testFlag1.Key { + foundFlag1 = true + } + if item.Key == testFlag2.Key { + foundFlag2 = true + } + } + } + } + assert.True(t, foundFlag1, "original flag should still be in snapshot") + assert.True(t, foundFlag2, "new flag should be in snapshot") +} + +func TestSnapshotUpsertBeforeInitIsIgnored(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + + _, _ = sharedtest.UpsertFlag(wrappedStore, testFlag1) + assert.False(t, wrappedStore.HasSnapshot(), "snapshot should not be created by Upsert alone") +} + +func TestSnapshotUpsertSegment(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + testSegment1v2 := ldbuilders.NewSegmentBuilder(testSegment1.Key).Version(testSegment1.Version + 1).Build() + _, _ = sharedtest.UpsertSegment(wrappedStore, testSegment1v2) + + snapshot := wrappedStore.GetSnapshot() + require.NotNil(t, snapshot) + + var found bool + for _, coll := range snapshot { + if coll.Kind == ldstoreimpl.Segments() { + for _, item := range coll.Items { + if item.Key == testSegment1.Key { + assert.Equal(t, testSegment1v2.Version, item.Item.Version) + found = true + } + } + } + } + assert.True(t, found, "updated segment should be in snapshot") +} + +// Circuit breaker tests + +func TestCircuitBreakerOpensOnGetError(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Simulate store failure + baseStore.fakeError = fakeError + + item, err := wrappedStore.Get(ldstoreimpl.Features(), testFlag1.Key) + assert.NoError(t, err, "should not return error when snapshot is available") + assert.Equal(t, testFlag1.Version, item.Version) + assert.True(t, wrappedStore.IsStoreDown(), "circuit breaker should be open") +} + +func TestCircuitBreakerOpensOnGetAllError(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + baseStore.fakeError = fakeError + + items, err := wrappedStore.GetAll(ldstoreimpl.Features()) + assert.NoError(t, err, "should not return error when snapshot is available") + assert.Equal(t, 1, len(items), "should return snapshot data") + assert.True(t, wrappedStore.IsStoreDown(), "circuit breaker should be open") +} + +func TestCircuitBreakerSkipsStoreOnSubsequentReads(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Trigger circuit breaker + baseStore.fakeError = fakeError + _, _ = wrappedStore.Get(ldstoreimpl.Features(), testFlag1.Key) + assert.True(t, wrappedStore.IsStoreDown()) + + // Subsequent reads should serve from snapshot without touching the store + item, err := wrappedStore.Get(ldstoreimpl.Features(), testFlag1.Key) + assert.NoError(t, err) + assert.Equal(t, testFlag1.Version, item.Version) + + items, err := wrappedStore.GetAll(ldstoreimpl.Features()) + assert.NoError(t, err) + assert.Equal(t, 1, len(items)) +} + +func TestCircuitBreakerReturnsErrorWithoutSnapshot(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + // No Init called, so no snapshot exists + + baseStore.fakeError = fakeError + + // Without snapshot, errors pass through even with circuit open + _, err := wrappedStore.Get(ldstoreimpl.Features(), "any-key") + assert.Equal(t, fakeError, err, "should return error when no snapshot is available") + + _, err = wrappedStore.GetAll(ldstoreimpl.Features()) + assert.Equal(t, fakeError, err, "should return error when no snapshot is available") + + // Circuit breaker should still be set even though we can't serve data + assert.False(t, wrappedStore.IsStoreDown(), "circuit should not open without snapshot") +} + +func TestCircuitBreakerFallsThroughToStoreWithoutSnapshot(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + // No Init, so no snapshot. Set circuit breaker manually. + wrappedStore.SetStoreDown(true) + baseStore.fakeError = fakeError + + // With storeDown=true but no snapshot, it should still try the store + _, err := wrappedStore.Get(ldstoreimpl.Features(), "any-key") + assert.Equal(t, fakeError, err, "should fall through to store when no snapshot") + + _, err = wrappedStore.GetAll(ldstoreimpl.Features()) + assert.Equal(t, fakeError, err, "should fall through to store when no snapshot") +} + +func TestCircuitBreakerClearResumesNormalReads(t *testing.T) { + baseStore, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Open circuit + baseStore.fakeError = fakeError + _, _ = wrappedStore.Get(ldstoreimpl.Features(), testFlag1.Key) + assert.True(t, wrappedStore.IsStoreDown()) + + // Clear circuit and fix the store + baseStore.fakeError = nil + wrappedStore.SetStoreDown(false) + assert.False(t, wrappedStore.IsStoreDown()) + + // Reads should go to the real store again + item, err := wrappedStore.Get(ldstoreimpl.Features(), testFlag1.Key) + assert.NoError(t, err) + assert.Equal(t, testFlag1.Version, item.Version) +} + +func TestCircuitBreakerGetReturnsNotFoundFromSnapshot(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Circuit is open, ask for a key that doesn't exist in snapshot + wrappedStore.SetStoreDown(true) + + item, err := wrappedStore.Get(ldstoreimpl.Features(), "nonexistent-key") + assert.NoError(t, err) + assert.Equal(t, ldstoretypes.ItemDescriptor{}.NotFound(), item) +} + +func TestCircuitBreakerGetAllReturnsEmptyForUnknownKind(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + wrappedStore.SetStoreDown(true) + + // Segments exist in snapshot, so this should work + items, err := wrappedStore.GetAll(ldstoreimpl.Segments()) + assert.NoError(t, err) + assert.Equal(t, 1, len(items)) +} diff --git a/internal/store/store_health_check.go b/internal/store/store_health_check.go new file mode 100644 index 00000000..f27ed863 --- /dev/null +++ b/internal/store/store_health_check.go @@ -0,0 +1,143 @@ +package store + +import ( + "sync" + "time" + + "github.com/launchdarkly/go-sdk-common/v3/ldlog" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" +) + +// StoreInitChecker abstracts the ability to check whether a persistent store still +// contains its initialization marker (e.g. the Redis $inited key). Implementations +// should query the store directly, bypassing any SDK-level caching. +type StoreInitChecker interface { + // CheckInitialized returns: + // available=true, initialized=true → store is up and has data + // available=true, initialized=false → store is up but data is missing (needs repopulation) + // available=false with err → store is unreachable (retry later) + CheckInitialized() (available bool, initialized bool, err error) +} + +// SnapshotStore is the interface that StoreHealthCheck uses to interact with the store wrapper. +// It provides access to snapshot data and circuit breaker state for resilience operations. +type SnapshotStore interface { + HasSnapshot() bool + GetSnapshot() []ldstoretypes.Collection + IsStoreDown() bool + SetStoreDown(bool) + Init([]ldstoretypes.Collection) error + IsInitialized() bool +} + +// StoreHealthCheck periodically verifies that the persistent data store still contains +// its initialization data. If data loss is detected (e.g. after a Redis restart), it +// repopulates the store from the in-memory snapshot and manages the circuit breaker state. +type StoreHealthCheck struct { + store SnapshotStore + checker StoreInitChecker + interval time.Duration + loggers ldlog.Loggers + stopCh chan struct{} + stopOnce sync.Once +} + +// NewStoreHealthCheck creates a new health check instance. Returns nil if store or checker is nil. +func NewStoreHealthCheck( + store SnapshotStore, + checker StoreInitChecker, + interval time.Duration, + loggers ldlog.Loggers, +) *StoreHealthCheck { + if store == nil || checker == nil { + return nil + } + return &StoreHealthCheck{ + store: store, + checker: checker, + interval: interval, + loggers: loggers, + stopCh: make(chan struct{}), + } +} + +// Start begins the periodic health check in a background goroutine. +func (hc *StoreHealthCheck) Start() { + go hc.run() +} + +// Stop terminates the health check goroutine. Safe to call multiple times. +func (hc *StoreHealthCheck) Stop() { + hc.stopOnce.Do(func() { + close(hc.stopCh) + }) +} + +func (hc *StoreHealthCheck) run() { + ticker := time.NewTicker(hc.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + hc.check() + case <-hc.stopCh: + return + } + } +} + +func (hc *StoreHealthCheck) check() { + available, initialized, err := hc.checker.CheckInitialized() + + if err != nil { + hc.loggers.Debugf("Data store health check: connection error: %s", err) + return + } + + if !available { + hc.loggers.Debug("Data store health check: store not available") + return + } + + // Store is available + if initialized { + if hc.store.IsStoreDown() { + hc.loggers.Info("Data store has recovered, resuming normal reads") + hc.store.SetStoreDown(false) + } + return + } + + // Store is available but not initialized -- data was lost + hc.loggers.Warn("Data store lost initialization data, possible store restart detected") + + if !hc.store.HasSnapshot() { + hc.loggers.Warn("Cannot repopulate data store: no snapshot data available to restore") + return + } + + hc.repopulate() +} + +func (hc *StoreHealthCheck) repopulate() { + snapshot := hc.store.GetSnapshot() + if snapshot == nil { + return + } + + hc.loggers.Warn("Repopulating data store from in-memory snapshot") + + err := hc.store.Init(snapshot) + if err != nil { + hc.loggers.Errorf("Failed to repopulate data store from snapshot: %s", err) + return + } + + hc.loggers.Info("Successfully repopulated data store from snapshot") + + if hc.store.IsStoreDown() { + hc.store.SetStoreDown(false) + hc.loggers.Info("Circuit breaker cleared after repopulation") + } +} diff --git a/internal/store/store_health_check_test.go b/internal/store/store_health_check_test.go new file mode 100644 index 00000000..32648599 --- /dev/null +++ b/internal/store/store_health_check_test.go @@ -0,0 +1,180 @@ +package store + +import ( + "sync" + "testing" + "time" + + "github.com/launchdarkly/go-sdk-common/v3/ldlog" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockInitChecker struct { + mu sync.Mutex + available bool + inited bool + err error +} + +func (m *mockInitChecker) CheckInitialized() (available bool, initialized bool, err error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.available, m.inited, m.err +} + +func (m *mockInitChecker) set(available, inited bool, err error) { + m.mu.Lock() + defer m.mu.Unlock() + m.available = available + m.inited = inited + m.err = err +} + +func makeHealthCheckTestComponents() (*mockStore, *streamUpdatesStoreWrapper, *mockInitChecker, *mockEnvStreamsUpdates) { + baseStore, wrappedStore, updates := makeTestComponents() + checker := &mockInitChecker{available: true, inited: true} + return baseStore, wrappedStore, checker, updates +} + +func TestHealthCheckDetectsDataLossAndRepopulates(t *testing.T) { + baseStore, wrappedStore, checker, _ := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Clear the base store to simulate Redis data loss + _ = baseStore.Init(nil) + + // Redis is up but $inited is gone (data loss after restart) + checker.set(true, false, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + // Health check should trigger repopulation - verify data comes back + assert.Eventually(t, func() bool { + flags, e := baseStore.GetAll(ldstoreimpl.Features()) + return e == nil && len(flags) > 0 + }, time.Second, 5*time.Millisecond, "health check should repopulate the store") +} + +func TestHealthCheckClearsCircuitBreaker(t *testing.T) { + _, wrappedStore, checker, _ := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + wrappedStore.SetStoreDown(true) + checker.set(true, true, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + assert.Eventually(t, func() bool { + return !wrappedStore.IsStoreDown() + }, time.Second, 5*time.Millisecond, "health check should clear circuit breaker") +} + +func TestHealthCheckDoesNothingOnConnectionError(t *testing.T) { + _, wrappedStore, checker, _ := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + wrappedStore.SetStoreDown(true) + checker.set(false, false, fakeError) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + time.Sleep(50 * time.Millisecond) + assert.True(t, wrappedStore.IsStoreDown(), "circuit breaker should remain open on connection error") +} + +func TestHealthCheckDoesNotRepopulateWithoutSnapshot(t *testing.T) { + _, wrappedStore, checker, updates := makeHealthCheckTestComponents() + // No Init called, no snapshot + checker.set(true, false, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + time.Sleep(50 * time.Millisecond) + updates.expectNoAllDataUpdate(t) +} + +func TestHealthCheckStops(t *testing.T) { + _, wrappedStore, checker, _ := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + checker.set(true, true, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + hc.Stop() + // Verify Stop doesn't panic on double-call + hc.Stop() +} + +func TestHealthCheckNilParams(t *testing.T) { + hc := NewStoreHealthCheck(nil, nil, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + assert.Nil(t, hc, "health check should not be created without store and checker") +} + +func TestHealthCheckRepopulationIncludesUpserts(t *testing.T) { + baseStore, wrappedStore, checker, _ := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + // Upsert a new flag after Init + testFlag2Desc := ldstoretypes.ItemDescriptor{Version: testFlag2.Version, Item: &testFlag2} + _, _ = wrappedStore.Upsert(ldstoreimpl.Features(), testFlag2.Key, testFlag2Desc) + + // Clear store to simulate Redis restart + _ = baseStore.Init(nil) + + // Simulate Redis data loss + checker.set(true, false, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + // Wait for repopulation + assert.Eventually(t, func() bool { + flags, e := baseStore.GetAll(ldstoreimpl.Features()) + return e == nil && len(flags) == 2 + }, time.Second, 5*time.Millisecond, "repopulation should include upserted flags") +} + +func TestHealthCheckNormalOperationDoesNothing(t *testing.T) { + _, wrappedStore, checker, updates := makeHealthCheckTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + // Reset the updates tracker since Init sends an update + updates.allData = nil + + // Everything is fine + checker.set(true, true, nil) + + hc := NewStoreHealthCheck(wrappedStore, checker, 10*time.Millisecond, ldlog.NewDisabledLoggers()) + require.NotNil(t, hc) + hc.Start() + defer hc.Stop() + + time.Sleep(50 * time.Millisecond) + // No repopulation should have been triggered + updates.expectNoAllDataUpdate(t) + assert.False(t, wrappedStore.IsStoreDown()) +} From 256118cc392730980ad915071c114891baef1bcc Mon Sep 17 00:00:00 2001 From: Nicolas Bouvrette Date: Sun, 10 May 2026 16:52:06 -0400 Subject: [PATCH 2/6] fix: address review findings from Bugbot 1. Snapshot version regression: updateSnapshotItem now compares versions and skips updates with older data, preventing stale flags from being written to the snapshot on out-of-order upserts. 2. Race condition in Close(): storeHealthCheck is now read under the mutex, and deferredHealthCheckStart checks c.closed before starting the health check to prevent goroutine leaks. Co-authored-by: Cursor --- internal/relayenv/env_context_impl.go | 23 +++++++++++------- internal/store/relay_feature_store.go | 22 ++++++++--------- internal/store/relay_feature_store_test.go | 28 ++++++++++++++++++++++ 3 files changed, 53 insertions(+), 20 deletions(-) diff --git a/internal/relayenv/env_context_impl.go b/internal/relayenv/env_context_impl.go index 8876899f..57d0cd23 100644 --- a/internal/relayenv/env_context_impl.go +++ b/internal/relayenv/env_context_impl.go @@ -441,7 +441,6 @@ func (c *envContextImpl) deferredHealthCheckStart( loggers ldlog.Loggers, ) { go func() { - // Wait for the store adapter to build the actual store (happens during SDK client init) ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { @@ -452,13 +451,18 @@ func (c *envContextImpl) deferredHealthCheckStart( continue } hc := store.NewStoreHealthCheck(ss, initChecker, interval, loggers) - if hc != nil { - c.mu.Lock() - c.storeHealthCheck = hc + if hc == nil { + return + } + c.mu.Lock() + if c.closed { c.mu.Unlock() - hc.Start() - loggers.Info("Data store health check started") + return } + c.storeHealthCheck = hc + c.mu.Unlock() + hc.Start() + loggers.Info("Data store health check started") return case <-c.stopMonitoringCredentials: return @@ -803,8 +807,11 @@ func (c *envContextImpl) Close() error { if c.sdkBigSegments != nil { c.sdkBigSegments.Close() } - if c.storeHealthCheck != nil { - c.storeHealthCheck.Stop() + c.mu.RLock() + hc := c.storeHealthCheck + c.mu.RUnlock() + if hc != nil { + hc.Stop() } if c.storeInitChecker != nil { _ = c.storeInitChecker.Close() diff --git a/internal/store/relay_feature_store.go b/internal/store/relay_feature_store.go index 798f30c6..4ccbd068 100644 --- a/internal/store/relay_feature_store.go +++ b/internal/store/relay_feature_store.go @@ -182,23 +182,21 @@ func (sw *streamUpdatesStoreWrapper) updateSnapshotItem( for i, coll := range sw.snapshot { if coll.Kind.GetName() == kind.GetName() { - found := false for j, existing := range coll.Items { if existing.Key == key { - sw.snapshot[i].Items[j] = ldstoretypes.KeyedItemDescriptor{ - Key: key, - Item: item, + if item.Version >= existing.Item.Version { + sw.snapshot[i].Items[j] = ldstoretypes.KeyedItemDescriptor{ + Key: key, + Item: item, + } } - found = true - break + return } } - if !found { - sw.snapshot[i].Items = append(sw.snapshot[i].Items, ldstoretypes.KeyedItemDescriptor{ - Key: key, - Item: item, - }) - } + sw.snapshot[i].Items = append(sw.snapshot[i].Items, ldstoretypes.KeyedItemDescriptor{ + Key: key, + Item: item, + }) return } } diff --git a/internal/store/relay_feature_store_test.go b/internal/store/relay_feature_store_test.go index 2c8d8f57..e9da6726 100644 --- a/internal/store/relay_feature_store_test.go +++ b/internal/store/relay_feature_store_test.go @@ -436,6 +436,34 @@ func TestSnapshotUpsertSegment(t *testing.T) { assert.True(t, found, "updated segment should be in snapshot") } +func TestSnapshotUpsertIgnoresOlderVersion(t *testing.T) { + _, wrappedStore, _ := makeTestComponents() + err := wrappedStore.Init(allData) + require.NoError(t, err) + + newerFlag := ldbuilders.NewFlagBuilder(testFlag1.Key).Version(testFlag1.Version + 10).On(false).Build() + _, _ = sharedtest.UpsertFlag(wrappedStore, newerFlag) + + olderFlag := ldbuilders.NewFlagBuilder(testFlag1.Key).Version(testFlag1.Version + 1).On(true).Build() + _, _ = sharedtest.UpsertFlag(wrappedStore, olderFlag) + + snapshot := wrappedStore.GetSnapshot() + require.NotNil(t, snapshot) + + for _, coll := range snapshot { + if coll.Kind == ldstoreimpl.Features() { + for _, item := range coll.Items { + if item.Key == testFlag1.Key { + assert.Equal(t, newerFlag.Version, item.Item.Version, + "snapshot should retain the newer version, not regress to older") + return + } + } + } + } + t.Fatal("flag not found in snapshot") +} + // Circuit breaker tests func TestCircuitBreakerOpensOnGetError(t *testing.T) { From 834ab87f8592d69ab959515d392c1b5abb6f052d Mon Sep 17 00:00:00 2001 From: Nicolas Bouvrette Date: Sun, 10 May 2026 17:04:55 -0400 Subject: [PATCH 3/6] fix: repopulation bypasses wrapper to avoid snapshot regression Repopulation now uses RepopulateStore() which writes directly to the underlying store, bypassing saveSnapshot() and SendAllDataUpdate(). This prevents a race where a streaming Upsert between GetSnapshot() and Init() could be overwritten in both the snapshot and Redis, and avoids broadcasting stale put events to connected SSE clients. Co-authored-by: Cursor --- internal/store/relay_feature_store.go | 7 +++++++ internal/store/store_health_check.go | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/internal/store/relay_feature_store.go b/internal/store/relay_feature_store.go index 4ccbd068..a10fd6ea 100644 --- a/internal/store/relay_feature_store.go +++ b/internal/store/relay_feature_store.go @@ -317,6 +317,13 @@ func (sw *streamUpdatesStoreWrapper) getAllFromSnapshot(kind ldstoretypes.DataKi return nil } +// RepopulateStore writes data directly to the underlying store, bypassing snapshot +// updates and SSE broadcasting. Used by the health check to restore Redis after data loss +// without risking snapshot regression from concurrent streaming Upserts. +func (sw *streamUpdatesStoreWrapper) RepopulateStore(allData []ldstoretypes.Collection) error { + return sw.store.Init(allData) +} + func (sw *streamUpdatesStoreWrapper) Init(allData []ldstoretypes.Collection) error { sw.loggers.Debug("Received all feature flags") err := sw.store.Init(allData) diff --git a/internal/store/store_health_check.go b/internal/store/store_health_check.go index f27ed863..f82f75e1 100644 --- a/internal/store/store_health_check.go +++ b/internal/store/store_health_check.go @@ -26,7 +26,7 @@ type SnapshotStore interface { GetSnapshot() []ldstoretypes.Collection IsStoreDown() bool SetStoreDown(bool) - Init([]ldstoretypes.Collection) error + RepopulateStore([]ldstoretypes.Collection) error IsInitialized() bool } @@ -128,7 +128,7 @@ func (hc *StoreHealthCheck) repopulate() { hc.loggers.Warn("Repopulating data store from in-memory snapshot") - err := hc.store.Init(snapshot) + err := hc.store.RepopulateStore(snapshot) if err != nil { hc.loggers.Errorf("Failed to repopulate data store from snapshot: %s", err) return From 9384d2c3dc7653813e388ab3a038c2162c1e4833 Mon Sep 17 00:00:00 2001 From: Nicolas Bouvrette Date: Sun, 10 May 2026 17:15:15 -0400 Subject: [PATCH 4/6] feat: extend health check to Consul and DynamoDB backends The health check and circuit breaker now work with all three persistent store backends, not just Redis. Each backend has its own init checker that queries the $inited sentinel key directly: - Redis: EXISTS {prefix}:$inited (via dedicated Redigo pool) - Consul: KV.Get {prefix}/$inited (via dedicated Consul client) - DynamoDB: GetItem {prefix}:$inited (via dedicated DynamoDB client) Health check interval is now configurable per backend via CONSUL_HEALTH_CHECK_INTERVAL and DYNAMODB_HEALTH_CHECK_INTERVAL environment variables (or healthCheckInterval in the config file). Setting the interval to 0 disables the health check entirely. Co-authored-by: Cursor --- config/config.go | 18 +++--- docs/persistent-storage.md | 25 ++++++-- internal/relayenv/env_context_impl.go | 73 ++++++++++++++++++----- internal/store/consul_init_checker.go | 50 ++++++++++++++++ internal/store/dynamodb_init_checker.go | 78 +++++++++++++++++++++++++ internal/store/store_health_check.go | 8 ++- 6 files changed, 221 insertions(+), 31 deletions(-) create mode 100644 internal/store/consul_init_checker.go create mode 100644 internal/store/dynamodb_init_checker.go diff --git a/config/config.go b/config/config.go index 1a86b7ed..d16bbefb 100644 --- a/config/config.go +++ b/config/config.go @@ -237,10 +237,11 @@ type RedisConfig struct { // variables, individual fields are not documented here; instead, see the `README.md` section on // configuration. type ConsulConfig struct { - Host string `conf:"CONSUL_HOST"` - LocalTTL ct.OptDuration `conf:"CACHE_TTL"` - Token string `conf:"CONSUL_TOKEN"` - TokenFile string `conf:"CONSUL_TOKEN_FILE"` + Host string `conf:"CONSUL_HOST"` + LocalTTL ct.OptDuration `conf:"CACHE_TTL"` + Token string `conf:"CONSUL_TOKEN"` + TokenFile string `conf:"CONSUL_TOKEN_FILE"` + HealthCheckInterval ct.OptDuration `conf:"CONSUL_HEALTH_CHECK_INTERVAL"` } // DynamoDBConfig configures the optional DynamoDB integration, which is used only if Enabled is true. @@ -251,10 +252,11 @@ type ConsulConfig struct { // variables, individual fields are not documented here; instead, see the `README.md` section on // configuration. type DynamoDBConfig struct { - Enabled bool `conf:"USE_DYNAMODB"` - TableName string `conf:"DYNAMODB_TABLE"` - URL ct.OptURLAbsolute `conf:"DYNAMODB_URL"` - LocalTTL ct.OptDuration `conf:"CACHE_TTL"` + Enabled bool `conf:"USE_DYNAMODB"` + TableName string `conf:"DYNAMODB_TABLE"` + URL ct.OptURLAbsolute `conf:"DYNAMODB_URL"` + LocalTTL ct.OptDuration `conf:"CACHE_TTL"` + HealthCheckInterval ct.OptDuration `conf:"DYNAMODB_HEALTH_CHECK_INTERVAL"` } // EnvConfig describes an environment to be relayed. There may be any number of these. diff --git a/docs/persistent-storage.md b/docs/persistent-storage.md index f3b6a999..7d7cde2f 100644 --- a/docs/persistent-storage.md +++ b/docs/persistent-storage.md @@ -131,36 +131,49 @@ Relay Proxy-->>SDK2: Streaming response ## Data Store Health Check and Automatic Repopulation -The Relay Proxy includes a health check mechanism that detects when a persistent store loses its data (for example, when Redis restarts without persistence enabled). Without this, SDKs using daemon mode (such as PHP) would receive empty flag evaluations until the Relay Proxy is manually restarted. +The Relay Proxy includes a health check mechanism that detects when a persistent store loses its data (for example, when Redis restarts without persistence enabled, or when Consul/DynamoDB data is deleted). Without this, SDKs using daemon mode (such as PHP) would receive empty flag evaluations until the Relay Proxy is manually restarted. ### How it works -When using Redis, the Relay Proxy periodically checks for the presence of a sentinel key (`$inited`) that the SDK writes when it first populates the store. If this key is missing but the Relay has valid data in memory, it automatically repopulates the store. +When using a persistent data store (Redis, Consul, or DynamoDB), the Relay Proxy periodically checks for the presence of a sentinel key (`$inited`) that the SDK writes when it first populates the store. If this key is missing but the Relay has valid data in memory, it automatically repopulates the store. The health check also includes a **circuit breaker**: if a read from the persistent store fails with a connection error, subsequent reads are served directly from an in-memory snapshot, avoiding connection pool exhaustion and timeout cascades. The circuit breaker is cleared automatically when the health check confirms the store is available again. ### Configuration -The health check interval is configurable via the `REDIS_HEALTH_CHECK_INTERVAL` environment variable or the `healthCheckInterval` option in the `[Redis]` configuration section. The default is 30 seconds. +The health check is enabled by default for all persistent store backends with a 30-second interval. The interval is configurable per backend. Setting the interval to `0` disables the health check. ``` -# Configuration file +# Configuration file examples [Redis] host = "localhost" port = 6379 localTtl = 30s healthCheckInterval = 30s + +[Consul] + host = "localhost" + healthCheckInterval = 30s + +[DynamoDB] + tableName = "my-feature-flags" + healthCheckInterval = 30s ``` ``` -# Environment variable +# Environment variable examples REDIS_HEALTH_CHECK_INTERVAL=30s +CONSUL_HEALTH_CHECK_INTERVAL=30s +DYNAMODB_HEALTH_CHECK_INTERVAL=30s + +# To disable the health check: +REDIS_HEALTH_CHECK_INTERVAL=0 ``` ### Behavior summary - **Store read error (e.g. connection failure):** Circuit breaker activates immediately. Proxy-mode SDKs are served from the in-memory snapshot. The health check probes for recovery at the configured interval. -- **Store data loss (e.g. Redis restart):** Detected within one health check interval. The store is automatically repopulated from the in-memory snapshot. +- **Store data loss (e.g. Redis restart, Consul KV deletion):** Detected within one health check interval. The store is automatically repopulated from the in-memory snapshot. - **Store recovered:** Circuit breaker is cleared. Normal read path resumes. - **No snapshot available (e.g. Relay just started):** Health check cannot repopulate. Errors pass through normally. diff --git a/internal/relayenv/env_context_impl.go b/internal/relayenv/env_context_impl.go index 57d0cd23..5a0da45b 100644 --- a/internal/relayenv/env_context_impl.go +++ b/internal/relayenv/env_context_impl.go @@ -31,6 +31,7 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl" "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" + ldconsul "github.com/launchdarkly/go-server-sdk-consul/v3" redigo "github.com/gomodule/redigo/redis" ) @@ -123,7 +124,7 @@ type envContextImpl struct { doneMonitoringCredentials chan struct{} connectionMapper ConnectionMapper storeHealthCheck *store.StoreHealthCheck - storeInitChecker *store.RedisInitChecker + storeInitChecker store.StoreInitCheckerCloser offline bool closed bool } @@ -405,9 +406,33 @@ func NewEnvContext( // Connecting may take time, so do this in parallel go envContext.startSDKClient(envConfig.SDKKey, readyCh, allConfig.Main.IgnoreConnectionErrors) - // Start the persistent store health check if using Redis + // Start the persistent store health check for any configured persistent store. + // A health check interval of 0 disables the check. + if initChecker, interval, err := createInitChecker(allConfig, envConfig); err != nil { + envLoggers.Errorf("Failed to create data store health checker: %s", err) + } else if initChecker != nil && interval > 0 { + envContext.storeInitChecker = initChecker + thingsToCleanUp.AddFunc(func() { _ = initChecker.Close() }) + envContext.deferredHealthCheckStart(initChecker, interval, envLoggers) + } + + cleanupInterval := params.ExpiredCredentialCleanupInterval + if cleanupInterval == 0 { // 0 means it wasn't specified; the config system disallows 0 as a valid value. + cleanupInterval = defaultCredentialCleanupInterval + } + go envContext.cleanupExpiredCredentials(cleanupInterval) + + thingsToCleanUp.Clear() // we've succeeded so we do not want to throw away these things + + return envContext, nil +} + +func createInitChecker( + allConfig config.Config, + envConfig config.EnvConfig, +) (store.StoreInitCheckerCloser, time.Duration, error) { if allConfig.Redis.URL.IsDefined() { - healthCheckInterval := allConfig.Redis.HealthCheckInterval.GetOrElse(config.DefaultDataStoreHealthCheckInterval) + interval := allConfig.Redis.HealthCheckInterval.GetOrElse(config.DefaultDataStoreHealthCheckInterval) redisURL, prefix := sdks.GetRedisBasicProperties(allConfig.Redis, envConfig) var dialOptions []redigo.DialOption if allConfig.Redis.Password != "" { @@ -416,27 +441,43 @@ func NewEnvContext( if allConfig.Redis.Username != "" { dialOptions = append(dialOptions, redigo.DialUsername(allConfig.Redis.Username)) } - initChecker := store.NewRedisInitChecker(redisURL, prefix, dialOptions) - envContext.storeInitChecker = initChecker - thingsToCleanUp.AddFunc(func() { _ = initChecker.Close() }) - // Health check is started later after the store adapter builds the actual store. - // We defer this to startStoreHealthCheck which is called after the SDK client is ready. - envContext.deferredHealthCheckStart(initChecker, healthCheckInterval, envLoggers) + checker := store.NewRedisInitChecker(redisURL, prefix, dialOptions) + return checker, interval, nil } - cleanupInterval := params.ExpiredCredentialCleanupInterval - if cleanupInterval == 0 { // 0 means it wasn't specified; the config system disallows 0 as a valid value. - cleanupInterval = defaultCredentialCleanupInterval + if allConfig.Consul.Host != "" { + interval := allConfig.Consul.HealthCheckInterval.GetOrElse(config.DefaultDataStoreHealthCheckInterval) + prefix := envConfig.Prefix + if prefix == "" { + prefix = ldconsul.DefaultPrefix + } + checker, err := store.NewConsulInitChecker( + allConfig.Consul.Host, allConfig.Consul.Token, allConfig.Consul.TokenFile, prefix, + ) + if err != nil { + return nil, 0, err + } + return checker, interval, nil } - go envContext.cleanupExpiredCredentials(cleanupInterval) - thingsToCleanUp.Clear() // we've succeeded so we do not want to throw away these things + if allConfig.DynamoDB.Enabled { + interval := allConfig.DynamoDB.HealthCheckInterval.GetOrElse(config.DefaultDataStoreHealthCheckInterval) + endpoint, tableName, prefix := sdks.GetDynamoDBBasicProperties(allConfig.DynamoDB, envConfig) + if tableName == "" { + return nil, 0, nil + } + checker, err := store.NewDynamoDBInitChecker(tableName, prefix, endpoint) + if err != nil { + return nil, 0, err + } + return checker, interval, nil + } - return envContext, nil + return nil, 0, nil } func (c *envContextImpl) deferredHealthCheckStart( - initChecker *store.RedisInitChecker, + initChecker store.StoreInitChecker, interval time.Duration, loggers ldlog.Loggers, ) { diff --git a/internal/store/consul_init_checker.go b/internal/store/consul_init_checker.go new file mode 100644 index 00000000..2126f0ca --- /dev/null +++ b/internal/store/consul_init_checker.go @@ -0,0 +1,50 @@ +package store + +import ( + consul "github.com/hashicorp/consul/api" +) + +// ConsulInitChecker implements StoreInitChecker by directly querying Consul +// for the $inited KV key, bypassing the SDK's caching layer. +type ConsulInitChecker struct { + client *consul.Client + prefix string +} + +// NewConsulInitChecker creates a checker that connects to Consul at the given address. +// The prefix should match the store prefix used by the SDK (e.g. "launchdarkly"). +func NewConsulInitChecker(address string, token string, tokenFile string, prefix string) (*ConsulInitChecker, error) { + config := consul.DefaultConfig() + config.Address = address + if token != "" { + config.Token = token + } else if tokenFile != "" { + config.TokenFile = tokenFile + } + client, err := consul.NewClient(config) + if err != nil { + return nil, err + } + return &ConsulInitChecker{ + client: client, + prefix: prefix, + }, nil +} + +func (c *ConsulInitChecker) initedKey() string { + return c.prefix + "/$inited" +} + +// CheckInitialized checks if the $inited key exists in Consul KV. +func (c *ConsulInitChecker) CheckInitialized() (available bool, initialized bool, err error) { + pair, _, err := c.client.KV().Get(c.initedKey(), nil) + if err != nil { + return false, false, err + } + return true, pair != nil, nil +} + +// Close is a no-op for Consul (the HTTP client doesn't need explicit cleanup). +func (c *ConsulInitChecker) Close() error { + return nil +} diff --git a/internal/store/dynamodb_init_checker.go b/internal/store/dynamodb_init_checker.go new file mode 100644 index 00000000..d8935523 --- /dev/null +++ b/internal/store/dynamodb_init_checker.go @@ -0,0 +1,78 @@ +package store + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +const ( + dynamoTablePartitionKey = "namespace" + dynamoTableSortKey = "key" +) + +// DynamoDBInitChecker implements StoreInitChecker by directly querying DynamoDB +// for the $inited item, bypassing the SDK's caching layer. +type DynamoDBInitChecker struct { + client *dynamodb.Client + tableName string + prefix string +} + +// NewDynamoDBInitChecker creates a checker that connects to DynamoDB. +// The tableName and prefix should match those used by the SDK store. +// If endpoint is non-nil, it overrides the default AWS endpoint (for local testing). +func NewDynamoDBInitChecker(tableName string, prefix string, endpoint *string) (*DynamoDBInitChecker, error) { + cfg, err := awsconfig.LoadDefaultConfig(context.Background()) + if err != nil { + return nil, err + } + var options []func(*dynamodb.Options) + if endpoint != nil { + options = append(options, func(o *dynamodb.Options) { + o.BaseEndpoint = endpoint + }) + } + client := dynamodb.NewFromConfig(cfg, options...) + return &DynamoDBInitChecker{ + client: client, + tableName: tableName, + prefix: prefix, + }, nil +} + +func (d *DynamoDBInitChecker) initedKey() string { + if d.prefix == "" { + return "$inited" + } + return d.prefix + ":$inited" +} + +func attrValueStr(s string) *types.AttributeValueMemberS { + return &types.AttributeValueMemberS{Value: s} +} + +// CheckInitialized checks if the $inited item exists in the DynamoDB table. +func (d *DynamoDBInitChecker) CheckInitialized() (available bool, initialized bool, err error) { + key := d.initedKey() + result, err := d.client.GetItem(context.Background(), &dynamodb.GetItemInput{ + TableName: aws.String(d.tableName), + Key: map[string]types.AttributeValue{ + dynamoTablePartitionKey: attrValueStr(key), + dynamoTableSortKey: attrValueStr(key), + }, + ConsistentRead: aws.Bool(true), + }) + if err != nil { + return false, false, err + } + return true, len(result.Item) > 0, nil +} + +// Close is a no-op for DynamoDB (the client doesn't need explicit cleanup). +func (d *DynamoDBInitChecker) Close() error { + return nil +} diff --git a/internal/store/store_health_check.go b/internal/store/store_health_check.go index f82f75e1..2f129a0d 100644 --- a/internal/store/store_health_check.go +++ b/internal/store/store_health_check.go @@ -9,7 +9,7 @@ import ( ) // StoreInitChecker abstracts the ability to check whether a persistent store still -// contains its initialization marker (e.g. the Redis $inited key). Implementations +// contains its initialization marker (e.g. the $inited sentinel key). Implementations // should query the store directly, bypassing any SDK-level caching. type StoreInitChecker interface { // CheckInitialized returns: @@ -19,6 +19,12 @@ type StoreInitChecker interface { CheckInitialized() (available bool, initialized bool, err error) } +// StoreInitCheckerCloser extends StoreInitChecker with a Close method for resource cleanup. +type StoreInitCheckerCloser interface { + StoreInitChecker + Close() error +} + // SnapshotStore is the interface that StoreHealthCheck uses to interact with the store wrapper. // It provides access to snapshot data and circuit breaker state for resilience operations. type SnapshotStore interface { From f273bed67ea86b36c3122606e078c0f77fc3737d Mon Sep 17 00:00:00 2001 From: Nicolas Bouvrette Date: Sun, 10 May 2026 23:48:12 -0400 Subject: [PATCH 5/6] fix: avoid resource leak when health check is disabled When healthCheckInterval is set to 0, return early from createInitChecker before allocating the checker and its connection pool. Previously the checker was created but never stored or closed when the interval guard rejected it. Co-authored-by: Cursor --- internal/relayenv/env_context_impl.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/internal/relayenv/env_context_impl.go b/internal/relayenv/env_context_impl.go index 5a0da45b..68ae0630 100644 --- a/internal/relayenv/env_context_impl.go +++ b/internal/relayenv/env_context_impl.go @@ -433,6 +433,9 @@ func createInitChecker( ) (store.StoreInitCheckerCloser, time.Duration, error) { if allConfig.Redis.URL.IsDefined() { interval := allConfig.Redis.HealthCheckInterval.GetOrElse(config.DefaultDataStoreHealthCheckInterval) + if interval <= 0 { + return nil, 0, nil + } redisURL, prefix := sdks.GetRedisBasicProperties(allConfig.Redis, envConfig) var dialOptions []redigo.DialOption if allConfig.Redis.Password != "" { @@ -447,6 +450,9 @@ func createInitChecker( if allConfig.Consul.Host != "" { interval := allConfig.Consul.HealthCheckInterval.GetOrElse(config.DefaultDataStoreHealthCheckInterval) + if interval <= 0 { + return nil, 0, nil + } prefix := envConfig.Prefix if prefix == "" { prefix = ldconsul.DefaultPrefix @@ -462,6 +468,9 @@ func createInitChecker( if allConfig.DynamoDB.Enabled { interval := allConfig.DynamoDB.HealthCheckInterval.GetOrElse(config.DefaultDataStoreHealthCheckInterval) + if interval <= 0 { + return nil, 0, nil + } endpoint, tableName, prefix := sdks.GetDynamoDBBasicProperties(allConfig.DynamoDB, envConfig) if tableName == "" { return nil, 0, nil From 256be9cbc987c9c0ce309f9c999cb25d097284b7 Mon Sep 17 00:00:00 2001 From: Nicolas Bouvrette Date: Mon, 11 May 2026 13:22:53 -0400 Subject: [PATCH 6/6] refactor: rename deepCopyCollections to copyCollectionStructure The function copies collection and item slices but shares the underlying ItemDescriptor.Item pointers. The old name implied full deep copying which was misleading. Added a comment explaining the shared pointer semantics and why it is safe (SDK treats flag/segment objects as immutable). Co-authored-by: Cursor --- internal/store/relay_feature_store.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/internal/store/relay_feature_store.go b/internal/store/relay_feature_store.go index a10fd6ea..55dfb5b8 100644 --- a/internal/store/relay_feature_store.go +++ b/internal/store/relay_feature_store.go @@ -139,14 +139,16 @@ func (sw *streamUpdatesStoreWrapper) HasSnapshot() bool { return sw.snapshotHasData } -// GetSnapshot returns a deep copy of the current snapshot, or nil if none exists. +// GetSnapshot returns a structural copy of the current snapshot, or nil if none exists. +// The returned slices are independent but ItemDescriptor.Item pointers are shared (safe +// because flag/segment objects are immutable in the SDK). func (sw *streamUpdatesStoreWrapper) GetSnapshot() []ldstoretypes.Collection { sw.snapshotMu.RLock() defer sw.snapshotMu.RUnlock() if !sw.snapshotHasData { return nil } - return deepCopyCollections(sw.snapshot) + return copyCollectionStructure(sw.snapshot) } func (sw *streamUpdatesStoreWrapper) saveSnapshot(allData []ldstoretypes.Collection) { @@ -161,7 +163,7 @@ func (sw *streamUpdatesStoreWrapper) saveSnapshot(allData []ldstoretypes.Collect sw.snapshotMu.Lock() defer sw.snapshotMu.Unlock() if hasData { - sw.snapshot = deepCopyCollections(allData) + sw.snapshot = copyCollectionStructure(allData) sw.snapshotHasData = true } else { sw.snapshot = nil @@ -209,7 +211,11 @@ func (sw *streamUpdatesStoreWrapper) updateSnapshotItem( }) } -func deepCopyCollections(src []ldstoretypes.Collection) []ldstoretypes.Collection { +// copyCollectionStructure copies the collection and item slices so that +// modifications to the returned structure (appending, replacing items) do not +// affect the source. The ItemDescriptor.Item pointers are shared, which is safe +// because the LaunchDarkly SDK treats flag/segment objects as immutable. +func copyCollectionStructure(src []ldstoretypes.Collection) []ldstoretypes.Collection { if src == nil { return nil }