Skip to content

Commit 8dd7ce2

Browse files
authored
[exporter/splunkhec] Fix HEC routing partitioning when batcher is enabled (#47725)
Fixes #47695 - Adds `batchperresourceattr.WithMetadataInjection()` to all three `New*BatchPerResource*` calls so each sub-batch carries its HEC token and index as `client.Metadata` in the context - Adds `hecQueueSettings()` to ensure `Batch.Partition.MetadataKeys` always contains the HEC routing keys when the user enables batching, preventing the batcher from merging requests across different routing partitions
1 parent 15087d3 commit 8dd7ce2

File tree

4 files changed

+227
-11
lines changed

4 files changed

+227
-11
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
change_type: bug_fix
2+
component: exporter/splunk_hec
3+
note: Fix HEC routing partitioning broken when the exporterhelper batcher is enabled
4+
issues: [47695]
5+
change_logs: [user]

exporter/splunkhecexporter/client_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2163,6 +2163,108 @@ func TestPushLogsRetryableFailureMultipleResources(t *testing.T) {
21632163
assert.Equal(t, logs, expectedErr.Data())
21642164
}
21652165

2166+
// runBatchedLogExport is a variant of runLogExport that enables the
2167+
// exporterhelper batcher and collects all requests received until n have
2168+
// arrived or a 10-second timeout. Shutdown is used to force-flush pending
2169+
// batcher items before waiting.
2170+
func runBatchedLogExport(t *testing.T, cfg *Config, ld plog.Logs, expectedBatchesNum int) []receivedRequest {
2171+
t.Helper()
2172+
2173+
listener, err := net.Listen("tcp", "127.0.0.1:0")
2174+
require.NoError(t, err)
2175+
2176+
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
2177+
cfg.Token = "1234-1234"
2178+
2179+
// Buffered channel so the async handler goroutines never block.
2180+
rr := make(chan receivedRequest, 64)
2181+
capture := capturingData{testing: t, receivedRequest: rr, statusCode: 200, checkCompression: !cfg.DisableCompression}
2182+
s := &http.Server{
2183+
Handler: &capture,
2184+
ReadHeaderTimeout: 20 * time.Second,
2185+
}
2186+
go func() {
2187+
if e := s.Serve(listener); e != http.ErrServerClosed {
2188+
assert.NoError(t, e)
2189+
}
2190+
}()
2191+
defer s.Close()
2192+
2193+
params := exportertest.NewNopSettings(metadata.Type)
2194+
exp, err := NewFactory().CreateLogs(t.Context(), params, cfg)
2195+
require.NoError(t, err)
2196+
require.NoError(t, exp.Start(t.Context(), componenttest.NewNopHost()))
2197+
2198+
require.NoError(t, exp.ConsumeLogs(t.Context(), ld))
2199+
2200+
// Shutdown flushes all pending batcher items before returning.
2201+
require.NoError(t, exp.Shutdown(t.Context()))
2202+
2203+
// Collect exactly expectedBatchesNum requests, with a generous timeout to
2204+
// account for the async handler goroutine scheduling.
2205+
var requests []receivedRequest
2206+
deadline := time.After(10 * time.Second)
2207+
for len(requests) < expectedBatchesNum {
2208+
select {
2209+
case req := <-rr:
2210+
requests = append(requests, req)
2211+
case <-deadline:
2212+
require.Len(t, requests, expectedBatchesNum, "timed out waiting for HTTP requests")
2213+
return requests
2214+
}
2215+
}
2216+
return requests
2217+
}
2218+
2219+
// TestBatcherPartitionsByHecToken verifies that when the exporterhelper batcher
2220+
// is enabled, log resources with different HEC tokens are sent as separate HTTP
2221+
// requests (different Authorization headers), while resources sharing the same
2222+
// token are batched into a single request.
2223+
func TestBatcherPartitionsByHecToken(t *testing.T) {
2224+
makeCfg := func() *Config {
2225+
cfg := createDefaultConfig().(*Config)
2226+
cfg.QueueSettings.Get().Batch = configoptional.Some(exporterhelper.BatchConfig{
2227+
FlushTimeout: 200 * time.Millisecond,
2228+
Sizer: exporterhelper.RequestSizerTypeItems,
2229+
MinSize: 2,
2230+
})
2231+
return cfg
2232+
}
2233+
2234+
t.Run("different tokens produce separate HTTP requests", func(t *testing.T) {
2235+
ld := plog.NewLogs()
2236+
2237+
rl0 := ld.ResourceLogs().AppendEmpty()
2238+
rl0.Resource().Attributes().PutStr(splunk.HecTokenLabel, "token-A")
2239+
rl0.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log from A")
2240+
2241+
rl1 := ld.ResourceLogs().AppendEmpty()
2242+
rl1.Resource().Attributes().PutStr(splunk.HecTokenLabel, "token-B")
2243+
rl1.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log from B")
2244+
2245+
requests := runBatchedLogExport(t, makeCfg(), ld, 2)
2246+
2247+
authHeaders := []string{
2248+
requests[0].headers.Get("Authorization"),
2249+
requests[1].headers.Get("Authorization"),
2250+
}
2251+
sort.Strings(authHeaders)
2252+
assert.Equal(t, []string{"Splunk token-A", "Splunk token-B"}, authHeaders)
2253+
})
2254+
2255+
t.Run("same token resources are batched into one request", func(t *testing.T) {
2256+
ld := plog.NewLogs()
2257+
for range 2 {
2258+
rl := ld.ResourceLogs().AppendEmpty()
2259+
rl.Resource().Attributes().PutStr(splunk.HecTokenLabel, "token-same")
2260+
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log")
2261+
}
2262+
2263+
requests := runBatchedLogExport(t, makeCfg(), ld, 1)
2264+
assert.Equal(t, "Splunk token-same", requests[0].headers.Get("Authorization"))
2265+
})
2266+
}
2267+
21662268
// validateCompressedContains validates that GZipped `got` contains `expected` strings
21672269
func validateCompressedContains(t *testing.T, expected []string, got []byte) {
21682270
z, err := gzip.NewReader(bytes.NewReader(got))

exporter/splunkhecexporter/factory.go

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-col
55

66
import (
77
"context"
8+
"strings"
89
"time"
910

1011
"go.opentelemetry.io/collector/component"
@@ -113,7 +114,7 @@ func createTracesExporter(
113114
// explicitly disable since we rely on http.Client timeout logic.
114115
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
115116
exporterhelper.WithRetry(cfg.BackOffConfig),
116-
exporterhelper.WithQueue(cfg.QueueSettings),
117+
exporterhelper.WithQueue(hecQueueSettings(cfg.QueueSettings)),
117118
exporterhelper.WithStart(c.start),
118119
exporterhelper.WithShutdown(c.stop),
119120
)
@@ -123,7 +124,11 @@ func createTracesExporter(
123124

124125
wrapped := &baseTracesExporter{
125126
Component: e,
126-
Traces: batchperresourceattr.NewMultiBatchPerResourceTraces([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, e),
127+
Traces: batchperresourceattr.NewMultiBatchPerResourceTraces(
128+
[]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel},
129+
e,
130+
batchperresourceattr.WithMetadataInjection(),
131+
),
127132
}
128133

129134
return wrapped, nil
@@ -146,7 +151,7 @@ func createMetricsExporter(
146151
// explicitly disable since we rely on http.Client timeout logic.
147152
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
148153
exporterhelper.WithRetry(cfg.BackOffConfig),
149-
exporterhelper.WithQueue(cfg.QueueSettings),
154+
exporterhelper.WithQueue(hecQueueSettings(cfg.QueueSettings)),
150155
exporterhelper.WithStart(c.start),
151156
exporterhelper.WithShutdown(c.stop),
152157
)
@@ -156,7 +161,11 @@ func createMetricsExporter(
156161

157162
wrapped := &baseMetricsExporter{
158163
Component: e,
159-
Metrics: batchperresourceattr.NewMultiBatchPerResourceMetrics([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, e),
164+
Metrics: batchperresourceattr.NewMultiBatchPerResourceMetrics(
165+
[]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel},
166+
e,
167+
batchperresourceattr.WithMetadataInjection(),
168+
),
160169
}
161170

162171
return wrapped, nil
@@ -179,7 +188,7 @@ func createLogsExporter(
179188
// explicitly disable since we rely on http.Client timeout logic.
180189
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
181190
exporterhelper.WithRetry(cfg.BackOffConfig),
182-
exporterhelper.WithQueue(cfg.QueueSettings),
191+
exporterhelper.WithQueue(hecQueueSettings(cfg.QueueSettings)),
183192
exporterhelper.WithStart(c.start),
184193
exporterhelper.WithShutdown(c.stop),
185194
)
@@ -189,17 +198,50 @@ func createLogsExporter(
189198

190199
wrapped := &baseLogsExporter{
191200
Component: logsExporter,
192-
Logs: batchperresourceattr.NewMultiBatchPerResourceLogs([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, &perScopeBatcher{
193-
logsEnabled: cfg.LogDataEnabled,
194-
profilingEnabled: cfg.ProfilingDataEnabled,
195-
logger: set.Logger,
196-
next: logsExporter,
197-
}),
201+
Logs: batchperresourceattr.NewMultiBatchPerResourceLogs(
202+
[]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel},
203+
&perScopeBatcher{
204+
logsEnabled: cfg.LogDataEnabled,
205+
profilingEnabled: cfg.ProfilingDataEnabled,
206+
logger: set.Logger,
207+
next: logsExporter,
208+
},
209+
batchperresourceattr.WithMetadataInjection(),
210+
),
198211
}
199212

200213
return wrapped, nil
201214
}
202215

216+
// hecRequiredMetadataKeys are the context metadata keys the exporterhelper
217+
// batcher must use to partition requests so different HEC routing targets
218+
// are never merged into the same batch.
219+
var hecRequiredMetadataKeys = []string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}
220+
221+
// hecQueueSettings returns a copy of qs with hecRequiredMetadataKeys
222+
// guaranteed to be present in Batch.Partition.MetadataKeys. Keys already
223+
// set by the user are preserved; required keys are appended only when absent
224+
// (case-insensitive). Returns qs unchanged when batching is not configured.
225+
func hecQueueSettings(qs configoptional.Optional[exporterhelper.QueueBatchConfig]) configoptional.Optional[exporterhelper.QueueBatchConfig] {
226+
if !qs.HasValue() || !qs.Get().Batch.HasValue() {
227+
return qs
228+
}
229+
qCopy := *qs.Get()
230+
bCopy := *qCopy.Batch.Get()
231+
232+
existing := make(map[string]bool, len(bCopy.Partition.MetadataKeys))
233+
for _, k := range bCopy.Partition.MetadataKeys {
234+
existing[strings.ToLower(k)] = true
235+
}
236+
for _, k := range hecRequiredMetadataKeys {
237+
if !existing[strings.ToLower(k)] {
238+
bCopy.Partition.MetadataKeys = append(bCopy.Partition.MetadataKeys, k)
239+
}
240+
}
241+
qCopy.Batch = configoptional.Some(bCopy)
242+
return configoptional.Some(qCopy)
243+
}
244+
203245
func showDeprecationWarnings(cfg *Config, logger *zap.Logger) {
204246
if cfg.DeprecatedBatcher.isSet {
205247
logger.Warn("The 'batcher' field is deprecated and will be removed in a future release. Use 'sending_queue::batch' instead.")

exporter/splunkhecexporter/factory_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"go.opentelemetry.io/collector/exporter/exportertest"
1717

1818
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter/internal/metadata"
19+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
1920
)
2021

2122
func TestCreateDefaultConfig(t *testing.T) {
@@ -129,3 +130,69 @@ func TestFactory_EnabledBatchingMakesExporterMutable(t *testing.T) {
129130
require.NoError(t, err)
130131
assert.True(t, le.Capabilities().MutatesData)
131132
}
133+
134+
func TestHecQueueSettings(t *testing.T) {
135+
t.Run("no queue settings returned unchanged", func(t *testing.T) {
136+
out := hecQueueSettings(configoptional.None[exporterhelper.QueueBatchConfig]())
137+
assert.False(t, out.HasValue())
138+
})
139+
140+
t.Run("queue without batch returned unchanged", func(t *testing.T) {
141+
qs := configoptional.Some(exporterhelper.QueueBatchConfig{NumConsumers: 2, QueueSize: 100})
142+
out := hecQueueSettings(qs)
143+
require.True(t, out.HasValue())
144+
assert.False(t, out.Get().Batch.HasValue())
145+
})
146+
147+
someBatch := func(keys []string) exporterhelper.QueueBatchConfig {
148+
cfg := exporterhelper.NewDefaultQueueConfig()
149+
batch := exporterhelper.BatchConfig{
150+
FlushTimeout: 200 * time.Millisecond,
151+
Sizer: exporterhelper.RequestSizerTypeItems,
152+
MinSize: 8192,
153+
}
154+
batch.Partition.MetadataKeys = keys
155+
cfg.Batch = configoptional.Some(batch)
156+
return cfg
157+
}
158+
159+
t.Run("required keys added when missing", func(t *testing.T) {
160+
out := hecQueueSettings(configoptional.Some(someBatch(nil)))
161+
keys := out.Get().Batch.Get().Partition.MetadataKeys
162+
assert.Contains(t, keys, splunk.HecTokenLabel)
163+
assert.Contains(t, keys, splunk.DefaultIndexLabel)
164+
})
165+
166+
t.Run("user keys preserved and required keys appended", func(t *testing.T) {
167+
out := hecQueueSettings(configoptional.Some(someBatch([]string{"custom_key"})))
168+
keys := out.Get().Batch.Get().Partition.MetadataKeys
169+
assert.Contains(t, keys, "custom_key")
170+
assert.Contains(t, keys, splunk.HecTokenLabel)
171+
assert.Contains(t, keys, splunk.DefaultIndexLabel)
172+
})
173+
174+
t.Run("no duplicates when required keys already present", func(t *testing.T) {
175+
out := hecQueueSettings(configoptional.Some(someBatch([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel})))
176+
keys := out.Get().Batch.Get().Partition.MetadataKeys
177+
count := 0
178+
for _, k := range keys {
179+
if k == splunk.HecTokenLabel || k == splunk.DefaultIndexLabel {
180+
count++
181+
}
182+
}
183+
assert.Equal(t, 2, count)
184+
})
185+
186+
t.Run("case-insensitive deduplication", func(t *testing.T) {
187+
out := hecQueueSettings(configoptional.Some(someBatch([]string{"Com.Splunk.Hec.Access_Token", "COM.SPLUNK.INDEX"})))
188+
keys := out.Get().Batch.Get().Partition.MetadataKeys
189+
assert.Len(t, keys, 2)
190+
})
191+
192+
t.Run("original config not mutated", func(t *testing.T) {
193+
orig := someBatch(nil)
194+
origKeys := orig.Batch.Get().Partition.MetadataKeys
195+
_ = hecQueueSettings(configoptional.Some(orig))
196+
assert.Equal(t, origKeys, orig.Batch.Get().Partition.MetadataKeys)
197+
})
198+
}

0 commit comments

Comments
 (0)