Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bin/queuePopulator.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require('../lib/tracing').init();

const async = require('async');
const schedule = require('node-schedule');

Expand Down Expand Up @@ -98,6 +100,6 @@ process.on('SIGTERM', () => {
});
process.exit(1);
}
process.exit(0);
require('../lib/tracing').close().finally(() => process.exit(0));
});
});
3 changes: 2 additions & 1 deletion extensions/gc/service.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../lib/tracing').init();

const { errors } = require('arsenal');
const werelogs = require('werelogs');
Expand Down Expand Up @@ -101,6 +102,6 @@ initAndStart();
process.on('SIGTERM', () => {
logger.info('received SIGTERM, exiting');
garbageCollector.close(() => {
process.exit(0);
require('../../lib/tracing').close().finally(() => process.exit(0));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to require it twice ?

});
});
3 changes: 2 additions & 1 deletion extensions/lifecycle/bucketProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/tracing').init();

const async = require('async');
const werelogs = require('werelogs');
Expand Down Expand Up @@ -133,6 +134,6 @@ async.waterfall([
process.on('SIGTERM', () => {
logger.info('received SIGTERM, exiting');
bucketProcessor.close(() => {
process.exit(0);
require('../../../lib/tracing').close().finally(() => process.exit(0));
});
});
44 changes: 43 additions & 1 deletion extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ const {
startCircuitBreakerMetricsExport,
updateCircuitBreakerConfigForImplicitOutputQueue,
} = require('../../../lib/CircuitBreaker');
const { context: otelContext, trace, SpanKind, SpanStatusCode, ROOT_CONTEXT } =
require('@opentelemetry/api');
const { traceHeadersFromCurrentContext } =
require('../../../lib/tracing/kafkaTraceContext');
const { isEnabled } = require('../../../lib/tracing');

const DEFAULT_CRON_RULE = '* * * * *';
const DEFAULT_CONCURRENCY = 10;
Expand Down Expand Up @@ -340,7 +345,8 @@ class LifecycleConductor {
}

_taskToMessage(task, taskVersion, log) {
return {
const headers = traceHeadersFromCurrentContext();
const kafkaEntry = {
message: JSON.stringify({
action: 'processObjects',
contextInfo: {
Expand All @@ -355,6 +361,8 @@ class LifecycleConductor {
details: {},
}),
};
if (headers) {kafkaEntry.headers = headers;}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (headers) {kafkaEntry.headers = headers;}
if (headers) {
kafkaEntry.headers = headers;
}

🙏

return kafkaEntry;
}

_getAccountIds(unknownCanonicalIds, log, cb) {
Expand Down Expand Up @@ -402,6 +410,40 @@ class LifecycleConductor {
}

processBuckets(cb) {
if (!isEnabled()) {
this._processBucketsInternal((err, res) => {
if (cb) {cb(err, res);}
});
return;
}
// Root INTERNAL trace per cron firing (no upstream parent); the
// in-process scan work (Mongo bucket listing) nests under it.
const tracer = trace.getTracer('backbeat');
const span = tracer.startSpan('lifecycle.conductor.scan', {
kind: SpanKind.INTERNAL,
}, ROOT_CONTEXT);
const ctx = trace.setSpan(ROOT_CONTEXT, span);
otelContext.with(ctx, () => {
try {
this._processBucketsInternal((err, res) => {
if (err) {
span.recordException(err);
span.setStatus({ code: SpanStatusCode.ERROR });
}
span.end();
if (cb) {cb(err, res);}
});
} catch (err) {
// sync throw: end span (don't leak), then rethrow
span.recordException(err);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the same logic somewhere else, we should factorise

span.setStatus({ code: SpanStatusCode.ERROR });
span.end();
throw err;
}
});
Comment thread
delthas marked this conversation as resolved.
}
Comment thread
delthas marked this conversation as resolved.

_processBucketsInternal(cb) {
const log = this.logger.newRequestLogger();
const start = new Date();
let nBucketsQueued = 0;
Expand Down
3 changes: 2 additions & 1 deletion extensions/lifecycle/conductor/service.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/tracing').init();

const async = require('async');
const werelogs = require('werelogs');
Expand Down Expand Up @@ -89,6 +90,6 @@ async.waterfall([
process.on('SIGTERM', () => {
logger.info('received SIGTERM, exiting');
lcConductor.stop(() => {
process.exit(0);
require('../../../lib/tracing').close().finally(() => process.exit(0));
});
});
3 changes: 2 additions & 1 deletion extensions/lifecycle/objectProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/tracing').init();

const async = require('async');
const werelogs = require('werelogs');
Expand Down Expand Up @@ -120,6 +121,6 @@ async.waterfall([
process.on('SIGTERM', () => {
logger.info('received SIGTERM, exiting');
objectProcessor.close(() => {
process.exit(0);
require('../../../lib/tracing').close().finally(() => process.exit(0));
});
});
11 changes: 9 additions & 2 deletions extensions/lifecycle/tasks/LifecycleTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const ReplicationAPI = require('../../replication/ReplicationAPI');
const { LifecycleMetrics, LIFECYCLE_MARKER_METRICS_LOCATION } = require('../LifecycleMetrics');
const locationsConfig = require('../../../conf/locationConfig.json') || {};
const { rulesSupportTransition } = require('../util/rules');
const { traceHeadersFromCurrentContext } = require('../../../lib/tracing/kafkaTraceContext');
const { decode } = versioning.VersionID;

const errorTransitionInProgress = errors.InternalError.
Expand Down Expand Up @@ -121,7 +122,10 @@ class LifecycleTask extends BackbeatTask {
* @return {undefined}
*/
_sendBucketEntry(entry, cb) {
const entries = [{ message: JSON.stringify(entry) }];
const headers = traceHeadersFromCurrentContext();
const kafkaEntry = { message: JSON.stringify(entry) };
if (headers) {kafkaEntry.headers = headers;}
const entries = [kafkaEntry];
this.producer.sendToTopic(this.bucketTasksTopic, entries, err => {
LifecycleMetrics.onKafkaPublish(null, 'BucketTopic', 'bucket', err, 1);
return cb(err);
Expand Down Expand Up @@ -183,7 +187,10 @@ class LifecycleTask extends BackbeatTask {
location,
Date.now() - entry.getAttribute('transitionTime'));

const entries = [{ message: entry.toKafkaMessage() }];
const headers = traceHeadersFromCurrentContext();
const kafkaEntry = { message: entry.toKafkaMessage() };
if (headers) {kafkaEntry.headers = headers;}
const entries = [kafkaEntry];
this.producer.sendToTopic(this.objectTasksTopic, entries, err => {
LifecycleMetrics.onKafkaPublish(null, 'ObjectTopic', 'bucket', err, 1);
return cb(err);
Expand Down
6 changes: 5 additions & 1 deletion extensions/notification/NotificationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const messageUtil = require('./utils/message');
const notifConstants = require('./constants');
const QueuePopulatorExtension =
require('../../lib/queuePopulator/QueuePopulatorExtension');
const { traceHeadersFromEntry } = require('../../lib/tracing/kafkaTraceContext');

class NotificationQueuePopulator extends QueuePopulatorExtension {
/**
Expand Down Expand Up @@ -290,13 +291,16 @@ class NotificationQueuePopulator extends QueuePopulatorExtension {
eventTime: message.dateTime,
matchingConfig,
});
const traceHeaders = traceHeadersFromEntry(value);
this.publish(topic,
// keeping all messages for same object
// in the same partition to keep the order.
// here we use the object name and not the
// "_id" which also includes the versionId
`${bucket}/${message.key}`,
JSON.stringify(message));
JSON.stringify(message),
undefined,
traceHeaders);
// keep track of internal topics we have pushed to
pushedToTopic[topic] = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,19 @@ class KafkaNotificationDestination extends NotificationDestination {
*/
send(messages, done) {
const starTime = Date.now();
this._notificationProducer.send(messages, error => {
// Trust boundary: drop trace headers before producing to the external
// customer Kafka (no protocol-level ingress strip like nginx for HTTP).
const safeMessages = Array.isArray(messages)
Comment thread
delthas marked this conversation as resolved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Array.isArray(messages) ?

? messages.map(m => {
if (m && m.headers) {
// eslint-disable-next-line no-unused-vars
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can just delete m.headers ?

const { headers, ...rest } = m;
Comment thread
delthas marked this conversation as resolved.
return rest;
}
return m;
})
: messages;
this._notificationProducer.send(safeMessages, error => {
if (error) {
const { host, topic } = this._destinationConfig;
this._log.error('error in message delivery to external Kafka destination', {
Expand Down
4 changes: 3 additions & 1 deletion extensions/notification/queueProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
require('../../../lib/tracing').init();

const assert = require('assert');
const { errors } = require('arsenal');
const async = require('async');
Expand Down Expand Up @@ -108,7 +110,7 @@ process.on('SIGTERM', () => {
});
process.exit(1);
}
process.exit(0);
require('../../../lib/tracing').close().finally(() => process.exit(0));
});
});

3 changes: 3 additions & 0 deletions extensions/replication/ReplicationAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

const ActionQueueEntry = require('../../lib/models/ActionQueueEntry');
const ReplicationMetrics = require('./ReplicationMetrics');
const { traceHeadersFromCurrentContext } = require('../../lib/tracing/kafkaTraceContext');

let { dataMoverTopic } = config.extensions.replication;
const { coldStorageArchiveTopicPrefix } = config.extensions.lifecycle;
Expand Down Expand Up @@ -78,6 +79,8 @@
key: `${bucket}/${key}`,
message: action.toKafkaMessage(),
};
const traceHeaders = traceHeadersFromCurrentContext();
if (traceHeaders) kafkaEntry.headers = traceHeaders;

Check warning on line 83 in extensions/replication/ReplicationAPI.js

View workflow job for this annotation

GitHub Actions / lint

Expected { after 'if' condition
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick up. I think we don't allow that in our codebase ?

let topic = dataMoverTopic;
const toLocation = action.getAttribute('toLocation');
const locationConfig = locations[toLocation];
Expand Down
7 changes: 6 additions & 1 deletion extensions/replication/ReplicationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const QueuePopulatorExtension =
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
const locationsConfig = require('../../conf/locationConfig.json') || {};
const safeJsonParse = require('../../lib/util/safeJsonParse');
const { traceHeadersFromEntry } = require('../../lib/tracing/kafkaTraceContext');

class ReplicationQueuePopulator extends QueuePopulatorExtension {
constructor(params) {
Expand Down Expand Up @@ -112,11 +113,15 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension {
const publishedEntry = Object.assign({}, entry);
delete publishedEntry.logReader;

const traceHeaders = traceHeadersFromEntry(value);

this.log.trace('publishing object replication entry',
{ entry: queueEntry.getLogInfo() });
this.publish(this.repConfig.topic,
`${queueEntry.getBucket()}/${queueEntry.getObjectKey()}`,
JSON.stringify(publishedEntry));
JSON.stringify(publishedEntry),
undefined,
traceHeaders);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion extensions/replication/queueProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
require('../../../lib/tracing').init();

const async = require('async');
const assert = require('assert');
const werelogs = require('werelogs');
Expand Down Expand Up @@ -340,6 +342,6 @@ process.on('SIGTERM', () => {
});
process.exit(1);
}
process.exit(0);
require('../../../lib/tracing').close().finally(() => process.exit(0));
});
});
2 changes: 2 additions & 0 deletions extensions/replication/replicationStatusProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/tracing').init();

const werelogs = require('werelogs');

Expand Down Expand Up @@ -70,5 +71,6 @@ process.on('SIGTERM', () => {
});
process.exit(1);
}
require('../../../lib/tracing').close().finally(() => process.exit(0));
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
});
});
Comment thread
delthas marked this conversation as resolved.
37 changes: 36 additions & 1 deletion lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ const {
}
} = require('./constants');

Comment thread
delthas marked this conversation as resolved.
const { context: otelContext, SpanStatusCode } = require('@opentelemetry/api');
const { isEnabled } = require('./tracing');
const { startLinkedSpanFromKafkaEntry } = require('./tracing/kafkaTraceContext');

const CLIENT_ID = 'BackbeatConsumer';
const { withTopicPrefix } = require('./util/topic');

Expand Down Expand Up @@ -530,7 +534,38 @@ class BackbeatConsumer extends EventEmitter {
const { topic, partition } = entry;
KafkaBacklogMetrics.onTaskStarted(topic, partition, this._groupId);

this._queueProcessor(entry, (err, completionArgs) => done(err, completionArgs, finishProcessingTask));
// When OTEL is off, skip the span machinery entirely (no OTEL API on
Comment thread
delthas marked this conversation as resolved.
// the per-message hot path) and keep the original processing shape.
Comment on lines +537 to +538
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// When OTEL is off, skip the span machinery entirely (no OTEL API on
// the per-message hot path) and keep the original processing shape.

The condition is pretty explicit ?

if (!isEnabled()) {
this._queueProcessor(entry, (err, completionArgs) =>
done(err, completionArgs, finishProcessingTask));
return;
}

Comment thread
delthas marked this conversation as resolved.
const { ctx, span } = startLinkedSpanFromKafkaEntry(entry, `${topic}.process`);
span.setAttribute('messaging.system', 'kafka');
span.setAttribute('messaging.destination.name', topic);
span.setAttribute('messaging.destination.partition.id', `${partition}`);
span.setAttribute('messaging.consumer.group.name', this._groupId);

Comment thread
delthas marked this conversation as resolved.
otelContext.with(ctx, () => {
try {
this._queueProcessor(entry, (err, completionArgs) => {
if (err) {
span.recordException(err);
span.setStatus({ code: SpanStatusCode.ERROR });
}
span.end();
done(err, completionArgs, finishProcessingTask);
});
} catch (err) {
// sync throw before the callback fired: end span (don't leak), then rethrow
span.recordException(err);
span.setStatus({ code: SpanStatusCode.ERROR });
span.end();
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
throw err;
}
});
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
}

/**
Expand Down
3 changes: 2 additions & 1 deletion lib/BackbeatProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ class BackbeatProducer extends EventEmitter {
Buffer.from(item.message), // value
item.key, // key (for keyed partitioning)
Date.now(), // timestamp
sendCtx // opaque
sendCtx, // opaque
item.headers || undefined // Kafka message headers
);
});
} catch (err) {
Expand Down
5 changes: 4 additions & 1 deletion lib/queuePopulator/QueuePopulatorExtension.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class QueuePopulatorExtension {
* @param {Object} [optEntriesToPublish] - optional batch
* @return {undefined}
*/
publish(topic, key, message, optEntriesToPublish) {
publish(topic, key, message, optEntriesToPublish, headers) {
let __batch;
if (optEntriesToPublish) {
__batch = optEntriesToPublish;
Expand All @@ -81,6 +81,9 @@ class QueuePopulatorExtension {
'synchronously from the filter() method.');

const kafkaEntry = { key: encodeURIComponent(key), message };
if (headers) {
kafkaEntry.headers = headers;
}
this.log.trace('queueing kafka entry to topic',
{ key: kafkaEntry.key, topic });
if (__batch[topic] === undefined) {
Expand Down
Loading
Loading