diff --git a/sinks/cortex/cortex.go b/sinks/cortex/cortex.go index 7a97ca2f2..ffddae882 100644 --- a/sinks/cortex/cortex.go +++ b/sinks/cortex/cortex.go @@ -59,6 +59,7 @@ type CortexMetricSink struct { } var _ sinks.MetricSink = (*CortexMetricSink)(nil) +var DoIfNotDoneError = errors.New("context finished before completing metrics flush") type BasicAuthType struct { Username util.StringSecret `yaml:"username"` @@ -222,7 +223,7 @@ func (s *CortexMetricSink) Flush(ctx context.Context, metrics []samplers.InterMe doIfNotDone := func(fn func() error) error { select { case <-ctx.Done(): - return errors.New("context finished before completing metrics flush") + return DoIfNotDoneError default: return fn() } @@ -235,19 +236,24 @@ func (s *CortexMetricSink) Flush(ctx context.Context, metrics []samplers.InterMe if len(batch)%s.batchWriteSize == 0 { err := s.writeMetrics(ctx, batch) if err != nil { - return err + s.logger.Error(err) + droppedMetrics += len(batch) + } else { + flushedMetrics += len(batch) } - flushedMetrics += len(batch) batch = []samplers.InterMetric{} + return err } return nil }) - if err != nil { - s.logger.Error(err) - droppedMetrics += len(metrics) - flushedMetrics + // Any other type of error will happen inside of doIfNotDone + // so we only need ot handle context cancellations + if err == DoIfNotDoneError { + // Don't bother with the rest of the loop, we're done. + droppedMetrics = len(metrics) - flushedMetrics return sinks.MetricFlushResult{MetricsFlushed: flushedMetrics, MetricsDropped: droppedMetrics}, err } } diff --git a/sinks/cortex/cortex_test.go b/sinks/cortex/cortex_test.go index 09e8369cb..1ae484f2a 100644 --- a/sinks/cortex/cortex_test.go +++ b/sinks/cortex/cortex_test.go @@ -360,7 +360,7 @@ func TestChunkedWritesRespectContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) requestCount := 0 - server.onRequest(func() { + server.onRequest(func(w http.ResponseWriter, r *http.Request) { requestCount++ if requestCount == 2 { cancel() @@ -418,6 +418,45 @@ func TestMetricsGetEmittedWithHostTag(t *testing.T) { } } +func TestAllBatchesAreAttemptedEvenIfSomeFail(t *testing.T) { + // Listen for prometheus writes + server := NewTestServer(t) + defer server.Close() + + // Set up a sink + sink, err := NewCortexMetricSink(server.URL, 30*time.Second, "", logrus.NewEntry(logrus.New()), "test", map[string]string{}, nil, 3, false, "") + assert.NoError(t, err) + assert.NoError(t, sink.Start(trace.DefaultClient)) + + // chunked_input.json contains 12 timeseries samples in InterMetrics format + jsInput, err := ioutil.ReadFile("testdata/chunked_input.json") + assert.NoError(t, err) + var metrics []samplers.InterMetric + assert.NoError(t, json.Unmarshal(jsInput, &metrics)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + requestCount := 0 + + server.onRequest(func(w http.ResponseWriter, r *http.Request) { + requestCount++ + if requestCount == 2 { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("an error!")) + } + }) + + // Perform the flush to the test server + flushResult, err := sink.Flush(ctx, metrics) + assert.NoError(t, err) + assert.Equal(t, sinks.MetricFlushResult{MetricsFlushed: 9, MetricsDropped: 3, MetricsSkipped: 0}, flushResult) + + // ensure we see all chunks written + assert.Equal(t, 4, len(server.History())) + assert.Equal(t, 3, len(server.History()[0].data.GetTimeseries())) + assert.Equal(t, 3, len(server.History()[1].data.GetTimeseries())) +} + func TestCustomHeaders(t *testing.T) { // Listen for prometheus writes server := NewTestServer(t) @@ -667,7 +706,7 @@ type TestServer struct { data *prompb.WriteRequest server *httptest.Server history []*RequestHistory - requestFn func() + requestFn func(w http.ResponseWriter, r *http.Request) } // Close closes the internal test server @@ -687,7 +726,7 @@ func (t *TestServer) History() []*RequestHistory { return t.history } -func (t *TestServer) onRequest(fn func()) { +func (t *TestServer) onRequest(fn func(w http.ResponseWriter, r *http.Request)) { t.requestFn = fn } @@ -721,7 +760,7 @@ func NewTestServer(t *testing.T) *TestServer { }) if result.requestFn != nil { - result.requestFn() + result.requestFn(w, r) } })