Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 49 additions & 6 deletions pkg/clickhouse/router/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type Engine struct {
// produces multiple.
routesByEvent map[xatu.Event_Name][]route.Route

// disabledByConfig marks event names that had at least one registered
// route before config-driven filtering removed all of them. These are
// dropped (StatusDelivered) rather than NAK'd at Route() time.
disabledByConfig map[xatu.Event_Name]struct{}

metrics *telemetry.Metrics
logSampler *telemetry.LogSampler
}
Expand All @@ -44,34 +49,55 @@ func New(
log logrus.FieldLogger,
routes []route.Route,
disabledEvents []xatu.Event_Name,
disabledTables map[string]struct{},
metrics *telemetry.Metrics,
) *Engine {
r := &Engine{
log: log.WithField("component", "router"),
routesByEvent: make(map[xatu.Event_Name][]route.Route, len(routes)),
metrics: metrics,
logSampler: telemetry.NewLogSampler(logSampleInterval),
log: log.WithField("component", "router"),
routesByEvent: make(map[xatu.Event_Name][]route.Route, len(routes)),
disabledByConfig: make(map[xatu.Event_Name]struct{}, len(disabledEvents)),
metrics: metrics,
logSampler: telemetry.NewLogSampler(logSampleInterval),
}

disabled := make(map[xatu.Event_Name]struct{}, len(disabledEvents))
for _, name := range disabledEvents {
disabled[name] = struct{}{}
}

// Register routes by event name.
// Register routes by event name, skipping routes whose target table is
// disabled or whose event is disabled. Event names that had any route
// before filtering are recorded in disabledByConfig so they are dropped
// (not NAK'd) at Route() time when all their routes are filtered out.
for _, route := range routes {
tableDisabled := false
if _, ok := disabledTables[route.TableName()]; ok {
tableDisabled = true
}

for _, name := range route.EventNames() {
if _, isDisabled := disabled[name]; isDisabled {
_, eventDisabled := disabled[name]

if tableDisabled || eventDisabled {
r.disabledByConfig[name] = struct{}{}

continue
}

r.routesByEvent[name] = append(r.routesByEvent[name], route)
}
}

// An event that had at least one surviving route is not disabled —
// drop it from disabledByConfig so Route() does not short-circuit it.
for name := range r.routesByEvent {
delete(r.disabledByConfig, name)
}

// Log registration summary
log.WithField("registered_events", len(r.routesByEvent)).
WithField("disabled_events", len(disabled)).
WithField("disabled_tables", len(disabledTables)).
Info("Routing engine initialized")

return r
Expand All @@ -93,6 +119,23 @@ func (r *Engine) Route(event *xatu.DecoratedEvent) Outcome {
// matching route is deployed.
routesForEvent, ok := r.routesByEvent[eventName]
if !ok {
if _, disabledByConfig := r.disabledByConfig[eventName]; disabledByConfig {
if r.metrics != nil {
r.metrics.MessagesDropped().WithLabelValues(eventName.String(), "disabled_by_config").Inc()
}

if ok, suppressed := r.logSampler.Allow("config_drop:" + eventName.String()); ok {
entry := r.log.WithField("event_name", eventName.String())
if suppressed > 0 {
entry = entry.WithField("suppressed", suppressed)
}

entry.Debug("Event has no enabled routes after config filtering — dropping")
}

return Outcome{Status: StatusDelivered}
}

if reason, intentionallyUnsupported := route.UnsupportedReason(eventName); intentionallyUnsupported {
if r.metrics != nil {
r.metrics.MessagesDropped().WithLabelValues(eventName.String(), "no_flattener").Inc()
Expand Down
95 changes: 91 additions & 4 deletions pkg/clickhouse/router/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"github.com/stretchr/testify/require"
)

const (
testEventID = "e1"
testTableLibp2pPeer = "libp2p_peer"
testTableLibp2pConnect = "libp2p_connected"
)

type filterTestRoute struct {
table route.TableName
events []xatu.Event_Name
Expand Down Expand Up @@ -55,6 +61,7 @@ func TestNewRouterSkipsDisabledEvents(t *testing.T) {
logrus.New(),
routes,
[]xatu.Event_Name{xatu.Event_LIBP2P_TRACE_CONNECTED},
nil,
newTestMetrics(),
)

Expand All @@ -67,6 +74,86 @@ func TestNewRouterSkipsDisabledEvents(t *testing.T) {
require.Equal(t, "table_b", disconnectedRoutes[0].TableName())
}

func TestNewRouterSkipsDisabledTables(t *testing.T) {
routes := []route.Route{
filterTestRoute{
table: route.TableName(testTableLibp2pPeer),
events: []xatu.Event_Name{xatu.Event_LIBP2P_TRACE_CONNECTED, xatu.Event_LIBP2P_TRACE_DISCONNECTED},
},
filterTestRoute{
table: route.TableName(testTableLibp2pConnect),
events: []xatu.Event_Name{xatu.Event_LIBP2P_TRACE_CONNECTED},
},
}

router := New(
logrus.New(),
routes,
nil,
map[string]struct{}{testTableLibp2pPeer: {}},
newTestMetrics(),
)

require.Contains(t, router.routesByEvent, xatu.Event_LIBP2P_TRACE_CONNECTED)
require.Len(t, router.routesByEvent[xatu.Event_LIBP2P_TRACE_CONNECTED], 1)
require.Equal(t, testTableLibp2pConnect, router.routesByEvent[xatu.Event_LIBP2P_TRACE_CONNECTED][0].TableName())
require.NotContains(t, router.routesByEvent, xatu.Event_LIBP2P_TRACE_DISCONNECTED)
}

func TestRouteEventWithAllTablesDisabledIsDropped(t *testing.T) {
routes := []route.Route{
filterTestRoute{
table: route.TableName(testTableLibp2pPeer),
events: []xatu.Event_Name{xatu.Event_LIBP2P_TRACE_DISCONNECTED},
},
}

router := New(
logrus.New(),
routes,
nil,
map[string]struct{}{testTableLibp2pPeer: {}},
newTestMetrics(),
)

outcome := router.Route(&xatu.DecoratedEvent{
Event: &xatu.Event{
Id: testEventID,
Name: xatu.Event_LIBP2P_TRACE_DISCONNECTED,
},
})

require.Equal(t, StatusDelivered, outcome.Status)
require.Empty(t, outcome.Results)
}

func TestRouteDisabledEventIsDropped(t *testing.T) {
routes := []route.Route{
filterTestRoute{
table: route.TableName(testTableLibp2pConnect),
events: []xatu.Event_Name{xatu.Event_LIBP2P_TRACE_CONNECTED},
},
}

router := New(
logrus.New(),
routes,
[]xatu.Event_Name{xatu.Event_LIBP2P_TRACE_CONNECTED},
nil,
newTestMetrics(),
)

outcome := router.Route(&xatu.DecoratedEvent{
Event: &xatu.Event{
Id: testEventID,
Name: xatu.Event_LIBP2P_TRACE_CONNECTED,
},
})

require.Equal(t, StatusDelivered, outcome.Status)
require.Empty(t, outcome.Results)
}

func TestRouteIntentionallyUnsupportedEventIsDropped(t *testing.T) {
routes := []route.Route{
filterTestRoute{
Expand All @@ -75,11 +162,11 @@ func TestRouteIntentionallyUnsupportedEventIsDropped(t *testing.T) {
},
}

router := New(logrus.New(), routes, nil, newTestMetrics())
router := New(logrus.New(), routes, nil, nil, newTestMetrics())

outcome := router.Route(&xatu.DecoratedEvent{
Event: &xatu.Event{
Id: "e1",
Id: testEventID,
Name: xatu.Event_BEACON_API_ETH_V1_DEBUG_FORK_CHOICE_V2,
},
})
Expand All @@ -96,11 +183,11 @@ func TestRouteUnknownEventIsNAKed(t *testing.T) {
},
}

router := New(logrus.New(), routes, nil, newTestMetrics())
router := New(logrus.New(), routes, nil, nil, newTestMetrics())

outcome := router.Route(&xatu.DecoratedEvent{
Event: &xatu.Event{
Id: "e1",
Id: testEventID,
Name: xatu.Event_Name(999_999),
},
})
Expand Down
38 changes: 38 additions & 0 deletions pkg/consumoor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ type Config struct {

// DisabledEvents is a list of event names to drop without processing.
DisabledEvents []string `yaml:"disabledEvents"`

// DisabledTables is a list of ClickHouse table names whose routes should
// be skipped. Events that route to these tables are not flattened or
// written. Events that route to both a disabled and an enabled table
// continue to be written to the enabled table.
DisabledTables []string `yaml:"disabledTables"`
}

// Validate checks the configuration for errors.
Expand All @@ -47,6 +53,38 @@ func (c *Config) Validate() error {
return err
}

if err := c.validateDisabledTables(); err != nil {
return err
}

return nil
}

// DisabledTableSet returns the configured disabled tables as a set for
// fast membership lookup. Empty strings are ignored.
func (c *Config) DisabledTableSet() map[string]struct{} {
out := make(map[string]struct{}, len(c.DisabledTables))

for _, name := range c.DisabledTables {
trimmed := strings.TrimSpace(name)
if trimmed == "" {
continue
}

out[trimmed] = struct{}{}
}

return out
}

// validateDisabledTables rejects empty entries so typos surface at startup.
func (c *Config) validateDisabledTables() error {
for i, name := range c.DisabledTables {
if strings.TrimSpace(name) == "" {
return fmt.Errorf("disabledTables[%d] is empty", i)
}
}

return nil
}

Expand Down
37 changes: 37 additions & 0 deletions pkg/consumoor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,43 @@ func TestDisabledEventEnums(t *testing.T) {
})
}

func TestDisabledTablesValidation(t *testing.T) {
mkConfig := func(disabled []string) *Config {
return &Config{
MetricsAddr: ":9090",
Kafka: *validKafkaConfig(),
ClickHouse: *validClickHouseConfig(),
DisabledTables: disabled,
}
}

t.Run("accepts empty list", func(t *testing.T) {
require.NoError(t, mkConfig(nil).Validate())
})

t.Run("accepts non-empty entries", func(t *testing.T) {
require.NoError(t, mkConfig([]string{testTableLibp2pPeer}).Validate())
})

t.Run("rejects empty entry", func(t *testing.T) {
err := mkConfig([]string{testTableLibp2pPeer, " "}).Validate()
require.Error(t, err)
assert.Contains(t, err.Error(), "disabledTables[1]")
})
}

func TestDisabledTableSet(t *testing.T) {
cfg := &Config{
DisabledTables: []string{testTableLibp2pPeer, " " + testTableLibp2pConnect + " ", ""},
}

got := cfg.DisabledTableSet()
assert.Equal(t, map[string]struct{}{
testTableLibp2pPeer: {},
testTableLibp2pConnect: {},
}, got)
}

func TestClickHouseConfigValidateChGo(t *testing.T) {
t.Run("accepts valid ch-go settings", func(t *testing.T) {
cfg := validClickHouseConfig()
Expand Down
Loading
Loading