Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 130 additions & 65 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,53 @@
## @env PANOPTICHAIN_PROVIDERS_RPC_0_ACCOUNTS_0_TAG - string - optional
## A tag to label this account in metrics (e.g., "sequencer", "proposer").
##
## @param observers - object - optional - default: observers.rpc
## Override the observer set for this specific provider. If omitted, the
## observers.rpc config is used as the fallback (all observers enabled by
## default). Only observers listed here will emit metrics for this provider.
## YAML anchors (&) and references (*) can be used to share observer groups
## across providers without repeating the list.
##
## Tip: define observers.rpc with an anchor (&full) so the full group is
## available to reference on per-provider overrides:
##
## observers:
## rpc: &full
## enabled:
## - block
## - finalized_height
## - block_interval
## # ... all desired RPC observers
## system:
## enabled:
## - exchange_rates
## - hash_divergence
## - system
##
## x-observer-groups:
## light: &light
## enabled:
## - block
## - finalized_height
##
## providers:
## rpc:
## - name: "Polygon Mainnet"
## url: "https://internal-rpc"
## label: "internal"
## observers: *full
## - name: "Polygon Mainnet"
## url: "https://external-rpc"
## label: "external"
## observers: *light
##
## @param enabled - list of strings - optional
## Enable only specific observers for this provider.
##
## @param disabled - list of strings - optional
## Disable specific observers for this provider (only applies when
## `enabled` is not set, i.e. all-minus-disabled semantics).
##
## @param time_to_mine - object - optional
## The `time_to_mine` configuration. This will periodically send
## transactions and record the time it took to be included in a block. If
Expand Down Expand Up @@ -513,71 +560,89 @@
# - eur

## @param observers - object - optional
## Observers control which metrics will be emitted. By default, all observers are
## enabled unless explicitly disabled.
## Controls which observers are active, split by scope.
##
## observers.rpc — fallback set for RPC providers without a custom observers
## group. Defaults to all observers when not configured.
## Define with a YAML anchor (&full) to reuse on providers.
##
## observers.system — set for non-RPC providers: exchange_rates, system,
## hash_divergence, heimdall, sensor_network, etc. These
## run on a separate event bus from the RPC providers.
## Defaults to all observers when not configured.
##
## Each sub-key supports:
## enabled - enable only the listed observers (all others disabled)
## disabled - disable the listed observers (only applies when enabled is unset)
#
# observers:
#
## @param disabled - list of strings - optional
## @env PANOPTICHAIN_OBSERVERS_DISABLED - list of strings - optional
## Disable specific observers from the observer list.
#
# disabled:
#
## @param enabled - list of strings - optional
## @env PANOPTICHAIN_OBSERVERS_ENABLED - list of strings - optional - default enables all observers
## Enable only specific observers. Below are all available observers:
#
# enabled:
# - "account_balances"
# - "base_fee_per_gas"
# - "block"
# - "block_interval"
# - "bogon_block"
# - "bridge_event"
# - "checkpoint"
# - "claim_event"
# - "deposit_counts"
# - "double_sign"
# - "empty_block"
# - "exchange_rates"
# - "exit_roots"
# - "finalized_height"
# - "gas_limit"
# - "gas_used"
# - "hash_divergence"
# - "heimdall_block"
# - "heimdall_block_interval"
# - "heimdall_checkpoint"
# - "heimdall_height"
# - "heimdall_missed_block_proposal"
# - "heimdall_missed_checkpoint_proposal"
# - "heimdall_missed_milestone_proposal"
# - "heimdall_signature_count"
# - "milestone"
# - "missed_block_proposal"
# - "refresh_state_time"
# - "reorg"
# - "sealed_out_of_turn"
# - "sensor_block_events"
# - "sensor_blocks"
# - "sensor_bogon_block"
# - "state_sync"
# - "stolen_block"
# - "system"
# - "time_to_finalized"
# - "time_to_mine"
# - "transaction_cost"
# - "transaction_count"
# - "transaction_gas_fee_cap"
# - "transaction_gas_limit"
# - "transaction_gas_price"
# - "transaction_gas_tip_cap"
# - "transaction_pool"
# - "transaction_value"
# - "trusted_batch"
# - "uncles"
# - "validator_wallet_balance"
# - "zkevm_batches"
# - "rollup_manager"
# - "span"
## @param rpc - object - optional
## @param rpc.enabled - list of strings - optional
## @env PANOPTICHAIN_OBSERVERS_RPC_ENABLED - list of strings - optional
## @param rpc.disabled - list of strings - optional
## @env PANOPTICHAIN_OBSERVERS_RPC_DISABLED - list of strings - optional
#
# rpc:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new nesting level forces this to be a breaking change

# enabled:
# - "account_balances"
# - "base_fee_per_gas"
# - "block"
# - "block_interval"
# - "bogon_block"
# - "bridge_event"
# - "checkpoint"
# - "claim_event"
# - "deposit_counts"
# - "double_sign"
# - "empty_block"
# - "exit_roots"
# - "finalized_height"
# - "gas_limit"
# - "gas_used"
# - "heimdall_block"
# - "heimdall_block_interval"
# - "heimdall_checkpoint"
# - "heimdall_height"
# - "heimdall_missed_block_proposal"
# - "heimdall_missed_checkpoint_proposal"
# - "heimdall_missed_milestone_proposal"
# - "heimdall_signature_count"
# - "milestone"
# - "missed_block_proposal"
# - "refresh_state_time"
# - "reorg"
# - "sealed_out_of_turn"
# - "sensor_block_events"
# - "sensor_blocks"
# - "sensor_bogon_block"
# - "state_sync"
# - "stolen_block"
# - "time_to_finalized"
# - "time_to_mine"
# - "transaction_cost"
# - "transaction_count"
# - "transaction_gas_fee_cap"
# - "transaction_gas_limit"
# - "transaction_gas_price"
# - "transaction_gas_tip_cap"
# - "transaction_pool"
# - "transaction_value"
# - "trusted_batch"
# - "uncles"
# - "validator_wallet_balance"
# - "zkevm_batches"
# - "rollup_manager"
# - "span"
#
## @param system - object - optional
## @param system.enabled - list of strings - optional
## @env PANOPTICHAIN_OBSERVERS_SYSTEM_ENABLED - list of strings - optional
## @param system.disabled - list of strings - optional
## @env PANOPTICHAIN_OBSERVERS_SYSTEM_DISABLED - list of strings - optional
#
# system:
# enabled:
# - "exchange_rates"
# - "hash_divergence"
# - "system"
12 changes: 11 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type RPC struct {
Accounts []Account `mapstructure:"accounts"`
BlockLookBack *uint64 `mapstructure:"block_look_back"`
TxPool bool `mapstructure:"txpool"`
Observers *Observers `mapstructure:"observers"`
}

// Contracts maps specific contracts to their addresses. This is used to
Expand Down Expand Up @@ -174,6 +175,15 @@ type Observers struct {
Disabled []string `mapstructure:"disabled"`
}

// GlobalObservers separates observer configuration by scope. RPC is the
// fallback set for RPC providers without a custom observers group. System is
// the set for non-RPC providers (exchange_rates, system, hash_divergence).
// Both default to all known observers when not configured.
type GlobalObservers struct {
RPC Observers `mapstructure:"rpc"`
System Observers `mapstructure:"system"`
}

// HTTP defines the properties that used for exposing metrics.
type HTTP struct {
PromPort int `mapstructure:"port" validate:"required"`
Expand Down Expand Up @@ -221,7 +231,7 @@ type config struct {
Runner Runner `mapstructure:"runner"`
HTTP HTTP `mapstructure:"http"`
Providers Providers `mapstructure:"providers"`
Observers Observers `mapstructure:"observers"`
Observers GlobalObservers `mapstructure:"observers"`
Networks []Network `mapstructure:"networks"`
Logs Logs `mapstructure:"logs"`
}
Expand Down
29 changes: 20 additions & 9 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/0xPolygon/panoptichain/config"
)
Expand All @@ -26,45 +25,57 @@ const (
SPN
)

// registerOrExisting registers c and returns it, or returns the previously
// registered collector if one with the same name already exists.
func registerOrExisting[C prometheus.Collector](c C) C {
if err := prometheus.Register(c); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
return are.ExistingCollector.(C)
}
panic(err)
}
return c
}

// NewCounter returns a Prometheus counter object with labels for network
// and provider.
func NewCounter(subsystem Subsystem, name, help string, labels ...string) *prometheus.CounterVec {
return promauto.NewCounterVec(prometheus.CounterOpts{
return registerOrExisting(prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: config.Config().Namespace,
Subsystem: strings.ToLower(subsystem.String()),
Name: name,
Help: help,
}, append([]string{"network", "provider"}, labels...))
}, append([]string{"network", "provider"}, labels...)))
}

// NewGauge returns a Prometheus gauge with labels for network and provider.
func NewGauge(subsystem Subsystem, name, help string, labels ...string) *prometheus.GaugeVec {
return promauto.NewGaugeVec(prometheus.GaugeOpts{
return registerOrExisting(prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Config().Namespace,
Subsystem: strings.ToLower(subsystem.String()),
Name: name,
Help: help,
}, append([]string{"network", "provider"}, labels...))
}, append([]string{"network", "provider"}, labels...)))
}

// NewGaugeWithoutLabels returns a Prometheus gauge without labels.
func NewGaugeWithoutLabels(subsystem Subsystem, name, help string) prometheus.Gauge {
return promauto.NewGauge(prometheus.GaugeOpts{
return registerOrExisting(prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: config.Config().Namespace,
Subsystem: strings.ToLower(subsystem.String()),
Name: name,
Help: help,
})
}))
}

// NewHistogram returns a configured histogram with labels for network and
// provider.
func NewHistogram(subsystem Subsystem, name, help string, buckets []float64, labels ...string) *prometheus.HistogramVec {
return promauto.NewHistogramVec(prometheus.HistogramOpts{
return registerOrExisting(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: config.Config().Namespace,
Subsystem: strings.ToLower(subsystem.String()),
Name: name,
Help: help,
Buckets: buckets,
}, append([]string{"network", "provider"}, labels...))
}, append([]string{"network", "provider"}, labels...)))
}
11 changes: 9 additions & 2 deletions observer/exchange_rates.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/0xPolygon/panoptichain/config"
"github.com/0xPolygon/panoptichain/observer/topics"
Expand All @@ -23,11 +22,19 @@ type ExchangeRatesObserver struct {
func (o *ExchangeRatesObserver) Register(eb *EventBus) {
eb.Subscribe(topics.ExchangeRate, o)

o.gauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
g := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: config.Config().Namespace,
Name: "exchange_rates",
Help: "The exchange rate between the base and quote currencies",
}, []string{"base", "quote"})
if err := prometheus.Register(g); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
g = are.ExistingCollector.(*prometheus.GaugeVec)
} else {
panic(err)
}
}
o.gauge = g
}

func (o *ExchangeRatesObserver) Notify(ctx context.Context, m Message) {
Expand Down
12 changes: 7 additions & 5 deletions observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,12 @@ func (eb *EventBus) Subscribe(topic Topic, o Observer) {
// Publish is called by the providers when they want to send a message to all
// subscribers.
func (eb *EventBus) Publish(ctx context.Context, topic Topic, m Message) {
if len(eb.observers[topic.String()]) == 0 {
subs, subscribed := eb.observers[topic.String()]
if subscribed && len(subs) == 0 {
log.Warn().Str("topic", topic.String()).Msg("Topic published to empty subscriber set")
}

for _, s := range eb.observers[topic.String()] {
for _, s := range subs {
eb.jobs <- struct{}{}
go func(o Observer) {
o.Notify(ctx, m)
Expand Down Expand Up @@ -192,8 +193,9 @@ var observersMap = map[string]Observer{
"zkevm_batches": new(ZkEVMBatchObserver),
}

func GetEnabledObserverSet() ObserverSet {
cfg := config.Config().Observers
// GetObserverSetFrom builds an ObserverSet from the given Observers config.
// If cfg.Enabled is empty, all known observers are enabled by default.
func GetObserverSetFrom(cfg config.Observers) ObserverSet {
set := make(map[string]struct{})

for _, name := range cfg.Enabled {
Expand All @@ -217,7 +219,7 @@ func GetEnabledObserverSet() ObserverSet {
delete(set, name)
}

observers := make(ObserverSet, 0, len(observersMap))
observers := make(ObserverSet, 0, len(set))
for name := range set {
observers = append(observers, observersMap[name])
}
Expand Down
Loading
Loading