Skip to content

Commit cffe3bd

Browse files
feat(instrumentation-aws-sdk): inject trace context into Kinesis PutRecord/PutRecords
1 parent 89c6a01 commit cffe3bd

File tree

4 files changed

+307
-11
lines changed

4 files changed

+307
-11
lines changed

packages/instrumentation-aws-sdk/src/services/kinesis.ts

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,18 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
import { Attributes, SpanKind } from '@opentelemetry/api';
16+
import {
17+
Attributes,
18+
SpanKind,
19+
context,
20+
propagation,
21+
diag,
22+
} from '@opentelemetry/api';
1723
import { AttributeNames } from '../enums';
24+
import {
25+
ATTR_MESSAGING_DESTINATION_NAME,
26+
ATTR_MESSAGING_SYSTEM,
27+
} from '../semconv';
1828
import { AwsSdkInstrumentationConfig, NormalizedRequest } from '../types';
1929
import { RequestMetadata, ServiceExtension } from './ServiceExtension';
2030

@@ -23,20 +33,101 @@ export class KinesisServiceExtension implements ServiceExtension {
2333
request: NormalizedRequest,
2434
_config: AwsSdkInstrumentationConfig
2535
): RequestMetadata {
26-
const streamName = request.commandInput?.StreamName;
27-
const spanKind: SpanKind = SpanKind.CLIENT;
36+
const streamName = this.extractStreamName(request.commandInput);
37+
let spanKind: SpanKind = SpanKind.CLIENT;
38+
let spanName: string | undefined;
2839
const spanAttributes: Attributes = {};
2940

3041
if (streamName) {
3142
spanAttributes[AttributeNames.AWS_KINESIS_STREAM_NAME] = streamName;
3243
}
3344

45+
switch (request.commandName) {
46+
case 'PutRecord':
47+
case 'PutRecords':
48+
spanKind = SpanKind.PRODUCER;
49+
spanName = `${streamName ?? 'unknown'} send`;
50+
spanAttributes[ATTR_MESSAGING_SYSTEM] = 'aws_kinesis';
51+
if (streamName) {
52+
spanAttributes[ATTR_MESSAGING_DESTINATION_NAME] = streamName;
53+
}
54+
break;
55+
}
56+
3457
const isIncoming = false;
3558

3659
return {
3760
isIncoming,
3861
spanAttributes,
3962
spanKind,
63+
spanName,
4064
};
4165
}
66+
67+
requestPostSpanHook = (request: NormalizedRequest): void => {
68+
switch (request.commandName) {
69+
case 'PutRecord':
70+
this.injectSpanContextIntoKinesisRecord(request.commandInput);
71+
break;
72+
case 'PutRecords':
73+
{
74+
const records = request.commandInput?.Records;
75+
if (Array.isArray(records)) {
76+
records.forEach((record: Record<string, unknown>) => {
77+
this.injectSpanContextIntoKinesisRecord(record);
78+
});
79+
}
80+
}
81+
break;
82+
}
83+
};
84+
85+
private injectSpanContextIntoKinesisRecord(
86+
entry: Record<string, unknown>
87+
): void {
88+
const data = entry?.Data;
89+
if (data == null) return;
90+
91+
try {
92+
let dataString: string;
93+
let isBuffer = false;
94+
95+
if (typeof data === 'string') {
96+
dataString = data;
97+
} else if (data instanceof Uint8Array) {
98+
isBuffer = true;
99+
dataString = new TextDecoder().decode(data);
100+
} else {
101+
return;
102+
}
103+
104+
const parsed = JSON.parse(dataString);
105+
propagation.inject(context.active(), parsed);
106+
const injected = JSON.stringify(parsed);
107+
108+
entry.Data = isBuffer ? new TextEncoder().encode(injected) : injected;
109+
} catch {
110+
diag.debug(
111+
'aws-sdk instrumentation: Failed to inject context into Kinesis record Data, data is not valid JSON.'
112+
);
113+
}
114+
}
115+
116+
private extractStreamName(
117+
commandInput: Record<string, unknown>
118+
): string | undefined {
119+
if (commandInput?.StreamName) {
120+
return commandInput.StreamName as string;
121+
}
122+
const streamArn = commandInput?.StreamARN as string | undefined;
123+
if (streamArn) {
124+
// ARN format: arn:aws:kinesis:<region>:<account>:stream/<stream-name>
125+
const streamPrefix = 'stream/';
126+
const idx = streamArn.lastIndexOf(streamPrefix);
127+
if (idx >= 0) {
128+
return streamArn.substring(idx + streamPrefix.length);
129+
}
130+
}
131+
return undefined;
132+
}
42133
}

packages/instrumentation-aws-sdk/test/kinesis.test.ts

Lines changed: 211 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ import { getTestSpans } from '@opentelemetry/contrib-test-utils';
1818
import './load-instrumentation';
1919

2020
import { AttributeNames } from '../src/enums';
21-
import { DescribeStreamCommand, KinesisClient } from '@aws-sdk/client-kinesis';
21+
import {
22+
DescribeStreamCommand,
23+
KinesisClient,
24+
PutRecordCommand,
25+
PutRecordsCommand,
26+
} from '@aws-sdk/client-kinesis';
2227
import { NodeHttpHandler } from '@smithy/node-http-handler';
2328
import * as fs from 'fs';
2429
import * as nock from 'nock';
@@ -30,10 +35,18 @@ import { expect } from 'expect';
3035
const region = 'us-east-1';
3136

3237
describe('Kinesis - v3', () => {
38+
let client: KinesisClient;
39+
const dummyStreamName = 'dummy-stream-name';
40+
41+
beforeEach(() => {
42+
client = new KinesisClient({
43+
region: region,
44+
requestHandler: new NodeHttpHandler(),
45+
});
46+
});
47+
3348
describe('DescribeStream', () => {
3449
it('Request span attributes - adds Stream Name', async () => {
35-
const dummyStreamName = 'dummy-stream-name';
36-
3750
nock(`https://kinesis.${region}.amazonaws.com`)
3851
.post('/')
3952
.reply(
@@ -48,11 +61,6 @@ describe('Kinesis - v3', () => {
4861
StreamName: dummyStreamName,
4962
};
5063

51-
// Use NodeHttpHandler to use HTTP instead of HTTP2 because nock does not support HTTP2
52-
const client = new KinesisClient({
53-
region: region,
54-
requestHandler: new NodeHttpHandler(),
55-
});
5664
await client.send(new DescribeStreamCommand(params));
5765

5866
const testSpans: ReadableSpan[] = getTestSpans();
@@ -69,4 +77,199 @@ describe('Kinesis - v3', () => {
6977
expect(describeSpan.kind).toBe(SpanKind.CLIENT);
7078
});
7179
});
80+
81+
describe('PutRecord', () => {
82+
it('injects trace context into JSON Data', async () => {
83+
nock(`https://kinesis.${region}.amazonaws.com`)
84+
.post('/')
85+
.reply(
86+
200,
87+
fs.readFileSync(
88+
'./test/mock-responses/kinesis-put-record.json',
89+
'utf8'
90+
)
91+
);
92+
93+
const payload = { message: 'hello' };
94+
const params = {
95+
StreamName: dummyStreamName,
96+
PartitionKey: 'pk-1',
97+
Data: new TextEncoder().encode(JSON.stringify(payload)),
98+
};
99+
100+
await client.send(new PutRecordCommand(params));
101+
102+
const testSpans: ReadableSpan[] = getTestSpans();
103+
const putRecordSpans: ReadableSpan[] = testSpans.filter(
104+
(s: ReadableSpan) => {
105+
return s.name === `${dummyStreamName} send`;
106+
}
107+
);
108+
expect(putRecordSpans.length).toBe(1);
109+
const span = putRecordSpans[0];
110+
expect(span.kind).toBe(SpanKind.PRODUCER);
111+
expect(
112+
span.attributes[AttributeNames.AWS_KINESIS_STREAM_NAME]
113+
).toBe(dummyStreamName);
114+
expect(span.attributes['messaging.system']).toBe('aws_kinesis');
115+
expect(span.attributes['messaging.destination.name']).toBe(
116+
dummyStreamName
117+
);
118+
119+
// Verify trace context was injected into the Data payload
120+
const injectedData = JSON.parse(
121+
new TextDecoder().decode(params.Data)
122+
);
123+
expect(injectedData.message).toBe('hello');
124+
expect(injectedData.traceparent).toBeDefined();
125+
expect(typeof injectedData.traceparent).toBe('string');
126+
});
127+
128+
it('extracts stream name from StreamARN', async () => {
129+
const endpoint = `https://kinesis.${region}.amazonaws.com`;
130+
nock(endpoint)
131+
.post('/')
132+
.reply(
133+
200,
134+
fs.readFileSync(
135+
'./test/mock-responses/kinesis-put-record.json',
136+
'utf8'
137+
)
138+
);
139+
140+
const arnClient = new KinesisClient({
141+
region: region,
142+
requestHandler: new NodeHttpHandler(),
143+
endpoint: endpoint,
144+
});
145+
146+
const payload = { message: 'hello' };
147+
const params = {
148+
StreamARN: `arn:aws:kinesis:${region}:123456789012:stream/my-stream-from-arn`,
149+
PartitionKey: 'pk-1',
150+
Data: new TextEncoder().encode(JSON.stringify(payload)),
151+
};
152+
153+
await arnClient.send(new PutRecordCommand(params));
154+
155+
const testSpans: ReadableSpan[] = getTestSpans();
156+
const putRecordSpans: ReadableSpan[] = testSpans.filter(
157+
(s: ReadableSpan) => {
158+
return s.name === 'my-stream-from-arn send';
159+
}
160+
);
161+
expect(putRecordSpans.length).toBe(1);
162+
const span = putRecordSpans[0];
163+
expect(span.kind).toBe(SpanKind.PRODUCER);
164+
expect(
165+
span.attributes[AttributeNames.AWS_KINESIS_STREAM_NAME]
166+
).toBe('my-stream-from-arn');
167+
expect(span.attributes['messaging.destination.name']).toBe(
168+
'my-stream-from-arn'
169+
);
170+
});
171+
172+
it('handles non-JSON data gracefully', async () => {
173+
nock(`https://kinesis.${region}.amazonaws.com`)
174+
.post('/')
175+
.reply(
176+
200,
177+
fs.readFileSync(
178+
'./test/mock-responses/kinesis-put-record.json',
179+
'utf8'
180+
)
181+
);
182+
183+
const nonJsonData = 'this is not json';
184+
const params = {
185+
StreamName: dummyStreamName,
186+
PartitionKey: 'pk-1',
187+
Data: new TextEncoder().encode(nonJsonData),
188+
};
189+
190+
await client.send(new PutRecordCommand(params));
191+
192+
const testSpans: ReadableSpan[] = getTestSpans();
193+
const putRecordSpans: ReadableSpan[] = testSpans.filter(
194+
(s: ReadableSpan) => {
195+
return s.name === `${dummyStreamName} send`;
196+
}
197+
);
198+
expect(putRecordSpans.length).toBe(1);
199+
const span = putRecordSpans[0];
200+
expect(span.kind).toBe(SpanKind.PRODUCER);
201+
202+
// Data should remain unchanged since it's not valid JSON
203+
const resultData = new TextDecoder().decode(params.Data);
204+
expect(resultData).toBe(nonJsonData);
205+
});
206+
});
207+
208+
describe('PutRecords', () => {
209+
it('injects trace context into all records', async () => {
210+
nock(`https://kinesis.${region}.amazonaws.com`)
211+
.post('/')
212+
.reply(
213+
200,
214+
fs.readFileSync(
215+
'./test/mock-responses/kinesis-put-records.json',
216+
'utf8'
217+
)
218+
);
219+
220+
const records = [
221+
{
222+
Data: new TextEncoder().encode(JSON.stringify({ id: 1 })),
223+
PartitionKey: 'pk-1',
224+
},
225+
{
226+
Data: new TextEncoder().encode(JSON.stringify({ id: 2 })),
227+
PartitionKey: 'pk-2',
228+
},
229+
];
230+
231+
const params = {
232+
StreamName: dummyStreamName,
233+
Records: records,
234+
};
235+
236+
await client.send(new PutRecordsCommand(params));
237+
238+
const testSpans: ReadableSpan[] = getTestSpans();
239+
const putRecordsSpans: ReadableSpan[] = testSpans.filter(
240+
(s: ReadableSpan) => {
241+
return s.name === `${dummyStreamName} send`;
242+
}
243+
);
244+
expect(putRecordsSpans.length).toBe(1);
245+
const span = putRecordsSpans[0];
246+
expect(span.kind).toBe(SpanKind.PRODUCER);
247+
expect(
248+
span.attributes[AttributeNames.AWS_KINESIS_STREAM_NAME]
249+
).toBe(dummyStreamName);
250+
expect(span.attributes['messaging.system']).toBe('aws_kinesis');
251+
expect(span.attributes['messaging.destination.name']).toBe(
252+
dummyStreamName
253+
);
254+
255+
// Verify trace context was injected into all records
256+
for (const record of records) {
257+
const injectedData = JSON.parse(
258+
new TextDecoder().decode(record.Data)
259+
);
260+
expect(injectedData.traceparent).toBeDefined();
261+
expect(typeof injectedData.traceparent).toBe('string');
262+
}
263+
264+
// Verify original data is preserved
265+
const firstRecord = JSON.parse(
266+
new TextDecoder().decode(records[0].Data)
267+
);
268+
expect(firstRecord.id).toBe(1);
269+
const secondRecord = JSON.parse(
270+
new TextDecoder().decode(records[1].Data)
271+
);
272+
expect(secondRecord.id).toBe(2);
273+
});
274+
});
72275
});
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"ShardId": "shardId-000000000000", "SequenceNumber": "49654869303348141538950437146463849462533012903803486226"}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"Records": [{"ShardId": "shardId-000000000000", "SequenceNumber": "49654869303348141538950437146463849462533012903803486226"}], "FailedRecordCount": 0}

0 commit comments

Comments
 (0)