Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion yarn-project/telemetry-client/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export const telemetryClientConfigMappings: ConfigMappingsType<TelemetryClientCo
otelBspMaxQueueSize: {
env: 'OTEL_BSP_MAX_QUEUE_SIZE',
description: 'The maximum number of completed spans to queue before export.',
...numberConfigHelper(2048),
...numberConfigHelper(16384),
},
otelIncludeMetrics: {
env: 'OTEL_INCLUDE_METRICS',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { jest } from '@jest/globals';
import { SpanKind, SpanStatusCode } from '@opentelemetry/api';
import { ExportResultCode } from '@opentelemetry/core';
import type { ReadableSpan, SpanExporter } from '@opentelemetry/sdk-trace-node';
Expand All @@ -17,7 +18,7 @@ class CollectingSpanExporter implements SpanExporter {
}
}

const log = { warn: () => {} } as any;
const makeLog = () => ({ warn: jest.fn() }) as any;

function makeSpan(durationMs: number, statusCode = SpanStatusCode.OK): ReadableSpan {
const seconds = Math.floor(durationMs / 1000);
Expand Down Expand Up @@ -45,7 +46,7 @@ function makeSpan(durationMs: number, statusCode = SpanStatusCode.OK): ReadableS
describe('MonitoredBatchSpanProcessor', () => {
it('does not export successful spans shorter than the configured duration', async () => {
const exporter = new CollectingSpanExporter();
const processor = new MonitoredBatchSpanProcessor(exporter, log, { minTraceDurationMs: 10 });
const processor = new MonitoredBatchSpanProcessor(exporter, makeLog(), { minTraceDurationMs: 10 });

processor.onEnd(makeSpan(9));
processor.onEnd(makeSpan(10));
Expand All @@ -56,7 +57,7 @@ describe('MonitoredBatchSpanProcessor', () => {

it('exports short error spans', async () => {
const exporter = new CollectingSpanExporter();
const processor = new MonitoredBatchSpanProcessor(exporter, log, { minTraceDurationMs: 10 });
const processor = new MonitoredBatchSpanProcessor(exporter, makeLog(), { minTraceDurationMs: 10 });

processor.onEnd(makeSpan(1, SpanStatusCode.ERROR));
await processor.forceFlush();
Expand All @@ -66,11 +67,37 @@ describe('MonitoredBatchSpanProcessor', () => {

it('allows short successful spans when the minimum duration is disabled', async () => {
const exporter = new CollectingSpanExporter();
const processor = new MonitoredBatchSpanProcessor(exporter, log, { minTraceDurationMs: 0 });
const processor = new MonitoredBatchSpanProcessor(exporter, makeLog(), { minTraceDurationMs: 0 });

processor.onEnd(makeSpan(1));
await processor.forceFlush();

expect(exporter.spans.map(span => span.name)).toEqual(['span-1']);
});

// Fills a queue capped at two spans and checks that a warning is logged only once a third span arrives.
it('warns when the queue fills up', () => {
  const logger = makeLog();
  const exporter = new CollectingSpanExporter();
  const config = { maxQueueSize: 2, minTraceDurationMs: 0 };
  const processor = new MonitoredBatchSpanProcessor(exporter, logger, config);

  // Two spans fit within maxQueueSize, so no warning is expected yet.
  for (let sent = 0; sent < 2; sent++) {
    processor.onEnd(makeSpan(1));
  }
  expect(logger.warn).not.toHaveBeenCalled();

  // The third span goes past the configured capacity.
  processor.onEnd(makeSpan(1));
  expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('queue full'), expect.anything());
});

// Pushes one span more than the former 2048-entry limit through a processor built without an explicit
// maxQueueSize and checks that nothing is logged at warn level.
it('does not drop spans below the previous 2048 default with the larger default queue', () => {
  const logger = makeLog();
  const exporter = new CollectingSpanExporter();
  const processor = new MonitoredBatchSpanProcessor(exporter, logger, { minTraceDurationMs: 0 });

  const previousDefaultQueueSize = 2048;
  // `<=` on purpose: this ends previousDefaultQueueSize + 1 (2049) spans.
  for (let sent = 0; sent <= previousDefaultQueueSize; sent++) {
    processor.onEnd(makeSpan(1));
  }

  expect(logger.warn).not.toHaveBeenCalled();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ const DROP_WARNING_INTERVAL_MS = 30_000;

/** Fallback for `minTraceDurationMs` (milliseconds) when the processor config does not set it. */
const DEFAULT_MIN_TRACE_DURATION_MS = 10;

/** Fallback for `maxQueueSize` when the processor config does not set it. Matches the default of the
 * OTEL_BSP_MAX_QUEUE_SIZE setting in config.ts. */
const DEFAULT_MAX_QUEUE_SIZE = 16384;

/** Cap on the per-export batch size, so a large queue can actually be drained instead of dribbling out
 * at the SDK default of 512 spans per scheduled export. Kept <= maxQueueSize per the BatchSpanProcessor contract
 * (the constructor takes the minimum of the two). */
const DEFAULT_MAX_EXPORT_BATCH_SIZE = 2048;

/** Configuration accepted by MonitoredBatchSpanProcessor: the SDK's BufferConfig plus a minimum span duration. */
export type MonitoredBatchSpanProcessorConfig = BufferConfig & {
/** Minimum span duration in milliseconds. The constructor clamps negative values to 0 and falls back to
 * DEFAULT_MIN_TRACE_DURATION_MS when unset.
 * NOTE(review): judging by the accompanying tests, successful spans shorter than this are not exported, error
 * spans are exported regardless, and 0 disables the filter. Confirm against onEnd. */
minTraceDurationMs?: number;
};
Expand All @@ -30,8 +36,9 @@ export class MonitoredBatchSpanProcessor extends BatchSpanProcessor {
private lastWarningTime = 0;

constructor(exporter: SpanExporter, log: Logger, config?: MonitoredBatchSpanProcessorConfig) {
const maxQueueSize = config?.maxQueueSize ?? 2048;
super(exporter, { ...config, maxQueueSize });
const maxQueueSize = config?.maxQueueSize ?? DEFAULT_MAX_QUEUE_SIZE;
const maxExportBatchSize = Math.min(config?.maxExportBatchSize ?? DEFAULT_MAX_EXPORT_BATCH_SIZE, maxQueueSize);
super(exporter, { ...config, maxQueueSize, maxExportBatchSize });
this.maxQueueSize = maxQueueSize;
this.minTraceDurationMs = Math.max(0, config?.minTraceDurationMs ?? DEFAULT_MIN_TRACE_DURATION_MS);
this.log = log;
Expand Down
50 changes: 50 additions & 0 deletions yarn-project/telemetry-client/src/otel.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { createLogger } from '@aztec/foundation/log';

import { jest } from '@jest/globals';

import { OpenTelemetryClient } from './otel.js';

describe('OpenTelemetryClient', () => {
  /** Creates a fake provider whose `forceFlush` and `shutdown` are jest mocks that resolve immediately. */
  function fakeProvider() {
    return {
      forceFlush: jest.fn(() => Promise.resolve()),
      shutdown: jest.fn(() => Promise.resolve()),
    };
  }

  /** Builds a client around the given meter and logger providers, plus an object that only exposes `getTracer`. */
  function buildClient(meterProvider: any, loggerProvider: any): OpenTelemetryClient {
    // The constructor is protected; the prover-node/aztec-node share a single client, which is the case we exercise.
    const ClientCtor = OpenTelemetryClient as any;
    const resource = { attributes: {} };
    const tracerStub = { getTracer: () => ({}) };
    return new ClientCtor(resource, meterProvider, tracerStub, loggerProvider, undefined, createLogger('test'));
  }

  it('only shuts the providers down once when stop is called multiple times', async () => {
    const [meterProvider, loggerProvider] = [fakeProvider(), fakeProvider()];
    const client = buildClient(meterProvider, loggerProvider);

    // Two overlapping stop() calls, then one more after they have settled.
    const overlapping = [client.stop(), client.stop()];
    await Promise.all(overlapping);
    await client.stop();

    for (const provider of [meterProvider, loggerProvider]) {
      expect(provider.shutdown).toHaveBeenCalledTimes(1);
    }
  });

  it('does not force flush after the client has been stopped', async () => {
    const meterProvider = fakeProvider();
    const loggerProvider = fakeProvider();
    const providers = [meterProvider, loggerProvider];
    const client = buildClient(meterProvider, loggerProvider);

    await client.stop();
    // Forget any forceFlush calls recorded while stopping, so only the flush() below is measured.
    providers.forEach(provider => provider.forceFlush.mockClear());

    await client.flush();

    providers.forEach(provider => expect(provider.forceFlush).not.toHaveBeenCalled());
  });
});
15 changes: 14 additions & 1 deletion yarn-project/telemetry-client/src/otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ export class OpenTelemetryClient implements TelemetryClient {
private meters: Map<string, WrappedMeter> = new Map<string, WrappedMeter>();
private tracers: Map<string, Tracer> = new Map<string, Tracer>();

/** Memoized shutdown promise. The telemetry client is shared between the aztec-node and an embedded prover-node,
* so stop() can be invoked more than once; the providers throw "shutdown may only be called once" and
* "invalid attempt to force flush after shutdown" if that happens. Guarding here makes stop()/flush() idempotent. */
private stopPromise: Promise<void> | undefined;

protected constructor(
private resource: IResource,
private meterProvider: MeterProvider,
Expand Down Expand Up @@ -169,14 +174,22 @@ export class OpenTelemetryClient implements TelemetryClient {
}

/**
 * Force-flushes the meter provider, the logger provider (when one is set) and, if the trace provider is a
 * NodeTracerProvider, the trace provider as well. Does nothing once stop() has been invoked.
 */
public async flush() {
  // Flushing after the providers have been shut down throws "invalid attempt to force flush after shutdown".
  if (this.stopPromise) {
    return;
  }

  // Start all flushes before awaiting any of them so they run concurrently.
  const metricsFlush = this.meterProvider.forceFlush();
  const logsFlush = this.loggerProvider?.forceFlush();
  const tracesFlush =
    this.traceProvider instanceof NodeTracerProvider ? this.traceProvider.forceFlush() : Promise.resolve();

  await Promise.all([metricsFlush, logsFlush, tracesFlush]);
}

public async stop() {
public stop() {
return (this.stopPromise ??= this.doStop());
}

private async doStop() {
this.nodejsMetricsMonitor?.stop();

const flushAndShutdown = async (provider?: { forceFlush: () => Promise<void>; shutdown: () => Promise<void> }) => {
Expand Down
Loading