Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374
* [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442
* [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489
* [ENHANCEMENT] Memberlist: Add `-memberlist.packet-read-timeout`, `-memberlist.max-packet-size`, and `-memberlist.max-concurrent-connections` flags to bound inbound gossip TCP connections, preventing slow-read, OOM, and connection-flood attacks on the gossip port. #7518
* [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4692,6 +4692,18 @@ The `memberlist_config` configures the Gossip memberlist.
# CLI flag: -memberlist.packet-write-timeout
[packet_write_timeout: <duration> | default = 5s]

# Timeout for reading packet data from inbound connections. 0 = no limit.
# CLI flag: -memberlist.packet-read-timeout
[packet_read_timeout: <duration> | default = 5s]

# Maximum size in bytes of an inbound gossip packet. 0 = no limit.
# CLI flag: -memberlist.max-packet-size
[max_packet_size: <int> | default = 1048576]

# Maximum number of concurrent inbound TCP connections. 0 = no limit.
# CLI flag: -memberlist.max-concurrent-connections
[max_concurrent_connections: <int> | default = 100]

# Enable TLS on the memberlist transport layer.
# CLI flag: -memberlist.tls-enabled
[tls_enabled: <boolean> | default = false]
Expand Down
107 changes: 101 additions & 6 deletions pkg/ring/kv/memberlist/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"

"github.com/cortexproject/cortex/pkg/util/flagext"
cortextls "github.com/cortexproject/cortex/pkg/util/tls"
Expand Down Expand Up @@ -50,6 +51,15 @@ type TCPTransportConfig struct {
// Timeout for writing packet data. Zero = no timeout.
PacketWriteTimeout time.Duration `yaml:"packet_write_timeout"`

// Timeout for reading inbound packet data. Zero = no timeout.
PacketReadTimeout time.Duration `yaml:"packet_read_timeout"`

// Maximum size in bytes of a single inbound packet. Zero = no limit.
MaxPacketSize int64 `yaml:"max_packet_size"`

// Maximum number of concurrent inbound TCP connections. Zero = no limit.
MaxConcurrentConnections int `yaml:"max_concurrent_connections"`

// Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on
TransportDebug bool `yaml:"-"`

Expand All @@ -72,6 +82,9 @@ func (cfg *TCPTransportConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix s
f.IntVar(&cfg.BindPort, prefix+"memberlist.bind-port", 7946, "Port to listen on for gossip messages.")
f.DurationVar(&cfg.PacketDialTimeout, prefix+"memberlist.packet-dial-timeout", 5*time.Second, "Timeout used when connecting to other nodes to send packet.")
f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Timeout for writing 'packet' data.")
f.DurationVar(&cfg.PacketReadTimeout, prefix+"memberlist.packet-read-timeout", 5*time.Second, "Timeout for reading packet data from inbound connections. 0 = no limit.")
f.Int64Var(&cfg.MaxPacketSize, prefix+"memberlist.max-packet-size", 1*1024*1024 /*1MB*/, "Maximum size in bytes of an inbound gossip packet. 0 = no limit.")
f.IntVar(&cfg.MaxConcurrentConnections, prefix+"memberlist.max-concurrent-connections", 100, "Maximum number of concurrent inbound TCP connections. 0 = no limit.")
Comment on lines +85 to +87
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add metrics for those? Feel that will be hard to know what is the correct number to set.

f.BoolVar(&cfg.TransportDebug, prefix+"memberlist.transport-debug", false, "Log debug transport messages. Note: global log.level must be at debug level as well.")

f.BoolVar(&cfg.TLSEnabled, prefix+"memberlist.tls-enabled", false, "Enable TLS on the memberlist transport layer.")
Expand All @@ -90,6 +103,9 @@ type TCPTransport struct {
tcpListeners []net.Listener
tlsConfig *tls.Config

// connSemaphore limits the number of concurrent inbound TCP connections.
connSemaphore *semaphore.Weighted

shutdown atomic.Int32

advertiseMu sync.RWMutex
Expand All @@ -107,6 +123,7 @@ type TCPTransport struct {
sentPacketsBytes prometheus.Counter
sentPacketsErrors prometheus.Counter
unknownConnections prometheus.Counter
rejectedConnections prometheus.Counter
}

// NewTCPTransport returns a new tcp-based transport with the given configuration. On
Expand All @@ -125,6 +142,10 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor
connCh: make(chan net.Conn),
}

if config.MaxConcurrentConnections > 0 {
t.connSemaphore = semaphore.NewWeighted(int64(config.MaxConcurrentConnections))
}

var err error
if config.TLSEnabled {
t.tlsConfig, err = config.TLS.GetTLSConfig()
Expand Down Expand Up @@ -222,7 +243,25 @@ func (t *TCPTransport) tcpListen(tcpLn net.Listener) {
// No error, reset loop delay
loopDelay = 0

go t.handleConnection(conn)
// Enforce concurrent connection via semaphore.
if t.connSemaphore != nil {
if !t.connSemaphore.TryAcquire(1) {
t.rejectedConnections.Inc()
level.Debug(t.logger).Log("msg", "max concurrent connections reached, closing connection", "remote", conn.RemoteAddr())
_ = conn.Close()
continue
}
}

go func() {
// handleConnection returns true when it wrapped the conn in a
// semaphoreConn and transferred ownership of the slot to that
// wrapper (stream path). In that case we must not release here.
semTransferred := t.handleConnection(conn)
if t.connSemaphore != nil && !semTransferred {
t.connSemaphore.Release(1)
}
}()
}
}

Expand All @@ -235,7 +274,7 @@ func (t *TCPTransport) debugLog() log.Logger {
return noopLogger
}

func (t *TCPTransport) handleConnection(conn net.Conn) {
func (t *TCPTransport) handleConnection(conn net.Conn) (semTransferred bool) {
t.debugLog().Log("msg", "New connection", "addr", conn.RemoteAddr())

closeConn := true
Expand All @@ -245,6 +284,15 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
}
}()

// Apply a read deadline for the entire packet receive so that a slow or
// adversarial peer cannot hold the goroutine open indefinitely.
if t.cfg.PacketReadTimeout > 0 {
if err := conn.SetReadDeadline(time.Now().Add(t.cfg.PacketReadTimeout)); err != nil {
level.Warn(t.logger).Log("msg", "failed to set read deadline", "err", err, "remote", conn.RemoteAddr())
return
}
}

// let's read first byte, and determine what to do about this connection
msgType := []byte{0}
_, err := io.ReadFull(conn, msgType)
Expand All @@ -256,9 +304,23 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
if messageType(msgType[0]) == stream {
t.incomingStreams.Inc()

// hand over this connection to memberlist
// Stream connections are handed off to memberlist which manages them
// independently – clear the deadline so memberlist can use its own
// timeouts, then pass the connection over.
if t.cfg.PacketReadTimeout > 0 {
_ = conn.SetReadDeadline(time.Time{})
}

// hand over this connection to memberlist.
// If the semaphore is active, wrap the conn so that the slot is held
// for the real lifetime of the stream. The memberlist will close it.
closeConn = false
t.connCh <- conn
if t.connSemaphore != nil {
t.connCh <- &semaphoreConn{Conn: conn, sem: t.connSemaphore}
semTransferred = true
} else {
t.connCh <- conn
}
} else if messageType(msgType[0]) == packet {
// it's a memberlist "packet", which contains an address and data.
t.receivedPackets.Inc()
Expand All @@ -280,14 +342,25 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
return
}

// read the rest to buffer -- this is the "packet" itself
buf, err := io.ReadAll(conn)
var reader io.Reader = conn
if t.cfg.MaxPacketSize > 0 {
// Read one byte beyond the limit so we can detect oversized packets.
reader = io.LimitReader(conn, t.cfg.MaxPacketSize+1)
}
buf, err := io.ReadAll(reader)
if err != nil {
t.receivedPacketsErrors.Inc()
level.Warn(t.logger).Log("msg", "error while reading packet data", "err", err, "remote", conn.RemoteAddr())
return
}

// Reject oversized packets
if t.cfg.MaxPacketSize > 0 && int64(len(buf)) > t.cfg.MaxPacketSize {
t.receivedPacketsErrors.Inc()
level.Warn(t.logger).Log("msg", "packet too large, dropping", "size", len(buf), "max", t.cfg.MaxPacketSize, "remote", conn.RemoteAddr())
Comment thread
friedrichg marked this conversation as resolved.
Outdated
return
}

if len(buf) < md5.Size {
t.receivedPacketsErrors.Inc()
level.Warn(t.logger).Log("msg", "not enough data received", "data_length", len(buf), "remote", conn.RemoteAddr())
Expand Down Expand Up @@ -318,6 +391,7 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
t.unknownConnections.Inc()
level.Error(t.logger).Log("msg", "unknown message type", "msgType", msgType, "remote", conn.RemoteAddr())
}
return
}

type addr string
Expand All @@ -330,6 +404,20 @@ func (a addr) String() string {
return string(a)
}

// semaphoreConn wraps a net.Conn and releases a semaphore slot exactly once
// when the connection is closed. It is used on the stream path to keep the
// concurrent-connection slot held for the real lifetime of the connection.
type semaphoreConn struct {
net.Conn
sem *semaphore.Weighted
once sync.Once
}

func (c *semaphoreConn) Close() error {
c.once.Do(func() { c.sem.Release(1) })
return c.Conn.Close()
}

func (t *TCPTransport) getConnection(addr string, timeout time.Duration) (net.Conn, error) {
if t.cfg.TLSEnabled {
return tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", addr, t.tlsConfig)
Expand Down Expand Up @@ -634,4 +722,11 @@ func (t *TCPTransport) registerMetrics(registerer prometheus.Registerer) {
Name: "unknown_connections_total",
Help: "Number of unknown TCP connections (not a packet or stream)",
})

t.rejectedConnections = promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: t.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "rejected_connections_total",
Help: "Number of inbound TCP connections rejected because the concurrent connection limit was reached",
})
}
Loading
Loading