Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374
* [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442
* [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489
* [ENHANCEMENT] Memberlist: Add `-memberlist.packet-read-timeout`, `-memberlist.max-packet-size`, and `-memberlist.max-concurrent-connections` flags to bound inbound gossip TCP connections, preventing slow-read, OOM, and connection-flood attacks on the gossip port. #7518
* [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4692,6 +4692,18 @@ The `memberlist_config` configures the Gossip memberlist.
# CLI flag: -memberlist.packet-write-timeout
[packet_write_timeout: <duration> | default = 5s]

# Timeout for reading packet data from inbound connections. 0 = no limit.
# CLI flag: -memberlist.packet-read-timeout
[packet_read_timeout: <duration> | default = 5s]

# Maximum size in bytes of an inbound gossip packet. 0 = no limit.
# CLI flag: -memberlist.max-packet-size
[max_packet_size: <int> | default = 1048576]

# Maximum number of concurrent inbound TCP connections. 0 = no limit.
# CLI flag: -memberlist.max-concurrent-connections
[max_concurrent_connections: <int> | default = 100]

# Enable TLS on the memberlist transport layer.
# CLI flag: -memberlist.tls-enabled
[tls_enabled: <boolean> | default = false]
Expand Down
78 changes: 75 additions & 3 deletions pkg/ring/kv/memberlist/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"

"github.com/cortexproject/cortex/pkg/util/flagext"
cortextls "github.com/cortexproject/cortex/pkg/util/tls"
Expand Down Expand Up @@ -50,6 +51,15 @@ type TCPTransportConfig struct {
// Timeout for writing packet data. Zero = no timeout.
PacketWriteTimeout time.Duration `yaml:"packet_write_timeout"`

// Timeout for reading inbound packet data. Zero = no timeout.
PacketReadTimeout time.Duration `yaml:"packet_read_timeout"`

// Maximum size in bytes of a single inbound packet. Zero = no limit.
MaxPacketSize int64 `yaml:"max_packet_size"`

// Maximum number of concurrent inbound TCP connections. Zero = no limit.
MaxConcurrentConnections int `yaml:"max_concurrent_connections"`

// Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on
TransportDebug bool `yaml:"-"`

Expand All @@ -72,6 +82,9 @@ func (cfg *TCPTransportConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix s
f.IntVar(&cfg.BindPort, prefix+"memberlist.bind-port", 7946, "Port to listen on for gossip messages.")
f.DurationVar(&cfg.PacketDialTimeout, prefix+"memberlist.packet-dial-timeout", 5*time.Second, "Timeout used when connecting to other nodes to send packet.")
f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Timeout for writing 'packet' data.")
f.DurationVar(&cfg.PacketReadTimeout, prefix+"memberlist.packet-read-timeout", 5*time.Second, "Timeout for reading packet data from inbound connections. 0 = no limit.")
f.Int64Var(&cfg.MaxPacketSize, prefix+"memberlist.max-packet-size", 1*1024*1024 /*1MB*/, "Maximum size in bytes of an inbound gossip packet. 0 = no limit.")
f.IntVar(&cfg.MaxConcurrentConnections, prefix+"memberlist.max-concurrent-connections", 100, "Maximum number of concurrent inbound TCP connections. 0 = no limit.")
Comment on lines +85 to +87
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add metrics for those? I feel it will be hard to know what the correct number to set is.

f.BoolVar(&cfg.TransportDebug, prefix+"memberlist.transport-debug", false, "Log debug transport messages. Note: global log.level must be at debug level as well.")

f.BoolVar(&cfg.TLSEnabled, prefix+"memberlist.tls-enabled", false, "Enable TLS on the memberlist transport layer.")
Expand All @@ -90,6 +103,9 @@ type TCPTransport struct {
tcpListeners []net.Listener
tlsConfig *tls.Config

// connSemaphore limits the number of concurrent inbound TCP connections.
connSemaphore *semaphore.Weighted

shutdown atomic.Int32

advertiseMu sync.RWMutex
Expand All @@ -107,6 +123,7 @@ type TCPTransport struct {
sentPacketsBytes prometheus.Counter
sentPacketsErrors prometheus.Counter
unknownConnections prometheus.Counter
rejectedConnections prometheus.Counter
}

// NewTCPTransport returns a new tcp-based transport with the given configuration. On
Expand All @@ -125,6 +142,10 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor
connCh: make(chan net.Conn),
}

if config.MaxConcurrentConnections > 0 {
t.connSemaphore = semaphore.NewWeighted(int64(config.MaxConcurrentConnections))
}

var err error
if config.TLSEnabled {
t.tlsConfig, err = config.TLS.GetTLSConfig()
Expand Down Expand Up @@ -222,7 +243,24 @@ func (t *TCPTransport) tcpListen(tcpLn net.Listener) {
// No error, reset loop delay
loopDelay = 0

go t.handleConnection(conn)
// Enforce concurrent connection via semaphore.
if t.connSemaphore != nil {
if !t.connSemaphore.TryAcquire(1) {
t.rejectedConnections.Inc()
level.Warn(t.logger).Log("msg", "max concurrent connections reached, closing connection", "remote", conn.RemoteAddr())
Comment thread
friedrichg marked this conversation as resolved.
Outdated
_ = conn.Close()
continue
}
}

go func() {
defer func() {
if t.connSemaphore != nil {
t.connSemaphore.Release(1)
}
}()
t.handleConnection(conn)
}()
}
}

Expand All @@ -245,6 +283,15 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
}
}()

// Apply a read deadline for the entire packet receive so that a slow or
// adversarial peer cannot hold the goroutine open indefinitely.
if t.cfg.PacketReadTimeout > 0 {
if err := conn.SetReadDeadline(time.Now().Add(t.cfg.PacketReadTimeout)); err != nil {
level.Warn(t.logger).Log("msg", "failed to set read deadline", "err", err, "remote", conn.RemoteAddr())
return
}
}

// let's read first byte, and determine what to do about this connection
msgType := []byte{0}
_, err := io.ReadFull(conn, msgType)
Expand All @@ -256,6 +303,13 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
if messageType(msgType[0]) == stream {
t.incomingStreams.Inc()

// Stream connections are handed off to memberlist which manages them
// independently – clear the deadline so memberlist can use its own
// timeouts, then pass the connection over.
if t.cfg.PacketReadTimeout > 0 {
_ = conn.SetReadDeadline(time.Time{})
}

// hand over this connection to memberlist
closeConn = false
t.connCh <- conn
Expand All @@ -280,14 +334,25 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
return
}

// read the rest to buffer -- this is the "packet" itself
buf, err := io.ReadAll(conn)
var reader io.Reader = conn
if t.cfg.MaxPacketSize > 0 {
// Read one byte beyond the limit so we can detect oversized packets.
reader = io.LimitReader(conn, t.cfg.MaxPacketSize+1)
}
buf, err := io.ReadAll(reader)
if err != nil {
t.receivedPacketsErrors.Inc()
level.Warn(t.logger).Log("msg", "error while reading packet data", "err", err, "remote", conn.RemoteAddr())
return
}

// Reject oversized packets
if t.cfg.MaxPacketSize > 0 && int64(len(buf)) > t.cfg.MaxPacketSize {
t.receivedPacketsErrors.Inc()
level.Warn(t.logger).Log("msg", "packet too large, dropping", "size", len(buf), "max", t.cfg.MaxPacketSize, "remote", conn.RemoteAddr())
Comment thread
friedrichg marked this conversation as resolved.
Outdated
return
}

if len(buf) < md5.Size {
t.receivedPacketsErrors.Inc()
level.Warn(t.logger).Log("msg", "not enough data received", "data_length", len(buf), "remote", conn.RemoteAddr())
Expand Down Expand Up @@ -634,4 +699,11 @@ func (t *TCPTransport) registerMetrics(registerer prometheus.Registerer) {
Name: "unknown_connections_total",
Help: "Number of unknown TCP connections (not a packet or stream)",
})

t.rejectedConnections = promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: t.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "rejected_connections_total",
Help: "Number of inbound TCP connections rejected because the concurrent connection limit was reached",
})
}
143 changes: 143 additions & 0 deletions pkg/ring/kv/memberlist/tcp_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/md5"
"fmt"
"net"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -117,3 +118,145 @@ func TestTCPTransport_PacketDigestMismatch(t *testing.T) {

assert.Contains(t, logs.String(), "packet digest mismatch")
}

// TestTCPTransport_PacketReadTimeout verifies that the transport closes an
// inbound connection once PacketReadTimeout elapses without the full packet
// payload having arrived.
func TestTCPTransport_PacketReadTimeout(t *testing.T) {
	logger := log.NewNopLogger()

	cfg := TCPTransportConfig{}
	flagext.DefaultValues(&cfg)
	cfg.BindAddrs = []string{"127.0.0.1"}
	cfg.BindPort = 0
	cfg.PacketReadTimeout = 200 * time.Millisecond

	tr, err := NewTCPTransport(cfg, logger)
	require.NoError(t, err)
	defer tr.Shutdown() //nolint:errcheck

	client, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", tr.GetAutoBindPort()))
	require.NoError(t, err)
	defer client.Close() //nolint:errcheck

	// Send packet type byte and address header, then stall – never send payload.
	ourAddr := "127.0.0.1:0"
	header := []byte{byte(packet), byte(len(ourAddr))}
	header = append(header, ourAddr...)
	_, err = client.Write(header)
	require.NoError(t, err)

	// The transport should close the connection after PacketReadTimeout.
	// We verify this by trying to read from the conn; once the server side
	// closes it due to the deadline, our Read should return an error.
	client.SetReadDeadline(time.Now().Add(2 * time.Second)) //nolint:errcheck
	_, readErr := client.Read(make([]byte, 1))
	assert.Error(t, readErr, "expected connection to be closed by server after read timeout")
}

// TestTCPTransport_MaxPacketSize verifies that an inbound packet larger than
// MaxPacketSize is dropped (never delivered on PacketCh) and that the drop is
// logged.
func TestTCPTransport_MaxPacketSize(t *testing.T) {
	logs := &concurrency.SyncBuffer{}
	logger := log.NewLogfmtLogger(logs)

	cfg := TCPTransportConfig{}
	flagext.DefaultValues(&cfg)
	cfg.BindAddrs = []string{"127.0.0.1"}
	cfg.BindPort = 0
	cfg.MaxPacketSize = 128

	tr, err := NewTCPTransport(cfg, logger)
	require.NoError(t, err)
	defer tr.Shutdown() //nolint:errcheck

	conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", tr.GetAutoBindPort()))
	require.NoError(t, err)
	defer conn.Close() //nolint:errcheck

	// Build a packet whose payload exceeds MaxPacketSize.
	const ourAddr = "127.0.0.1:0"
	payload := make([]byte, int(cfg.MaxPacketSize)+64)
	digest := md5.Sum(payload)

	msg := []byte{byte(packet), byte(len(ourAddr))}
	msg = append(msg, ourAddr...)
	msg = append(msg, payload...)
	msg = append(msg, digest[:]...)

	_, err = conn.Write(msg)
	require.NoError(t, err)
	// Close explicitly so the server-side io.ReadAll sees EOF promptly.
	conn.Close() //nolint:errcheck

	// Packet should be dropped; nothing must arrive on packetCh.
	select {
	case <-tr.PacketCh():
		t.Fatal("oversized packet should have been dropped")
	case <-time.After(500 * time.Millisecond):
		// success
	}

	assert.Contains(t, logs.String(), "packet too large")
}

func TestTCPTransport_MaxConcurrentConnections(t *testing.T) {
logs := &concurrency.SyncBuffer{}
logger := log.NewLogfmtLogger(logs)

const maxConns = 3

cfg := TCPTransportConfig{}
flagext.DefaultValues(&cfg)
cfg.BindAddrs = []string{"127.0.0.1"}
cfg.BindPort = 0
cfg.PacketReadTimeout = 5 * time.Second
cfg.MaxConcurrentConnections = maxConns

transport, err := NewTCPTransport(cfg, logger)
require.NoError(t, err)
defer transport.Shutdown() //nolint:errcheck

port := transport.GetAutoBindPort()

openSlowConn := func() net.Conn {
c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port))
require.NoError(t, err)
// Send packet type byte to enter the packet branch, then stall.
_, err = c.Write([]byte{byte(packet)})
require.NoError(t, err)
return c
}

// Fill up the semaphore.
holders := make([]net.Conn, maxConns)
for i := range maxConns {
holders[i] = openSlowConn()
}
defer func() {
for _, c := range holders {
c.Close() //nolint:errcheck
}
}()

// Give goroutines a moment to acquire the semaphore slots.
time.Sleep(100 * time.Millisecond)
Comment thread
friedrichg marked this conversation as resolved.
Outdated

// This extra connection should be rejected.
var wg sync.WaitGroup
wg.Go(func() {
extra, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port))
if err != nil {
return // connection may be refused outright
}
defer extra.Close() //nolint:errcheck
// Try to read; the server should close immediately.
extra.SetReadDeadline(time.Now().Add(time.Second)) //nolint:errcheck
buf := make([]byte, 1)
extra.Read(buf) //nolint:errcheck
})
wg.Wait()

assert.Contains(t, logs.String(), "max concurrent connections reached")
}
19 changes: 19 additions & 0 deletions schemas/cortex-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -5728,6 +5728,12 @@
"x-cli-flag": "memberlist.left-ingesters-timeout",
"x-format": "duration"
},
"max_concurrent_connections": {
"default": 100,
"description": "Maximum number of concurrent inbound TCP connections. 0 = no limit.",
"type": "number",
"x-cli-flag": "memberlist.max-concurrent-connections"
},
"max_join_backoff": {
"default": "1m0s",
"description": "Max backoff duration to join other cluster members.",
Expand All @@ -5741,6 +5747,12 @@
"type": "number",
"x-cli-flag": "memberlist.max-join-retries"
},
"max_packet_size": {
"default": 1048576,
"description": "Maximum size in bytes of an inbound gossip packet. 0 = no limit.",
"type": "number",
"x-cli-flag": "memberlist.max-packet-size"
},
"message_history_buffer_bytes": {
"default": 0,
"description": "How much space to use for keeping received and sent messages in memory for troubleshooting (two buffers). 0 to disable.",
Expand All @@ -5766,6 +5778,13 @@
"x-cli-flag": "memberlist.packet-dial-timeout",
"x-format": "duration"
},
"packet_read_timeout": {
"default": "5s",
"description": "Timeout for reading packet data from inbound connections. 0 = no limit.",
"type": "string",
"x-cli-flag": "memberlist.packet-read-timeout",
"x-format": "duration"
},
"packet_write_timeout": {
"default": "5s",
"description": "Timeout for writing 'packet' data.",
Expand Down
Loading