Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 53 additions & 36 deletions packages/sampler-aws-xray/src/aws-xray-sampling-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ export class AWSXRaySamplingClient {
public fetchSamplingTargets(
requestBody: GetSamplingTargetsBody,
callback: (responseObject: GetSamplingTargetsResponse) => void
) {
this.makeSamplingRequest<GetSamplingTargetsResponse>(
): Promise<void> {
return this.makeSamplingRequest<GetSamplingTargetsResponse>(
this.samplingTargetsEndpoint,
callback,
(message: string) => this.samplerDiag.debug(message),
Expand All @@ -52,8 +52,8 @@ export class AWSXRaySamplingClient {

public fetchSamplingRules(
callback: (responseObject: GetSamplingRulesResponse) => void
) {
this.makeSamplingRequest<GetSamplingRulesResponse>(
): Promise<void> {
return this.makeSamplingRequest<GetSamplingRulesResponse>(
this.getSamplingRulesEndpoint,
callback,
(message: string) => this.samplerDiag.error(message)
Expand All @@ -65,7 +65,7 @@ export class AWSXRaySamplingClient {
callback: (responseObject: T) => void,
logger: DiagLogFunction,
requestBodyJsonString?: string
): void {
): Promise<void> {
const options: http.RequestOptions = {
method: 'POST',
headers: {},
Expand All @@ -78,41 +78,58 @@ export class AWSXRaySamplingClient {
};
}

// Ensure AWS X-Ray Sampler does not generate traces itself
context.with(suppressTracing(context.active()), () => {
const req: http.ClientRequest = http
.request(url, options, response => {
response.setEncoding('utf-8');
let responseData = '';
response.on('data', dataChunk => (responseData += dataChunk));
response.on('end', () => {
if (response.statusCode === 200 && responseData.length > 0) {
let responseObject: T | undefined = undefined;
return new Promise<void>(resolve => {
// Ensure AWS X-Ray Sampler does not generate traces itself
context.with(suppressTracing(context.active()), () => {
const req: http.ClientRequest = http
.request(url, options, response => {
response.setEncoding('utf-8');
let responseData = '';
response.on('data', dataChunk => (responseData += dataChunk));
response.on('end', () => {
try {
responseObject = JSON.parse(responseData) as T;
} catch (e: unknown) {
logger(`Error occurred when parsing responseData from ${url}`);
}
if (response.statusCode === 200 && responseData.length > 0) {
let responseObject: T | undefined = undefined;
try {
responseObject = JSON.parse(responseData) as T;
} catch (error: unknown) {
logger(
`Error occurred when parsing responseData from ${url}`,
error
);
}

if (responseObject) {
callback(responseObject);
if (responseObject) {
callback(responseObject);
}
} else {
this.samplerDiag.debug(
`${url} Response Code is: ${response.statusCode}`
);
this.samplerDiag.debug(
`${url} responseData is: ${responseData}`
);
}
} catch (error: unknown) {
logger(
`Error occurred when processing response from ${url}`,
error
);
} finally {
resolve();
}
} else {
this.samplerDiag.debug(
`${url} Response Code is: ${response.statusCode}`
);
this.samplerDiag.debug(`${url} responseData is: ${responseData}`);
}
});
})
.on('error', (error: unknown) => {
logger(`Error occurred when making an HTTP POST to ${url}`, error);
resolve();
});
})
.on('error', (error: unknown) => {
logger(`Error occurred when making an HTTP POST to ${url}: ${error}`);
});
if (requestBodyJsonString) {
req.end(requestBodyJsonString);
} else {
req.end();
}
if (requestBodyJsonString) {
req.end(requestBodyJsonString);
} else {
req.end();
}
});
});
}
}
33 changes: 22 additions & 11 deletions packages/sampler-aws-xray/src/remote-sampler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ export class _AWSXRayRemoteSampler implements Sampler {
private rulePollingJitterMillis: number;
private targetPollingJitterMillis: number;
private samplingClient: AWSXRaySamplingClient;
/** Promise that resolves once the initial sampling rules fetch completes. */
public readonly initialRulesFetch: Promise<void> = Promise.resolve();

constructor(samplerConfig: AWSXRayRemoteSamplerConfig) {
this.samplerDiag = diag.createComponentLogger({
Expand Down Expand Up @@ -150,7 +152,7 @@ export class _AWSXRayRemoteSampler implements Sampler {
);

// Start the Sampling Rules poller
this.startSamplingRulesPoller();
this.initialRulesFetch = this.startSamplingRulesPoller();

// Start the Sampling Targets poller where the first poll occurs after the default interval
this.startSamplingTargetsPoller();
Expand Down Expand Up @@ -226,40 +228,49 @@ export class _AWSXRayRemoteSampler implements Sampler {
clearInterval(this.targetPoller);
}

private startSamplingRulesPoller(): void {
// Execute first update
this.getAndUpdateSamplingRules();
private startSamplingRulesPoller(): Promise<void> {
// Execute first update and return its promise
const firstFetch = this.getAndUpdateSamplingRules();
// Update sampling rules every 5 minutes (or user-defined polling interval)
this.rulePoller = setInterval(
() => this.getAndUpdateSamplingRules(),
() =>
this.getAndUpdateSamplingRules().catch((e: unknown) =>
this.samplerDiag.error('Error refreshing sampling rules', e)
),
this.rulePollingIntervalMillis + this.rulePollingJitterMillis
);
this.rulePoller.unref();
return firstFetch;
}

private startSamplingTargetsPoller(): void {
// Update sampling targets every targetPollingInterval (usually 10 seconds)
this.targetPoller = setInterval(
() => this.getAndUpdateSamplingTargets(),
() =>
this.getAndUpdateSamplingTargets().catch((e: unknown) =>
this.samplerDiag.error('Error refreshing sampling targets', e)
),
this.targetPollingInterval * 1000 + this.targetPollingJitterMillis
);
this.targetPoller.unref();
}

private getAndUpdateSamplingTargets(): void {
private getAndUpdateSamplingTargets(): Promise<void> {
const requestBody: GetSamplingTargetsBody = {
SamplingStatisticsDocuments:
this.ruleCache.createSamplingStatisticsDocuments(this.clientId),
};

this.samplingClient.fetchSamplingTargets(
return this.samplingClient.fetchSamplingTargets(
requestBody,
this.updateSamplingTargets.bind(this)
);
}

private getAndUpdateSamplingRules(): void {
this.samplingClient.fetchSamplingRules(this.updateSamplingRules.bind(this));
private getAndUpdateSamplingRules(): Promise<void> {
return this.samplingClient.fetchSamplingRules(
this.updateSamplingRules.bind(this)
);
}

private updateSamplingRules(responseObject: GetSamplingRulesResponse): void {
Expand Down Expand Up @@ -312,7 +323,7 @@ export class _AWSXRayRemoteSampler implements Sampler {
'Performing out-of-band sampling rule polling to fetch updated rules.'
);
clearInterval(this.rulePoller);
this.startSamplingRulesPoller();
void this.startSamplingRulesPoller();
}
} catch (error: unknown) {
this.samplerDiag.debug('Error occurred when updating Sampling Targets');
Expand Down
Loading
Loading