Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ const (
// DefaultDatabaseCacheTTL is the default value for the LocalTTL parameter for databases if not specified.
DefaultDatabaseCacheTTL = time.Second * 30

// DefaultDataStoreHealthCheckInterval is the default interval for checking whether the persistent
// data store still contains its initialization data. If data loss is detected (e.g. after a Redis
// restart), the relay will automatically repopulate the store from its in-memory snapshot.
DefaultDataStoreHealthCheckInterval = time.Second * 30

// DefaultPrometheusPort is the default value for PrometheusConfig.Port if not specified.
DefaultPrometheusPort = 8031

Expand Down Expand Up @@ -212,13 +217,14 @@ type EventsConfig struct {
// variables, individual fields are not documented here; instead, see the `README.md` section on
// configuration.
type RedisConfig struct {
Host string `conf:"REDIS_HOST"`
Port ct.OptIntGreaterThanZero
URL ct.OptURLAbsolute `conf:"REDIS_URL"`
LocalTTL ct.OptDuration `conf:"CACHE_TTL"`
TLS bool `conf:"REDIS_TLS"`
Username string `conf:"REDIS_USERNAME"`
Password string `conf:"REDIS_PASSWORD"`
Host string `conf:"REDIS_HOST"`
Port ct.OptIntGreaterThanZero
URL ct.OptURLAbsolute `conf:"REDIS_URL"`
LocalTTL ct.OptDuration `conf:"CACHE_TTL"`
TLS bool `conf:"REDIS_TLS"`
Username string `conf:"REDIS_USERNAME"`
Password string `conf:"REDIS_PASSWORD"`
HealthCheckInterval ct.OptDuration `conf:"REDIS_HEALTH_CHECK_INTERVAL"`
}

// ConsulConfig configures the optional Consul integration.
Expand All @@ -231,10 +237,11 @@ type RedisConfig struct {
// variables, individual fields are not documented here; instead, see the `README.md` section on
// configuration.
type ConsulConfig struct {
Host string `conf:"CONSUL_HOST"`
LocalTTL ct.OptDuration `conf:"CACHE_TTL"`
Token string `conf:"CONSUL_TOKEN"`
TokenFile string `conf:"CONSUL_TOKEN_FILE"`
Host string `conf:"CONSUL_HOST"`
LocalTTL ct.OptDuration `conf:"CACHE_TTL"`
Token string `conf:"CONSUL_TOKEN"`
TokenFile string `conf:"CONSUL_TOKEN_FILE"`
HealthCheckInterval ct.OptDuration `conf:"CONSUL_HEALTH_CHECK_INTERVAL"`
}

// DynamoDBConfig configures the optional DynamoDB integration, which is used only if Enabled is true.
Expand All @@ -245,10 +252,11 @@ type ConsulConfig struct {
// variables, individual fields are not documented here; instead, see the `README.md` section on
// configuration.
type DynamoDBConfig struct {
Enabled bool `conf:"USE_DYNAMODB"`
TableName string `conf:"DYNAMODB_TABLE"`
URL ct.OptURLAbsolute `conf:"DYNAMODB_URL"`
LocalTTL ct.OptDuration `conf:"CACHE_TTL"`
Enabled bool `conf:"USE_DYNAMODB"`
TableName string `conf:"DYNAMODB_TABLE"`
URL ct.OptURLAbsolute `conf:"DYNAMODB_URL"`
LocalTTL ct.OptDuration `conf:"CACHE_TTL"`
HealthCheckInterval ct.OptDuration `conf:"DYNAMODB_HEALTH_CHECK_INTERVAL"`
}

// EnvConfig describes an environment to be relayed. There may be any number of these.
Expand Down
48 changes: 48 additions & 0 deletions docs/persistent-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,54 @@ note over Relay Proxy: TTL fresh, serve from memory
Relay Proxy-->>SDK2: Streaming response
```

## Data Store Health Check and Automatic Repopulation

The Relay Proxy includes a health check mechanism that detects when a persistent store loses its data (for example, when Redis restarts without persistence enabled, or when Consul/DynamoDB data is deleted). Without this, SDKs using daemon mode (such as PHP) would receive empty flag evaluations until the Relay Proxy is manually restarted.

### How it works

When using a persistent data store (Redis, Consul, or DynamoDB), the Relay Proxy periodically checks for the presence of a sentinel key (`$inited`) that the SDK writes when it first populates the store. If this key is missing but the Relay Proxy has valid data in memory, it automatically repopulates the store from that in-memory data.

The health check also includes a **circuit breaker**: if a read from the persistent store fails with a connection error, subsequent reads are served directly from an in-memory snapshot, avoiding connection pool exhaustion and timeout cascades. The circuit breaker is cleared automatically when the health check confirms the store is available again.

### Configuration

The health check is enabled by default for all persistent store backends with a 30-second interval. The interval is configurable per backend. Setting the interval to `0` disables the health check.

```
# Configuration file examples
[Redis]
host = "localhost"
port = 6379
localTtl = 30s
healthCheckInterval = 30s

[Consul]
host = "localhost"
healthCheckInterval = 30s

[DynamoDB]
tableName = "my-feature-flags"
healthCheckInterval = 30s
```

```
# Environment variable examples
REDIS_HEALTH_CHECK_INTERVAL=30s
CONSUL_HEALTH_CHECK_INTERVAL=30s
DYNAMODB_HEALTH_CHECK_INTERVAL=30s

# To disable the health check:
REDIS_HEALTH_CHECK_INTERVAL=0
```

### Behavior summary

- **Store read error (e.g. connection failure):** Circuit breaker activates immediately. Proxy-mode SDKs are served from the in-memory snapshot. The health check probes for recovery at the configured interval.
- **Store data loss (e.g. Redis restart, Consul KV deletion):** Detected within one health check interval. The store is automatically repopulated from the in-memory snapshot.
- **Store recovered:** Circuit breaker is cleared. Normal read path resumes.
- **No snapshot available (e.g. the Relay Proxy has just started):** The health check has nothing to repopulate the store from. Errors pass through normally.

## Example: Persistent Store during LaunchDarkly Outage - Cold Relay

In this example, LaunchDarkly SaaS is down. Additionally, the Relay in this diagram is starting up **during** the
Expand Down
118 changes: 118 additions & 0 deletions internal/relayenv/env_context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"

ldconsul "github.com/launchdarkly/go-server-sdk-consul/v3"
redigo "github.com/gomodule/redigo/redis"
)

// LogNameMode is used in NewEnvContext to determine whether the environment's log messages should be
Expand Down Expand Up @@ -120,6 +123,8 @@ type envContextImpl struct {
stopMonitoringCredentials chan struct{}
doneMonitoringCredentials chan struct{}
connectionMapper ConnectionMapper
storeHealthCheck *store.StoreHealthCheck
storeInitChecker store.StoreInitCheckerCloser
offline bool
closed bool
}
Expand Down Expand Up @@ -401,6 +406,16 @@ func NewEnvContext(
// Connecting may take time, so do this in parallel
go envContext.startSDKClient(envConfig.SDKKey, readyCh, allConfig.Main.IgnoreConnectionErrors)

// Start the persistent store health check for any configured persistent store.
// A health check interval of 0 disables the check.
if initChecker, interval, err := createInitChecker(allConfig, envConfig); err != nil {
envLoggers.Errorf("Failed to create data store health checker: %s", err)
} else if initChecker != nil && interval > 0 {
envContext.storeInitChecker = initChecker
thingsToCleanUp.AddFunc(func() { _ = initChecker.Close() })
envContext.deferredHealthCheckStart(initChecker, interval, envLoggers)
}
Comment thread
cursor[bot] marked this conversation as resolved.

cleanupInterval := params.ExpiredCredentialCleanupInterval
if cleanupInterval == 0 { // 0 means it wasn't specified; the config system disallows 0 as a valid value.
cleanupInterval = defaultCredentialCleanupInterval
Expand All @@ -412,6 +427,100 @@ func NewEnvContext(
return envContext, nil
}

// createInitChecker builds the initialization checker for whichever persistent store is
// configured, together with the interval at which the health check should run.
//
// The backends are considered in a fixed order: Redis (when a Redis URL is defined), then
// Consul (when a Consul host is set), then DynamoDB (when it is enabled). Only the first
// match is used. Each backend's HealthCheckInterval falls back to
// config.DefaultDataStoreHealthCheckInterval when unset.
//
// A nil checker with a nil error means no health check should run: either no persistent store
// is configured, the effective interval is zero or negative, or (for DynamoDB) no table name
// could be determined. A non-nil error is returned only when constructing the Consul or
// DynamoDB checker fails.
func createInitChecker(
	allConfig config.Config,
	envConfig config.EnvConfig,
) (store.StoreInitCheckerCloser, time.Duration, error) {
	const defaultInterval = config.DefaultDataStoreHealthCheckInterval

	switch {
	case allConfig.Redis.URL.IsDefined():
		checkEvery := allConfig.Redis.HealthCheckInterval.GetOrElse(defaultInterval)
		if checkEvery <= 0 {
			return nil, 0, nil
		}
		url, keyPrefix := sdks.GetRedisBasicProperties(allConfig.Redis, envConfig)
		var opts []redigo.DialOption
		if pw := allConfig.Redis.Password; pw != "" {
			opts = append(opts, redigo.DialPassword(pw))
		}
		if user := allConfig.Redis.Username; user != "" {
			opts = append(opts, redigo.DialUsername(user))
		}
		return store.NewRedisInitChecker(url, keyPrefix, opts), checkEvery, nil

	case allConfig.Consul.Host != "":
		checkEvery := allConfig.Consul.HealthCheckInterval.GetOrElse(defaultInterval)
		if checkEvery <= 0 {
			return nil, 0, nil
		}
		keyPrefix := envConfig.Prefix
		if keyPrefix == "" {
			keyPrefix = ldconsul.DefaultPrefix
		}
		consulChecker, err := store.NewConsulInitChecker(
			allConfig.Consul.Host, allConfig.Consul.Token, allConfig.Consul.TokenFile, keyPrefix,
		)
		if err != nil {
			return nil, 0, err
		}
		return consulChecker, checkEvery, nil

	case allConfig.DynamoDB.Enabled:
		checkEvery := allConfig.DynamoDB.HealthCheckInterval.GetOrElse(defaultInterval)
		if checkEvery <= 0 {
			return nil, 0, nil
		}
		endpoint, table, keyPrefix := sdks.GetDynamoDBBasicProperties(allConfig.DynamoDB, envConfig)
		if table == "" {
			// Without a table name there is nothing to probe.
			return nil, 0, nil
		}
		dynamoChecker, err := store.NewDynamoDBInitChecker(table, keyPrefix, endpoint)
		if err != nil {
			return nil, 0, err
		}
		return dynamoChecker, checkEvery, nil

	default:
		return nil, 0, nil
	}
}

// deferredHealthCheckStart launches a goroutine that waits until the store adapter exposes a
// snapshot store, then creates the persistent store health check, records it on the
// environment, and starts it.
//
// The goroutine polls every 100ms. It exits once the health check has been started, when
// store.NewStoreHealthCheck returns nil, when the environment is found to be closed, or when
// a receive on c.stopMonitoringCredentials succeeds.
//
// Parameters:
//   - initChecker: the store-specific checker passed through to the health check.
//   - interval: the health check interval passed through to the health check.
//   - loggers: used by the health check and for the "started" message.
func (c *envContextImpl) deferredHealthCheckStart(
	initChecker store.StoreInitChecker,
	interval time.Duration,
	loggers ldlog.Loggers,
) {
	go func() {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				ss := c.storeAdapter.GetSnapshotStore()
				if ss == nil {
					// Snapshot store not available yet; try again on the next tick.
					continue
				}
				hc := store.NewStoreHealthCheck(ss, initChecker, interval, loggers)
				if hc == nil {
					return
				}
				c.mu.Lock()
				if c.closed {
					c.mu.Unlock()
					return
				}
				c.storeHealthCheck = hc
				// Start while still holding the lock. Close reads storeHealthCheck under
				// c.mu before calling Stop, so publishing the health check and starting it
				// in one critical section guarantees Stop can never run before Start and
				// leave a health check running after the environment is closed.
				// NOTE(review): assumes Start is non-blocking and does not acquire c.mu.
				hc.Start()
				c.mu.Unlock()
				loggers.Info("Data store health check started")
				return
			case <-c.stopMonitoringCredentials:
				return
			}
		}
	}()
}

func (c *envContextImpl) cleanupExpiredCredentials(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
Expand Down Expand Up @@ -748,6 +857,15 @@ func (c *envContextImpl) Close() error {
if c.sdkBigSegments != nil {
c.sdkBigSegments.Close()
}
c.mu.RLock()
hc := c.storeHealthCheck
c.mu.RUnlock()
if hc != nil {
hc.Stop()
}
Comment thread
cursor[bot] marked this conversation as resolved.
if c.storeInitChecker != nil {
_ = c.storeInitChecker.Close()
}
return nil
}

Expand Down
50 changes: 50 additions & 0 deletions internal/store/consul_init_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package store

import (
consul "github.com/hashicorp/consul/api"
)

// ConsulInitChecker implements StoreInitChecker by directly querying Consul
// for the $inited KV key, bypassing the SDK's caching layer.
type ConsulInitChecker struct {
	// client is the Consul API client used for the KV lookup.
	client *consul.Client
	// prefix is prepended to "/$inited" to form the key that is checked.
	prefix string
}

// NewConsulInitChecker returns a ConsulInitChecker whose client targets the Consul agent at
// address, starting from the Consul library's default configuration.
//
// If token is non-empty it is used as the ACL token; otherwise, if tokenFile is non-empty,
// it is set as the token file. When both are empty neither setting is overridden.
// The prefix should match the store prefix used by the SDK (e.g. "launchdarkly").
//
// An error is returned if the Consul client cannot be created.
func NewConsulInitChecker(address string, token string, tokenFile string, prefix string) (*ConsulInitChecker, error) {
	clientConfig := consul.DefaultConfig()
	clientConfig.Address = address

	// An explicit token takes precedence over a token file.
	switch {
	case token != "":
		clientConfig.Token = token
	case tokenFile != "":
		clientConfig.TokenFile = tokenFile
	}

	consulClient, err := consul.NewClient(clientConfig)
	if err != nil {
		return nil, err
	}

	checker := &ConsulInitChecker{client: consulClient, prefix: prefix}
	return checker, nil
}

// initedKey returns the full Consul KV key of the initialization marker: the configured
// prefix followed by "/$inited".
func (c *ConsulInitChecker) initedKey() string {
	const initedSuffix = "/$inited"
	return c.prefix + initedSuffix
}

// CheckInitialized looks up the $inited key in Consul KV.
//
// If the lookup fails, it returns available=false, initialized=false and the error.
// Otherwise available is true and initialized reports whether the key was found.
func (c *ConsulInitChecker) CheckInitialized() (available bool, initialized bool, err error) {
	kvPair, _, getErr := c.client.KV().Get(c.initedKey(), nil)
	if getErr != nil {
		return false, false, getErr
	}
	// The lookup yields a nil pair when the key does not exist.
	keyExists := kvPair != nil
	return true, keyExists, nil
}

// Close does nothing and always returns nil; the Consul HTTP client held by the checker
// is not explicitly shut down.
func (*ConsulInitChecker) Close() error {
	return nil
}
Loading
Loading