Skip to content

Commit 24b4822

Browse files
committed
[receiver/prometheusremotewrite] Handle NaN and overflow buckets for native histograms
1 parent 0e292e1 commit 24b4822

File tree

3 files changed

+239
-11
lines changed

3 files changed

+239
-11
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: receiver/prometheusremotewrite
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Properly handle stale NaN native histograms, and drop bucket counts that fall into the native histogram overflow buckets.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [47728]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

receiver/prometheusremotewritereceiver/receiver.go

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"math"
1011
"net/http"
1112
"strings"
1213
"sync"
@@ -652,7 +653,20 @@ func (prw *prometheusRemoteWriteReceiver) addExponentialHistogramDatapoint(datap
652653
dp.SetZeroThreshold(histogram.ZeroThreshold)
653654

654655
// Set count and sum using common helper
655-
setCountAndSum(histogram, dp)
656+
if value.IsStaleNaN(histogram.Sum) {
657+
dp.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
658+
} else {
659+
setCountAndSum(histogram, dp)
660+
}
661+
662+
// The maximum bucket index is derived from the formula (2**2**-n)**i <= MaxFloat64.
663+
// MaxFloat64 is approx 2^1024. So (2^-n) * i <= 1024 => i <= 1024 * 2^n.
664+
// The bucket containing MaxFloat64 has index i_max = 1024 * 2^n.
665+
// The next bucket (i_max + 1) is the +Inf overflow bucket, which is also allowed.
666+
// Buckets beyond that must be dropped.
667+
// See https://prometheus.io/docs/specs/native_histograms/#schema for more information.
668+
overflowLimit := 1024*math.Pow(2, float64(histogram.Schema)) + 1
669+
var droppedCount uint64
656670

657671
// The difference between float and integer histograms is that float histograms are stored as absolute counts
658672
// while integer histograms are stored as deltas.
@@ -663,11 +677,11 @@ func (prw *prometheusRemoteWriteReceiver) addExponentialHistogramDatapoint(datap
663677

664678
if len(histogram.PositiveSpans) > 0 {
665679
dp.Positive().SetOffset(histogram.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
666-
convertAbsoluteBuckets(histogram.PositiveSpans, histogram.PositiveCounts, dp.Positive().BucketCounts())
680+
droppedCount += convertAbsoluteBuckets(histogram.PositiveSpans, histogram.PositiveCounts, dp.Positive().BucketCounts(), overflowLimit)
667681
}
668682
if len(histogram.NegativeSpans) > 0 {
669683
dp.Negative().SetOffset(histogram.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
670-
convertAbsoluteBuckets(histogram.NegativeSpans, histogram.NegativeCounts, dp.Negative().BucketCounts())
684+
droppedCount += convertAbsoluteBuckets(histogram.NegativeSpans, histogram.NegativeCounts, dp.Negative().BucketCounts(), overflowLimit)
671685
}
672686
} else {
673687
// Integer histograms
@@ -676,14 +690,18 @@ func (prw *prometheusRemoteWriteReceiver) addExponentialHistogramDatapoint(datap
676690

677691
if len(histogram.PositiveSpans) > 0 {
678692
dp.Positive().SetOffset(histogram.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
679-
convertDeltaBuckets(histogram.PositiveSpans, histogram.PositiveDeltas, dp.Positive().BucketCounts())
693+
droppedCount += convertDeltaBuckets(histogram.PositiveSpans, histogram.PositiveDeltas, dp.Positive().BucketCounts(), overflowLimit)
680694
}
681695
if len(histogram.NegativeSpans) > 0 {
682696
dp.Negative().SetOffset(histogram.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound
683-
convertDeltaBuckets(histogram.NegativeSpans, histogram.NegativeDeltas, dp.Negative().BucketCounts())
697+
droppedCount += convertDeltaBuckets(histogram.NegativeSpans, histogram.NegativeDeltas, dp.Negative().BucketCounts(), overflowLimit)
684698
}
685699
}
686700

701+
if droppedCount > 0 && !value.IsStaleNaN(histogram.Sum) {
702+
dp.SetCount(dp.Count() - droppedCount)
703+
}
704+
687705
attrs.CopyTo(dp.Attributes())
688706
stats.Histograms++
689707
}
@@ -738,7 +756,7 @@ func hasNegativeCounts(histogram *writev2.Histogram) bool {
738756

739757
// convertDeltaBuckets converts Prometheus native histogram spans and deltas to OpenTelemetry bucket counts
740758
// For integer buckets, the values are deltas between the buckets, e.g. a delta list of 1,2,-2 corresponds to bucket counts of 1,3,1
741-
func convertDeltaBuckets(spans []writev2.BucketSpan, deltas []int64, buckets pcommon.UInt64Slice) {
759+
func convertDeltaBuckets(spans []writev2.BucketSpan, deltas []int64, buckets pcommon.UInt64Slice, overflowLimit float64) uint64 {
742760
// The total capacity is the sum of the deltas and the offsets of the spans.
743761
totalCapacity := len(deltas)
744762
for _, span := range spans {
@@ -748,23 +766,37 @@ func convertDeltaBuckets(spans []writev2.BucketSpan, deltas []int64, buckets pco
748766

749767
bucketIdx := 0
750768
bucketCount := int64(0)
769+
var droppedCount uint64
770+
initialOffset := spans[0].Offset
771+
k := initialOffset
772+
751773
for spanIdx, span := range spans {
752774
if spanIdx > 0 {
753775
for i := int32(0); i < span.Offset; i++ {
754-
buckets.Append(uint64(0))
776+
if float64(k) <= overflowLimit {
777+
buckets.Append(uint64(0))
778+
}
779+
k++
755780
}
756781
}
757782
for i := uint32(0); i < span.Length; i++ {
758783
bucketCount += deltas[bucketIdx]
759784
bucketIdx++
760-
buckets.Append(uint64(bucketCount))
785+
786+
if float64(k) <= overflowLimit {
787+
buckets.Append(uint64(bucketCount))
788+
} else {
789+
droppedCount += uint64(bucketCount)
790+
}
791+
k++
761792
}
762793
}
794+
return droppedCount
763795
}
764796

765797
// convertAbsoluteBuckets converts Prometheus native histogram spans and absolute counts to OpenTelemetry bucket counts
766798
// For float buckets, the values are absolute counts and must be non-negative.
767-
func convertAbsoluteBuckets(spans []writev2.BucketSpan, counts []float64, buckets pcommon.UInt64Slice) {
799+
func convertAbsoluteBuckets(spans []writev2.BucketSpan, counts []float64, buckets pcommon.UInt64Slice, overflowLimit float64) uint64 {
768800
// The total capacity is the sum of the counts and the offsets of the spans.
769801
totalCapacity := len(counts)
770802
for _, span := range spans {
@@ -773,17 +805,30 @@ func convertAbsoluteBuckets(spans []writev2.BucketSpan, counts []float64, bucket
773805
buckets.EnsureCapacity(totalCapacity)
774806

775807
bucketIdx := 0
808+
var droppedCount uint64
809+
initialOffset := spans[0].Offset
810+
k := initialOffset
811+
776812
for spanIdx, span := range spans {
777813
if spanIdx > 0 {
778814
for i := int32(0); i < span.Offset; i++ {
779-
buckets.Append(uint64(0))
815+
if float64(k) <= overflowLimit {
816+
buckets.Append(uint64(0))
817+
}
818+
k++
780819
}
781820
}
782821
for i := uint32(0); i < span.Length; i++ {
783-
buckets.Append(uint64(counts[bucketIdx]))
822+
if float64(k) <= overflowLimit {
823+
buckets.Append(uint64(counts[bucketIdx]))
824+
} else {
825+
droppedCount += uint64(counts[bucketIdx])
826+
}
784827
bucketIdx++
828+
k++
785829
}
786830
}
831+
return droppedCount
787832
}
788833

789834
// extractAttributes return all attributes different from job, instance, metric name and scope name/version

receiver/prometheusremotewritereceiver/receiver_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1677,6 +1677,162 @@ func TestTranslateV2(t *testing.T) {
16771677
Exemplars: 1,
16781678
},
16791679
},
1680+
{
1681+
name: "exponential histogram - stale NaN sum",
1682+
request: &writev2.Request{
1683+
Symbols: []string{
1684+
"",
1685+
"__name__", "test_metric", // 1, 2
1686+
"job", "service-x/test", // 3, 4
1687+
"instance", "107cn001", // 5, 6
1688+
"otel_scope_name", "scope1", // 7, 8
1689+
"otel_scope_version", "v1", // 9, 10
1690+
},
1691+
Timeseries: []writev2.TimeSeries{
1692+
{
1693+
Metadata: writev2.Metadata{
1694+
Type: writev2.Metadata_METRIC_TYPE_HISTOGRAM,
1695+
},
1696+
Histograms: []writev2.Histogram{
1697+
{
1698+
Count: &writev2.Histogram_CountInt{
1699+
CountInt: 20,
1700+
},
1701+
Sum: math.Float64frombits(value.StaleNaN),
1702+
Timestamp: 1,
1703+
StartTimestamp: 1,
1704+
ZeroThreshold: 1,
1705+
ZeroCount: &writev2.Histogram_ZeroCountInt{
1706+
ZeroCountInt: 2,
1707+
},
1708+
Schema: -4,
1709+
PositiveSpans: []writev2.BucketSpan{{Offset: 1, Length: 2}},
1710+
PositiveDeltas: []int64{10, 20},
1711+
},
1712+
},
1713+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
1714+
},
1715+
},
1716+
},
1717+
expectedStats: remote.WriteResponseStats{
1718+
Confirmed: true,
1719+
Samples: 0,
1720+
Histograms: 1,
1721+
Exemplars: 0,
1722+
},
1723+
expectedMetrics: func() pmetric.Metrics {
1724+
metrics := pmetric.NewMetrics()
1725+
rm := metrics.ResourceMetrics().AppendEmpty()
1726+
attrs := rm.Resource().Attributes()
1727+
attrs.PutStr("service.namespace", "service-x")
1728+
attrs.PutStr("service.name", "test")
1729+
attrs.PutStr("service.instance.id", "107cn001")
1730+
1731+
sm := rm.ScopeMetrics().AppendEmpty()
1732+
sm.Scope().SetName("scope1")
1733+
sm.Scope().SetVersion("v1")
1734+
1735+
m := sm.Metrics().AppendEmpty()
1736+
m.SetName("test_metric")
1737+
m.SetUnit("")
1738+
m.SetDescription("")
1739+
m.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "histogram")
1740+
1741+
hist := m.SetEmptyExponentialHistogram()
1742+
hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
1743+
1744+
dp := hist.DataPoints().AppendEmpty()
1745+
dp.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
1746+
dp.SetStartTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
1747+
dp.SetScale(-4)
1748+
dp.SetZeroCount(2)
1749+
dp.SetZeroThreshold(1)
1750+
dp.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
1751+
1752+
dp.Positive().SetOffset(0)
1753+
dp.Positive().BucketCounts().FromRaw([]uint64{10, 30})
1754+
1755+
return metrics
1756+
}(),
1757+
},
1758+
{
1759+
name: "exponential histogram - overflow buckets dropped",
1760+
// This test case verifies that the bucket with index 1026 is dropped because the limit is 1025 at scale 0 (1024 + 1 for +Inf bucket).
1761+
// Bucket 1025 is valid (+Inf bucket).
1762+
// The original count was 50. Bucket 1025 has count 10. Bucket 1026 has count 30 (10 + 20).
1763+
// Bucket 1026 overflows (> 1025) and is dropped.
1764+
// The total count is updated to 50 - 30 = 20.
1765+
request: &writev2.Request{
1766+
Symbols: []string{
1767+
"",
1768+
"__name__", "test_metric", // 1, 2
1769+
"job", "service-x/test", // 3, 4
1770+
"instance", "107cn001", // 5, 6
1771+
"otel_scope_name", "scope1", // 7, 8
1772+
"otel_scope_version", "v1", // 9, 10
1773+
},
1774+
Timeseries: []writev2.TimeSeries{
1775+
{
1776+
Metadata: writev2.Metadata{
1777+
Type: writev2.Metadata_METRIC_TYPE_HISTOGRAM,
1778+
},
1779+
Histograms: []writev2.Histogram{
1780+
{
1781+
Count: &writev2.Histogram_CountInt{
1782+
CountInt: 50,
1783+
},
1784+
Sum: 100,
1785+
Timestamp: 1,
1786+
StartTimestamp: 1,
1787+
Schema: 0,
1788+
PositiveSpans: []writev2.BucketSpan{{Offset: 1025, Length: 2}},
1789+
PositiveDeltas: []int64{10, 20},
1790+
},
1791+
},
1792+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
1793+
},
1794+
},
1795+
},
1796+
expectedStats: remote.WriteResponseStats{
1797+
Confirmed: true,
1798+
Samples: 0,
1799+
Histograms: 1,
1800+
Exemplars: 0,
1801+
},
1802+
expectedMetrics: func() pmetric.Metrics {
1803+
metrics := pmetric.NewMetrics()
1804+
rm := metrics.ResourceMetrics().AppendEmpty()
1805+
attrs := rm.Resource().Attributes()
1806+
attrs.PutStr("service.namespace", "service-x")
1807+
attrs.PutStr("service.name", "test")
1808+
attrs.PutStr("service.instance.id", "107cn001")
1809+
1810+
sm := rm.ScopeMetrics().AppendEmpty()
1811+
sm.Scope().SetName("scope1")
1812+
sm.Scope().SetVersion("v1")
1813+
1814+
m := sm.Metrics().AppendEmpty()
1815+
m.SetName("test_metric")
1816+
m.SetUnit("")
1817+
m.SetDescription("")
1818+
m.Metadata().PutStr(prometheus.MetricMetadataTypeKey, "histogram")
1819+
1820+
hist := m.SetEmptyExponentialHistogram()
1821+
hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
1822+
1823+
dp := hist.DataPoints().AppendEmpty()
1824+
dp.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
1825+
dp.SetStartTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
1826+
dp.SetScale(0)
1827+
dp.SetSum(100)
1828+
dp.SetCount(20)
1829+
1830+
dp.Positive().SetOffset(1024)
1831+
dp.Positive().BucketCounts().FromRaw([]uint64{10})
1832+
1833+
return metrics
1834+
}(),
1835+
},
16801836
{
16811837
name: "multiple histogram metrics with exemplars",
16821838
request: &writev2.Request{

0 commit comments

Comments
 (0)