Skip to content
This repository was archived by the owner on Apr 25, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions sinks/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type CortexMetricSink struct {
}

var _ sinks.MetricSink = (*CortexMetricSink)(nil)
var DoIfNotDoneError = errors.New("context finished before completing metrics flush")

@andrewa-stripe andrewa-stripe Dec 18, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New also records the stack trace at the point it was called

errors.New should not be cached for usages

If error checking is necessary, we should introduce an error type and use errors.Is


type BasicAuthType struct {
Username util.StringSecret `yaml:"username"`
Expand Down Expand Up @@ -222,7 +223,7 @@ func (s *CortexMetricSink) Flush(ctx context.Context, metrics []samplers.InterMe
doIfNotDone := func(fn func() error) error {
select {
case <-ctx.Done():
return errors.New("context finished before completing metrics flush")
return DoIfNotDoneError
default:
return fn()
}
Expand All @@ -235,19 +236,24 @@ func (s *CortexMetricSink) Flush(ctx context.Context, metrics []samplers.InterMe
if len(batch)%s.batchWriteSize == 0 {
err := s.writeMetrics(ctx, batch)
if err != nil {
return err
s.logger.Error(err)
droppedMetrics += len(batch)
} else {
flushedMetrics += len(batch)
}

flushedMetrics += len(batch)
batch = []samplers.InterMetric{}
return err
}

return nil
})

if err != nil {
s.logger.Error(err)
droppedMetrics += len(metrics) - flushedMetrics
// Any other type of error will happen inside of doIfNotDone
// so we only need ot handle context cancellations

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// so we only need ot handle context cancellations
// so we only need to handle context cancellations

if err == DoIfNotDoneError {
// Don't bother with the rest of the loop, we're done.
droppedMetrics = len(metrics) - flushedMetrics
return sinks.MetricFlushResult{MetricsFlushed: flushedMetrics, MetricsDropped: droppedMetrics}, err
}
}
Expand Down
47 changes: 43 additions & 4 deletions sinks/cortex/cortex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func TestChunkedWritesRespectContextCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
requestCount := 0

server.onRequest(func() {
server.onRequest(func(w http.ResponseWriter, r *http.Request) {
requestCount++
if requestCount == 2 {
cancel()
Expand Down Expand Up @@ -418,6 +418,45 @@ func TestMetricsGetEmittedWithHostTag(t *testing.T) {
}
}

func TestAllBatchesAreAttemptedEvenIfSomeFail(t *testing.T) {
// Listen for prometheus writes
server := NewTestServer(t)
defer server.Close()

// Set up a sink
sink, err := NewCortexMetricSink(server.URL, 30*time.Second, "", logrus.NewEntry(logrus.New()), "test", map[string]string{}, nil, 3, false, "")
assert.NoError(t, err)
assert.NoError(t, sink.Start(trace.DefaultClient))

// chunked_input.json contains 12 timeseries samples in InterMetrics format
jsInput, err := ioutil.ReadFile("testdata/chunked_input.json")
assert.NoError(t, err)
var metrics []samplers.InterMetric
assert.NoError(t, json.Unmarshal(jsInput, &metrics))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
requestCount := 0

server.onRequest(func(w http.ResponseWriter, r *http.Request) {
requestCount++
if requestCount == 2 {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("an error!"))
}
})

// Perform the flush to the test server
flushResult, err := sink.Flush(ctx, metrics)
assert.NoError(t, err)
assert.Equal(t, sinks.MetricFlushResult{MetricsFlushed: 9, MetricsDropped: 3, MetricsSkipped: 0}, flushResult)

// ensure we see all chunks written
assert.Equal(t, 4, len(server.History()))
assert.Equal(t, 3, len(server.History()[0].data.GetTimeseries()))
assert.Equal(t, 3, len(server.History()[1].data.GetTimeseries()))
}

func TestCustomHeaders(t *testing.T) {
// Listen for prometheus writes
server := NewTestServer(t)
Expand Down Expand Up @@ -667,7 +706,7 @@ type TestServer struct {
data *prompb.WriteRequest
server *httptest.Server
history []*RequestHistory
requestFn func()
requestFn func(w http.ResponseWriter, r *http.Request)
}

// Close closes the internal test server
Expand All @@ -687,7 +726,7 @@ func (t *TestServer) History() []*RequestHistory {
return t.history
}

func (t *TestServer) onRequest(fn func()) {
func (t *TestServer) onRequest(fn func(w http.ResponseWriter, r *http.Request)) {
t.requestFn = fn
}

Expand Down Expand Up @@ -721,7 +760,7 @@ func NewTestServer(t *testing.T) *TestServer {
})

if result.requestFn != nil {
result.requestFn()
result.requestFn(w, r)
}
})

Expand Down