Skip to content

Commit 96b5389

Browse files
authored
Merge branch 'main' into exphist_prwreceiver
2 parents 24b4822 + 3cbf39c commit 96b5389

File tree

10 files changed

+251
-28
lines changed

10 files changed

+251
-28
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
change_type: breaking
2+
component: exporter/signalfx
3+
note: Default `api_url` and `ingest_url` values derived from `realm` now use `*.observability.splunkcloud.com` instead of `*.signalfx.com`.
4+
issues: [47670]
5+
subtext: |
6+
Explicit `api_url` and `ingest_url` settings are unchanged. Update network allowlists if they targeted only `*.signalfx.com`.
7+
change_logs: [user]
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
change_type: bug_fix
2+
component: exporter/splunk_hec
3+
note: Fix HEC routing partitioning that was broken when the exporterhelper batcher is enabled
4+
issues: [47695]
5+
change_logs: [user]

exporter/signalfxexporter/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ The following configuration options are required:
3333
- `api_url` (no default): Destination to which [properties and
3434
tags](https://help.splunk.com/en/splunk-observability-cloud/data-tools/metric-finder-and-metadata-catalogue)
3535
are sent. If `realm` is set, this option is derived and will be
36-
`https://api.{realm}.signalfx.com`. If a value is explicitly set, the
36+
`https://api.{realm}.observability.splunkcloud.com`. If a value is explicitly set, the
3737
value of `realm` will not be used in determining `api_url`. The explicit
3838
value will be used instead.
3939
- `ingest_url` (no default): Destination where SignalFx metrics are sent. If
4040
`realm` is set, this option is derived and will be
41-
`https://ingest.{realm}.signalfx.com`. If a value is
41+
`https://ingest.{realm}.observability.splunkcloud.com`. If a value is
4242
explicitly set, the value of `realm` will not be used in determining
4343
`ingest_url`. The explicit value will be used instead. The exporter will
4444
automatically append the appropriate path: "/v2/datapoint" for metrics,
@@ -168,11 +168,11 @@ One of `realm` and `api_url` are required.
168168
- `access_token` (required, no default): The access token is the authentication token
169169
provided by SignalFx.
170170
- `realm` (no default): SignalFx realm where the data will be received.
171-
- `api_url` (default = `https://api.{realm}.signalfx.com/`): Destination to which correlation updates
171+
- `api_url` (default = `https://api.{realm}.observability.splunkcloud.com/`): Destination to which correlation updates
172172
are sent. If a value is explicitly set, the value of `realm` will not be used in determining `api_url`.
173173
The explicit value will be used instead.
174174
- `correlation` Contains options controlling the syncing of service and environment properties onto dimensions.
175-
- `endpoint` (required, default = `api_url` or `https://api.{realm}.signalfx.com/`): This is the base URL for API requests (e.g. `https://api.us0.signalfx.com`).
175+
- `endpoint` (required, default = `api_url` or `https://api.{realm}.observability.splunkcloud.com/`): This is the base URL for API requests (e.g. `https://api.us0.observability.splunkcloud.com`).
176176
- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend.
177177
- `stale_service_timeout` (default = 5 minutes): How long to wait after a span's service name is last seen before uncorrelating it.
178178
- `max_requests` (default = 20): Max HTTP requests to be made in parallel.

exporter/signalfxexporter/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func (cfg *Config) getMetricTranslator(done chan struct{}) (*translation.MetricT
185185
func (cfg *Config) getIngestURL() (*url.URL, error) {
186186
strURL := cfg.IngestURL
187187
if cfg.IngestURL == "" {
188-
strURL = fmt.Sprintf("https://ingest.%s.signalfx.com", cfg.Realm)
188+
strURL = fmt.Sprintf("https://ingest.%s.observability.splunkcloud.com", cfg.Realm)
189189
}
190190

191191
ingestURL, err := url.Parse(strURL)
@@ -198,7 +198,7 @@ func (cfg *Config) getIngestURL() (*url.URL, error) {
198198
func (cfg *Config) getAPIURL() (*url.URL, error) {
199199
strURL := cfg.APIURL
200200
if cfg.APIURL == "" {
201-
strURL = fmt.Sprintf("https://api.%s.signalfx.com", cfg.Realm)
201+
strURL = fmt.Sprintf("https://api.%s.observability.splunkcloud.com", cfg.Realm)
202202
}
203203

204204
apiURL, err := url.Parse(strURL)

exporter/signalfxexporter/config_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -339,19 +339,19 @@ func TestConfigGetIngestURL(t *testing.T) {
339339
},
340340
want: &url.URL{
341341
Scheme: "https",
342-
Host: "ingest.us0.signalfx.com",
342+
Host: "ingest.us0.observability.splunkcloud.com",
343343
Path: "",
344344
},
345345
},
346346
{
347347
name: "Test URL overrides",
348348
cfg: &Config{
349349
Realm: "us0",
350-
IngestURL: "https://ingest.us1.signalfx.com/",
350+
IngestURL: "https://ingest.us1.observability.splunkcloud.com/",
351351
},
352352
want: &url.URL{
353353
Scheme: "https",
354-
Host: "ingest.us1.signalfx.com",
354+
Host: "ingest.us1.observability.splunkcloud.com",
355355
Path: "/",
356356
},
357357
},
@@ -390,18 +390,18 @@ func TestConfigGetAPIURL(t *testing.T) {
390390
},
391391
want: &url.URL{
392392
Scheme: "https",
393-
Host: "api.us0.signalfx.com",
393+
Host: "api.us0.observability.splunkcloud.com",
394394
},
395395
},
396396
{
397397
name: "Test URL overrides",
398398
cfg: &Config{
399399
Realm: "us0",
400-
APIURL: "https://api.us1.signalfx.com/",
400+
APIURL: "https://api.us1.observability.splunkcloud.com/",
401401
},
402402
want: &url.URL{
403403
Scheme: "https",
404-
Host: "api.us1.signalfx.com",
404+
Host: "api.us1.observability.splunkcloud.com",
405405
Path: "/",
406406
},
407407
},
@@ -439,14 +439,14 @@ func TestConfigValidateErrors(t *testing.T) {
439439
name: "Test empty realm and API URL",
440440
cfg: &Config{
441441
AccessToken: "access_token",
442-
IngestURL: "https://ingest.us1.signalfx.com/",
442+
IngestURL: "https://ingest.us1.observability.splunkcloud.com/",
443443
},
444444
},
445445
{
446446
name: "Test empty realm and Ingest URL",
447447
cfg: &Config{
448448
AccessToken: "access_token",
449-
APIURL: "https://api.us1.signalfx.com/",
449+
APIURL: "https://api.us1.observability.splunkcloud.com/",
450450
},
451451
},
452452
{

exporter/splunkhecexporter/client_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2163,6 +2163,108 @@ func TestPushLogsRetryableFailureMultipleResources(t *testing.T) {
21632163
assert.Equal(t, logs, expectedErr.Data())
21642164
}
21652165

2166+
// runBatchedLogExport is a variant of runLogExport that enables the
2167+
// exporterhelper batcher and collects all requests received until n have
2168+
// arrived or a 10-second timeout. Shutdown is used to force-flush pending
2169+
// batcher items before waiting.
2170+
func runBatchedLogExport(t *testing.T, cfg *Config, ld plog.Logs, expectedBatchesNum int) []receivedRequest {
2171+
t.Helper()
2172+
2173+
listener, err := net.Listen("tcp", "127.0.0.1:0")
2174+
require.NoError(t, err)
2175+
2176+
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
2177+
cfg.Token = "1234-1234"
2178+
2179+
// Buffered channel so the async handler goroutines never block.
2180+
rr := make(chan receivedRequest, 64)
2181+
capture := capturingData{testing: t, receivedRequest: rr, statusCode: 200, checkCompression: !cfg.DisableCompression}
2182+
s := &http.Server{
2183+
Handler: &capture,
2184+
ReadHeaderTimeout: 20 * time.Second,
2185+
}
2186+
go func() {
2187+
if e := s.Serve(listener); e != http.ErrServerClosed {
2188+
assert.NoError(t, e)
2189+
}
2190+
}()
2191+
defer s.Close()
2192+
2193+
params := exportertest.NewNopSettings(metadata.Type)
2194+
exp, err := NewFactory().CreateLogs(t.Context(), params, cfg)
2195+
require.NoError(t, err)
2196+
require.NoError(t, exp.Start(t.Context(), componenttest.NewNopHost()))
2197+
2198+
require.NoError(t, exp.ConsumeLogs(t.Context(), ld))
2199+
2200+
// Shutdown flushes all pending batcher items before returning.
2201+
require.NoError(t, exp.Shutdown(t.Context()))
2202+
2203+
// Collect exactly expectedBatchesNum requests, with a generous timeout to
2204+
// account for the async handler goroutine scheduling.
2205+
var requests []receivedRequest
2206+
deadline := time.After(10 * time.Second)
2207+
for len(requests) < expectedBatchesNum {
2208+
select {
2209+
case req := <-rr:
2210+
requests = append(requests, req)
2211+
case <-deadline:
2212+
require.Len(t, requests, expectedBatchesNum, "timed out waiting for HTTP requests")
2213+
return requests
2214+
}
2215+
}
2216+
return requests
2217+
}
2218+
2219+
// TestBatcherPartitionsByHecToken verifies that when the exporterhelper batcher
2220+
// is enabled, log resources with different HEC tokens are sent as separate HTTP
2221+
// requests (different Authorization headers), while resources sharing the same
2222+
// token are batched into a single request.
2223+
func TestBatcherPartitionsByHecToken(t *testing.T) {
2224+
makeCfg := func() *Config {
2225+
cfg := createDefaultConfig().(*Config)
2226+
cfg.QueueSettings.Get().Batch = configoptional.Some(exporterhelper.BatchConfig{
2227+
FlushTimeout: 200 * time.Millisecond,
2228+
Sizer: exporterhelper.RequestSizerTypeItems,
2229+
MinSize: 2,
2230+
})
2231+
return cfg
2232+
}
2233+
2234+
t.Run("different tokens produce separate HTTP requests", func(t *testing.T) {
2235+
ld := plog.NewLogs()
2236+
2237+
rl0 := ld.ResourceLogs().AppendEmpty()
2238+
rl0.Resource().Attributes().PutStr(splunk.HecTokenLabel, "token-A")
2239+
rl0.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log from A")
2240+
2241+
rl1 := ld.ResourceLogs().AppendEmpty()
2242+
rl1.Resource().Attributes().PutStr(splunk.HecTokenLabel, "token-B")
2243+
rl1.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log from B")
2244+
2245+
requests := runBatchedLogExport(t, makeCfg(), ld, 2)
2246+
2247+
authHeaders := []string{
2248+
requests[0].headers.Get("Authorization"),
2249+
requests[1].headers.Get("Authorization"),
2250+
}
2251+
sort.Strings(authHeaders)
2252+
assert.Equal(t, []string{"Splunk token-A", "Splunk token-B"}, authHeaders)
2253+
})
2254+
2255+
t.Run("same token resources are batched into one request", func(t *testing.T) {
2256+
ld := plog.NewLogs()
2257+
for range 2 {
2258+
rl := ld.ResourceLogs().AppendEmpty()
2259+
rl.Resource().Attributes().PutStr(splunk.HecTokenLabel, "token-same")
2260+
rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("log")
2261+
}
2262+
2263+
requests := runBatchedLogExport(t, makeCfg(), ld, 1)
2264+
assert.Equal(t, "Splunk token-same", requests[0].headers.Get("Authorization"))
2265+
})
2266+
}
2267+
21662268
// validateCompressedContains validates that GZipped `got` contains `expected` strings
21672269
func validateCompressedContains(t *testing.T, expected []string, got []byte) {
21682270
z, err := gzip.NewReader(bytes.NewReader(got))

exporter/splunkhecexporter/factory.go

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-col
55

66
import (
77
"context"
8+
"strings"
89
"time"
910

1011
"go.opentelemetry.io/collector/component"
@@ -113,7 +114,7 @@ func createTracesExporter(
113114
// explicitly disable since we rely on http.Client timeout logic.
114115
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
115116
exporterhelper.WithRetry(cfg.BackOffConfig),
116-
exporterhelper.WithQueue(cfg.QueueSettings),
117+
exporterhelper.WithQueue(hecQueueSettings(cfg.QueueSettings)),
117118
exporterhelper.WithStart(c.start),
118119
exporterhelper.WithShutdown(c.stop),
119120
)
@@ -123,7 +124,11 @@ func createTracesExporter(
123124

124125
wrapped := &baseTracesExporter{
125126
Component: e,
126-
Traces: batchperresourceattr.NewMultiBatchPerResourceTraces([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, e),
127+
Traces: batchperresourceattr.NewMultiBatchPerResourceTraces(
128+
[]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel},
129+
e,
130+
batchperresourceattr.WithMetadataInjection(),
131+
),
127132
}
128133

129134
return wrapped, nil
@@ -146,7 +151,7 @@ func createMetricsExporter(
146151
// explicitly disable since we rely on http.Client timeout logic.
147152
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
148153
exporterhelper.WithRetry(cfg.BackOffConfig),
149-
exporterhelper.WithQueue(cfg.QueueSettings),
154+
exporterhelper.WithQueue(hecQueueSettings(cfg.QueueSettings)),
150155
exporterhelper.WithStart(c.start),
151156
exporterhelper.WithShutdown(c.stop),
152157
)
@@ -156,7 +161,11 @@ func createMetricsExporter(
156161

157162
wrapped := &baseMetricsExporter{
158163
Component: e,
159-
Metrics: batchperresourceattr.NewMultiBatchPerResourceMetrics([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, e),
164+
Metrics: batchperresourceattr.NewMultiBatchPerResourceMetrics(
165+
[]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel},
166+
e,
167+
batchperresourceattr.WithMetadataInjection(),
168+
),
160169
}
161170

162171
return wrapped, nil
@@ -179,7 +188,7 @@ func createLogsExporter(
179188
// explicitly disable since we rely on http.Client timeout logic.
180189
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
181190
exporterhelper.WithRetry(cfg.BackOffConfig),
182-
exporterhelper.WithQueue(cfg.QueueSettings),
191+
exporterhelper.WithQueue(hecQueueSettings(cfg.QueueSettings)),
183192
exporterhelper.WithStart(c.start),
184193
exporterhelper.WithShutdown(c.stop),
185194
)
@@ -189,17 +198,50 @@ func createLogsExporter(
189198

190199
wrapped := &baseLogsExporter{
191200
Component: logsExporter,
192-
Logs: batchperresourceattr.NewMultiBatchPerResourceLogs([]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}, &perScopeBatcher{
193-
logsEnabled: cfg.LogDataEnabled,
194-
profilingEnabled: cfg.ProfilingDataEnabled,
195-
logger: set.Logger,
196-
next: logsExporter,
197-
}),
201+
Logs: batchperresourceattr.NewMultiBatchPerResourceLogs(
202+
[]string{splunk.HecTokenLabel, splunk.DefaultIndexLabel},
203+
&perScopeBatcher{
204+
logsEnabled: cfg.LogDataEnabled,
205+
profilingEnabled: cfg.ProfilingDataEnabled,
206+
logger: set.Logger,
207+
next: logsExporter,
208+
},
209+
batchperresourceattr.WithMetadataInjection(),
210+
),
198211
}
199212

200213
return wrapped, nil
201214
}
202215

216+
// hecRequiredMetadataKeys are the context metadata keys the exporterhelper
217+
// batcher must use to partition requests so different HEC routing targets
218+
// are never merged into the same batch.
219+
var hecRequiredMetadataKeys = []string{splunk.HecTokenLabel, splunk.DefaultIndexLabel}
220+
221+
// hecQueueSettings returns a copy of qs with hecRequiredMetadataKeys
222+
// guaranteed to be present in Batch.Partition.MetadataKeys. Keys already
223+
// set by the user are preserved; required keys are appended only when absent
224+
// (case-insensitive). Returns qs unchanged when batching is not configured.
225+
func hecQueueSettings(qs configoptional.Optional[exporterhelper.QueueBatchConfig]) configoptional.Optional[exporterhelper.QueueBatchConfig] {
226+
if !qs.HasValue() || !qs.Get().Batch.HasValue() {
227+
return qs
228+
}
229+
qCopy := *qs.Get()
230+
bCopy := *qCopy.Batch.Get()
231+
232+
existing := make(map[string]bool, len(bCopy.Partition.MetadataKeys))
233+
for _, k := range bCopy.Partition.MetadataKeys {
234+
existing[strings.ToLower(k)] = true
235+
}
236+
for _, k := range hecRequiredMetadataKeys {
237+
if !existing[strings.ToLower(k)] {
238+
bCopy.Partition.MetadataKeys = append(bCopy.Partition.MetadataKeys, k)
239+
}
240+
}
241+
qCopy.Batch = configoptional.Some(bCopy)
242+
return configoptional.Some(qCopy)
243+
}
244+
203245
func showDeprecationWarnings(cfg *Config, logger *zap.Logger) {
204246
if cfg.DeprecatedBatcher.isSet {
205247
logger.Warn("The 'batcher' field is deprecated and will be removed in a future release. Use 'sending_queue::batch' instead.")

0 commit comments

Comments
 (0)