From 73fe103df0528fdfc4a0aa0a9f3b4def0c4b2c07 Mon Sep 17 00:00:00 2001 From: qk-santi <94850169+qk-santi@users.noreply.github.com> Date: Tue, 14 Apr 2026 18:45:49 +0200 Subject: [PATCH] feat(observer): per-RPC observer groups with global fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for overriding the observer set on a per-RPC-provider basis. Each provider can now define its own observers group in config; providers without one fall back to the global observers.rpc set. Non-RPC providers (exchange_rates, system, hash_divergence, etc.) use a separate observers.system set on their own EventBus. - Add GlobalObservers{RPC, System} config struct; observers.rpc becomes the fallback for all RPC providers, observers.system for non-RPC - Add optional observers field on RPC provider config; when set, a dedicated EventBus is created for that provider - Rename GetEnabledObserverSet → GetObserverSetFrom(cfg Observers) to accept an explicit config instead of reading global state - Replace promauto with manual Register + registerOrExisting helper to prevent duplicate-registration panics on repeated Init calls - Fix Publish to only warn on empty subscriber sets for topics that were explicitly subscribed (avoids false warnings for unregistered topics) --- config.yml | 195 ++++++++++++++++++++++++------------- config/config.go | 12 ++- metrics/metrics.go | 29 ++++-- observer/exchange_rates.go | 11 ++- observer/observer.go | 12 ++- runner/runner.go | 40 +++++--- 6 files changed, 204 insertions(+), 95 deletions(-) diff --git a/config.yml b/config.yml index 7f050de..149b6c8 100644 --- a/config.yml +++ b/config.yml @@ -215,6 +215,53 @@ ## @env PANOPTICHAIN_PROVIDERS_RPC_0_ACCOUNTS_0_TAG - string - optional ## A tag to label this account in metrics (e.g., "sequencer", "proposer"). ## + ## @param observers - object - optional - default: observers.rpc + ## Override the observer set for this specific provider. If omitted, the + ## observers.rpc config is used as the fallback (all observers enabled by + ## default). Only observers listed here will emit metrics for this provider. + ## YAML anchors (&) and references (*) can be used to share observer groups + ## across providers without repeating the list. + ## + ## Tip: define observers.rpc with an anchor (&full) so the full group is + ## available to reference on per-provider overrides: + ## + ## observers: + ## rpc: &full + ## enabled: + ## - block + ## - finalized_height + ## - block_interval + ## # ... all desired RPC observers + ## system: + ## enabled: + ## - exchange_rates + ## - hash_divergence + ## - system + ## + ## x-observer-groups: + ## light: &light + ## enabled: + ## - block + ## - finalized_height + ## + ## providers: + ## rpc: + ## - name: "Polygon Mainnet" + ## url: "https://internal-rpc" + ## label: "internal" + ## observers: *full + ## - name: "Polygon Mainnet" + ## url: "https://external-rpc" + ## label: "external" + ## observers: *light + ## + ## @param enabled - list of strings - optional + ## Enable only specific observers for this provider. + ## + ## @param disabled - list of strings - optional + ## Disable specific observers for this provider (only applies when + ## `enabled` is not set, i.e. all-minus-disabled semantics). + ## ## @param time_to_mine - object - optional ## The `time_to_mine` configuration. This will periodically send ## transactions and record the time it took to be included in a block. If @@ -513,71 +560,89 @@ # - eur ## @param observers - object - optional -## Observers control which metrics will be emitted. By default, all observers are -## enabled unless explicitly disabled. +## Controls which observers are active, split by scope. +## +## observers.rpc — fallback set for RPC providers without a custom observers +## group. Defaults to all observers when not configured. +## Define with a YAML anchor (&full) to reuse on providers. +## +## observers.system — set for non-RPC providers: exchange_rates, system, +## hash_divergence, heimdall, sensor_network, etc. These +## run on a separate event bus from the RPC providers. +## Defaults to all observers when not configured. +## +## Each sub-key supports: +## enabled - enable only the listed observers (all others disabled) +## disabled - disable the listed observers (only applies when enabled is unset) # # observers: # - ## @param disabled - list of strings - optional - ## @env PANOPTICHAIN_OBSERVERS_DISABLED - list of strings - optional - ## Disable specific observers from the observer list. - # - # disabled: - # - ## @param enabled - list of strings - optional - ## @env PANOPTICHAIN_OBSERVERS_ENABLED - list of strings - optional - default enables all observers - ## Enable only specific observers. Below are all available observers: - # - # enabled: - # - "account_balances" - # - "base_fee_per_gas" - # - "block" - # - "block_interval" - # - "bogon_block" - # - "bridge_event" - # - "checkpoint" - # - "claim_event" - # - "deposit_counts" - # - "double_sign" - # - "empty_block" - # - "exchange_rates" - # - "exit_roots" - # - "finalized_height" - # - "gas_limit" - # - "gas_used" - # - "hash_divergence" - # - "heimdall_block" - # - "heimdall_block_interval" - # - "heimdall_checkpoint" - # - "heimdall_height" - # - "heimdall_missed_block_proposal" - # - "heimdall_missed_checkpoint_proposal" - # - "heimdall_missed_milestone_proposal" - # - "heimdall_signature_count" - # - "milestone" - # - "missed_block_proposal" - # - "refresh_state_time" - # - "reorg" - # - "sealed_out_of_turn" - # - "sensor_block_events" - # - "sensor_blocks" - # - "sensor_bogon_block" - # - "state_sync" - # - "stolen_block" - # - "system" - # - "time_to_finalized" - # - "time_to_mine" - # - "transaction_cost" - # - "transaction_count" - # - "transaction_gas_fee_cap" - # - "transaction_gas_limit" - # - "transaction_gas_price" - # - "transaction_gas_tip_cap" - # - "transaction_pool" - # - "transaction_value" - # - "trusted_batch" - # - "uncles" - # - "validator_wallet_balance" - # - "zkevm_batches" - # - "rollup_manager" - # - "span" + ## @param rpc - object - optional + ## @param rpc.enabled - list of strings - optional + ## @env PANOPTICHAIN_OBSERVERS_RPC_ENABLED - list of strings - optional + ## @param rpc.disabled - list of strings - optional + ## @env PANOPTICHAIN_OBSERVERS_RPC_DISABLED - list of strings - optional + # + # rpc: + # enabled: + # - "account_balances" + # - "base_fee_per_gas" + # - "block" + # - "block_interval" + # - "bogon_block" + # - "bridge_event" + # - "checkpoint" + # - "claim_event" + # - "deposit_counts" + # - "double_sign" + # - "empty_block" + # - "exit_roots" + # - "finalized_height" + # - "gas_limit" + # - "gas_used" + # - "heimdall_block" + # - "heimdall_block_interval" + # - "heimdall_checkpoint" + # - "heimdall_height" + # - "heimdall_missed_block_proposal" + # - "heimdall_missed_checkpoint_proposal" + # - "heimdall_missed_milestone_proposal" + # - "heimdall_signature_count" + # - "milestone" + # - "missed_block_proposal" + # - "refresh_state_time" + # - "reorg" + # - "sealed_out_of_turn" + # - "sensor_block_events" + # - "sensor_blocks" + # - "sensor_bogon_block" + # - "state_sync" + # - "stolen_block" + # - "time_to_finalized" + # - "time_to_mine" + # - "transaction_cost" + # - "transaction_count" + # - "transaction_gas_fee_cap" + # - "transaction_gas_limit" + # - "transaction_gas_price" + # - "transaction_gas_tip_cap" + # - "transaction_pool" + # - "transaction_value" + # - "trusted_batch" + # - "uncles" + # - "validator_wallet_balance" + # - "zkevm_batches" + # - "rollup_manager" + # - "span" + # + ## @param system - object - optional + ## @param system.enabled - list of strings - optional + ## @env PANOPTICHAIN_OBSERVERS_SYSTEM_ENABLED - list of strings - optional + ## @param system.disabled - list of strings - optional + ## @env PANOPTICHAIN_OBSERVERS_SYSTEM_DISABLED - list of strings - optional + # + # system: + # enabled: + # - "exchange_rates" + # - "hash_divergence" + # - "system" diff --git a/config/config.go b/config/config.go index 41a431a..2dd1d9a 100644 --- a/config/config.go +++ b/config/config.go @@ -52,6 +52,7 @@ type RPC struct { Accounts []Account `mapstructure:"accounts"` BlockLookBack *uint64 `mapstructure:"block_look_back"` TxPool bool `mapstructure:"txpool"` + Observers *Observers `mapstructure:"observers"` } // Contracts maps specific contracts to their addresses. This is used to @@ -174,6 +175,15 @@ type Observers struct { Disabled []string `mapstructure:"disabled"` } +// GlobalObservers separates observer configuration by scope. RPC is the +// fallback set for RPC providers without a custom observers group. System is +// the set for non-RPC providers (exchange_rates, system, hash_divergence). +// Both default to all known observers when not configured. +type GlobalObservers struct { + RPC Observers `mapstructure:"rpc"` + System Observers `mapstructure:"system"` +} + // HTTP defines the properties that used for exposing metrics. type HTTP struct { PromPort int `mapstructure:"port" validate:"required"` @@ -221,7 +231,7 @@ type config struct { Runner Runner `mapstructure:"runner"` HTTP HTTP `mapstructure:"http"` Providers Providers `mapstructure:"providers"` - Observers Observers `mapstructure:"observers"` + Observers GlobalObservers `mapstructure:"observers"` Networks []Network `mapstructure:"networks"` Logs Logs `mapstructure:"logs"` } diff --git a/metrics/metrics.go b/metrics/metrics.go index f15e348..0f1b10e 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/0xPolygon/panoptichain/config" ) @@ -26,45 +25,57 @@ const ( SPN ) +// registerOrExisting registers c and returns it, or returns the previously +// registered collector if one with the same name already exists. +func registerOrExisting[C prometheus.Collector](c C) C { + if err := prometheus.Register(c); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + return are.ExistingCollector.(C) + } + panic(err) + } + return c +} + // NewCounter returns a Prometheus counter object with labels for network // and provider. func NewCounter(subsystem Subsystem, name, help string, labels ...string) *prometheus.CounterVec { - return promauto.NewCounterVec(prometheus.CounterOpts{ + return registerOrExisting(prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: config.Config().Namespace, Subsystem: strings.ToLower(subsystem.String()), Name: name, Help: help, - }, append([]string{"network", "provider"}, labels...)) + }, append([]string{"network", "provider"}, labels...))) } // NewGauge returns a Prometheus gauge with labels for network and provider. func NewGauge(subsystem Subsystem, name, help string, labels ...string) *prometheus.GaugeVec { - return promauto.NewGaugeVec(prometheus.GaugeOpts{ + return registerOrExisting(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: config.Config().Namespace, Subsystem: strings.ToLower(subsystem.String()), Name: name, Help: help, - }, append([]string{"network", "provider"}, labels...)) + }, append([]string{"network", "provider"}, labels...))) } // NewGaugeWithoutLabels returns a Prometheus gauge without labels. func NewGaugeWithoutLabels(subsystem Subsystem, name, help string) prometheus.Gauge { - return promauto.NewGauge(prometheus.GaugeOpts{ + return registerOrExisting(prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: config.Config().Namespace, Subsystem: strings.ToLower(subsystem.String()), Name: name, Help: help, - }) + })) } // NewHistogram returns a configured histogram with labels for network and // provider. func NewHistogram(subsystem Subsystem, name, help string, buckets []float64, labels ...string) *prometheus.HistogramVec { - return promauto.NewHistogramVec(prometheus.HistogramOpts{ + return registerOrExisting(prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: config.Config().Namespace, Subsystem: strings.ToLower(subsystem.String()), Name: name, Help: help, Buckets: buckets, - }, append([]string{"network", "provider"}, labels...)) + }, append([]string{"network", "provider"}, labels...))) } diff --git a/observer/exchange_rates.go b/observer/exchange_rates.go index 5de5e48..0736c6b 100644 --- a/observer/exchange_rates.go +++ b/observer/exchange_rates.go @@ -4,7 +4,6 @@ import ( "context" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/0xPolygon/panoptichain/config" "github.com/0xPolygon/panoptichain/observer/topics" @@ -23,11 +22,19 @@ type ExchangeRatesObserver struct { func (o *ExchangeRatesObserver) Register(eb *EventBus) { eb.Subscribe(topics.ExchangeRate, o) - o.gauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ + g := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: config.Config().Namespace, Name: "exchange_rates", Help: "The exchange rate between the base and quote currencies", }, []string{"base", "quote"}) + if err := prometheus.Register(g); err != nil { + if are, ok := err.(prometheus.AlreadyRegisteredError); ok { + g = are.ExistingCollector.(*prometheus.GaugeVec) + } else { + panic(err) + } + } + o.gauge = g } func (o *ExchangeRatesObserver) Notify(ctx context.Context, m Message) { diff --git a/observer/observer.go b/observer/observer.go index a61647a..15f4996 100644 --- a/observer/observer.go +++ b/observer/observer.go @@ -108,11 +108,12 @@ func (eb *EventBus) Subscribe(topic Topic, o Observer) { // Publish is called by the providers when they want to send a message to all // subscribers. func (eb *EventBus) Publish(ctx context.Context, topic Topic, m Message) { - if len(eb.observers[topic.String()]) == 0 { + subs, subscribed := eb.observers[topic.String()] + if subscribed && len(subs) == 0 { log.Warn().Str("topic", topic.String()).Msg("Topic published to empty subscriber set") } - for _, s := range eb.observers[topic.String()] { + for _, s := range subs { eb.jobs <- struct{}{} go func(o Observer) { o.Notify(ctx, m) @@ -192,8 +193,9 @@ var observersMap = map[string]Observer{ "zkevm_batches": new(ZkEVMBatchObserver), } -func GetEnabledObserverSet() ObserverSet { - cfg := config.Config().Observers +// GetObserverSetFrom builds an ObserverSet from the given Observers config. +// If cfg.Enabled is empty, all known observers are enabled by default. +func GetObserverSetFrom(cfg config.Observers) ObserverSet { set := make(map[string]struct{}) for _, name := range cfg.Enabled { @@ -217,7 +219,7 @@ func GetEnabledObserverSet() ObserverSet { delete(set, name) } - observers := make(ObserverSet, 0, len(observersMap)) + observers := make(ObserverSet, 0, len(set)) for name := range set { observers = append(observers, observersMap[name]) } diff --git a/runner/runner.go b/runner/runner.go index 712aec2..0251162 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -14,7 +14,6 @@ import ( ) var providers []provider.Provider -var observers observer.ObserverSet // Start starts the main infinite loop of this program. func Start(ctx context.Context) { @@ -43,16 +42,34 @@ func Start(ctx context.Context) { // Init configures all the providers and observers of the system. func Init(ctx context.Context) error { - eb := observer.NewEventBus() providers = []provider.Provider{} rpcProviders := []*provider.RPCProvider{} + // systemEB is used by non-RPC providers (exchange_rates, system, + // hash_divergence, heimdall, sensor, etc.). + systemEB := observer.NewEventBus() + systemObs := observer.GetObserverSetFrom(config.Config().Observers.System) + systemObs.Register(systemEB) + + // rpcFallbackEB is used by RPC providers that don't specify a custom + // observers group. + rpcFallbackEB := observer.NewEventBus() + rpcFallbackObs := observer.GetObserverSetFrom(config.Config().Observers.RPC) + rpcFallbackObs.Register(rpcFallbackEB) + for _, r := range config.Config().Providers.RPCs { n, err := network.GetNetworkByName(r.Name) if err != nil { return err } + eb := rpcFallbackEB + if r.Observers != nil { + eb = observer.NewEventBus() + obs := observer.GetObserverSetFrom(*r.Observers) + obs.Register(eb) + } + p := provider.NewRPCProvider(n, eb, r) providers = append(providers, p) rpcProviders = append(rpcProviders, p) @@ -61,7 +78,7 @@ func Init(ctx context.Context) error { if hd := config.Config().Providers.HashDivergence; hd != nil { p := provider.NewHashDivergenceProvider( rpcProviders, - eb, + systemEB, provider.GetInterval(hd.Interval), ) providers = append(providers, p) @@ -73,7 +90,7 @@ func Init(ctx context.Context) error { return err } - p := provider.NewHeimdallProvider(n, eb, h) + p := provider.NewHeimdallProvider(n, systemEB, h) providers = append(providers, p) } @@ -83,7 +100,7 @@ func Init(ctx context.Context) error { return err } - p := provider.NewSensorNetworkProvider(ctx, n, eb, s) + p := provider.NewSensorNetworkProvider(ctx, n, systemEB, s) providers = append(providers, p) } @@ -93,7 +110,7 @@ func Init(ctx context.Context) error { return err } - p := provider.NewProverNetworkProvider(n, eb, p) + p := provider.NewProverNetworkProvider(n, systemEB, p) providers = append(providers, p) } @@ -103,7 +120,7 @@ func Init(ctx context.Context) error { return err } - p := provider.NewAggchainProvider(n, eb, p) + p := provider.NewAggchainProvider(n, systemEB, p) providers = append(providers, p) } @@ -113,22 +130,19 @@ func Init(ctx context.Context) error { return err } - p := provider.NewGrafanaProvider(n, eb, p) + p := provider.NewGrafanaProvider(n, systemEB, p) providers = append(providers, p) } if system := config.Config().Providers.System; system != nil { - p := provider.NewSystemProvider(eb, provider.GetInterval(system.Interval)) + p := provider.NewSystemProvider(systemEB, provider.GetInterval(system.Interval)) providers = append(providers, p) } if er := config.Config().Providers.ExchangeRates; er != nil { - p := provider.NewExchangeRatesProvider(eb, *er) + p := provider.NewExchangeRatesProvider(systemEB, *er) providers = append(providers, p) } - observers = observer.GetEnabledObserverSet() - observers.Register(eb) - return nil }