From 31a62ac89187572ca57cb0193d889249740303de Mon Sep 17 00:00:00 2001 From: Thomas Flament Date: Mon, 1 Jun 2026 15:21:03 +0200 Subject: [PATCH 1/2] oplogPopulator: synthesise oplog key via $addFields in connector pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The current Kafka message key projects fullDocument.value.key, which is null on update events since BB-355 removed change.stream.full.document=updateLookup. Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:}, fullDocument:null}) and lands on one partition, breaking per-object ordering across op types (insert vs update) and pinning a hot bucket's update traffic to a single partition (a blocker for the oplog scaling work tracked in BB-756). Fix: insert a $addFields stage into the MongoDB connector's change-stream pipeline (right after $match, before the conditional location-strip $set) that synthesises a top-level 'key' field by $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key. The connector's output.schema.key replaces the broken fullDocument nested record with a sibling 'key' field; the existing 'ns:{coll}' projection is preserved, so the Kafka message key remains {ns:{coll:}, key:} — bucket-level isolation is unchanged. All events for the same logical S3 object (insert, master/version updates, replication-status updates, delete-marker updates) yield the same key value and hash to the same partition. Master and version documents share the same value.key, so master/version events also collapse to the same partition without prefix-stripping. The $ifNull variant relies on metadata always writing the whole 'value' subdocument (full $set). Confirmed against arsenal: there is no partial dotted-$set path for object MD today. A hypothetical future partial $set would not populate updateDescription.updatedFields.value.key and that event would mis-partition — accepted risk per the ticket discussion. The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + the added $addFields stage only, not the $match stage). Downstream oplog consumers do not read the Kafka message key, so the new key shape is consumer-transparent. History / discussion: https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122 This supersedes the earlier SMT-based approach (a Kafka Connect Single Message Transform deriving the key from documentKey._id with master/version prefix-stripping), which would have spanned three repos and added a new Java artifact + operator feature flag. Closed in favour of this single-repo fix after a per-event-cost measurement showed the $addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on the mongod — not a throughput concern at our target rates. Superseded work (to be closed): https://github.com/scality/backbeat/pull/2741 (SMT-track Backbeat PR) https://github.com/scality/Zenko/pull/2410 (ZENKO-5274 — Java SMT in kafka-connect image) https://scality.atlassian.net/browse/ZKOP-553 (operator feature flag — no longer needed) Issue: BB-768 --- extensions/oplogPopulator/constants.js | 22 +-------- .../pipeline/PipelineFactory.js | 15 +++++++ .../oplogPopulator/PipelineFactory.js | 45 ++++++++++++++++++- .../unit/oplogPopulator/ConnectorsManager.js | 18 +------- .../MultipleBucketsPipelineFactory.js | 35 +++++++++++++-- .../pipeline/WildcardPipelineFactory.js | 32 ++++++++++--- 6 files changed, 120 insertions(+), 47 deletions(-) diff --git a/extensions/oplogPopulator/constants.js b/extensions/oplogPopulator/constants.js index 63b3aeb9b..e397ff269 100644 --- a/extensions/oplogPopulator/constants.js +++ b/extensions/oplogPopulator/constants.js @@ -23,10 +23,6 @@ const constants = { 'output.format.value': 'json', 'value.converter.schemas.enable': false, 'value.converter': 'org.apache.kafka.connect.storage.StringConverter', - // Kafka message key config - // The message key is set to only contain the bucket where the event happend. - // This will make events of the same bucket always land in the same partition - // as they will have the same key 'output.format.key': 'schema', 'output.schema.key': JSON.stringify({ type: 'record', @@ -42,22 +38,8 @@ const constants = { }], }, 'null'], }, { - name: 'fullDocument', - type: [{ - type: 'record', - name: 'fullDocumentRecord', - fields: [{ - name: 'value', - type: [{ - type: 'record', - name: 'valueRecord', - fields: [{ - name: 'key', - type: ['string', 'null'], - }], - }, 'null'], - }], - }, 'null'], + name: 'key', + type: ['string', 'null'], }], }), }, diff --git a/extensions/oplogPopulator/pipeline/PipelineFactory.js b/extensions/oplogPopulator/pipeline/PipelineFactory.js index 0dd2480d9..140f28386 100644 --- a/extensions/oplogPopulator/pipeline/PipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/PipelineFactory.js @@ -70,6 +70,21 @@ class PipelineFactory { } const pipeline = [ stage, + // Synthesise a top-level 'key' from fullDocument.value.key + // (insert/replace) or updateDescription.updatedFields.value.key + // (update). Relies on object MD writes always $set-ing the whole + // 'value' subdocument; a partial dotted $set would mis-partition. + // See BB-768 (superseded SMT alternative in PR #2741). + { + $addFields: { + key: { + $ifNull: [ + '$fullDocument.value.key', + '$updateDescription.updatedFields.value.key', + ], + }, + }, + }, ]; if (this._locationStrippingBytesThreshold > 0) { pipeline.push({ diff --git a/tests/functional/oplogPopulator/PipelineFactory.js b/tests/functional/oplogPopulator/PipelineFactory.js index 5fa35b2a6..3f014cdfb 100644 --- a/tests/functional/oplogPopulator/PipelineFactory.js +++ b/tests/functional/oplogPopulator/PipelineFactory.js @@ -19,6 +19,7 @@ describe('PipelineFactory', function () { const collectionName = 'test-pipeline-stripping'; let collection; let setStage; + let addFieldsStage; before(async () => { await client.connect(); @@ -26,7 +27,8 @@ describe('PipelineFactory', function () { const factory = new MultipleBucketsPipelineFactory(THRESHOLD); const pipeline = JSON.parse(factory.getPipeline(['test-bucket'])); - setStage = pipeline[1]; + addFieldsStage = pipeline[1]; + setStage = pipeline[2]; }); afterEach(async () => { @@ -147,4 +149,45 @@ describe('PipelineFactory', function () { ); }); }); + + describe('key synthesis ($addFields)', () => { + it('should populate key from fullDocument.value.key on insert-shaped docs', async () => { + const doc = { fullDocument: { value: { key: 'my/object' } } }; + await collection.insertOne(doc); + const results = await collection.aggregate([addFieldsStage]).toArray(); + assert.strictEqual(results.length, 1); + assert.strictEqual(results[0].key, 'my/object'); + }); + + it('should populate key from updateDescription.updatedFields.value.key on update-shaped docs', async () => { + const doc = { updateDescription: { updatedFields: { value: { key: 'my/object' } } } }; + await collection.insertOne(doc); + const results = await collection.aggregate([addFieldsStage]).toArray(); + assert.strictEqual(results.length, 1); + assert.strictEqual(results[0].key, 'my/object'); + }); + + it('should prefer fullDocument.value.key when both are present', async () => { + const doc = { + fullDocument: { value: { key: 'from-full' } }, + updateDescription: { updatedFields: { value: { key: 'from-update' } } }, + }; + await collection.insertOne(doc); + const results = await collection.aggregate([addFieldsStage]).toArray(); + assert.strictEqual(results[0].key, 'from-full'); + }); + + it('should leave key absent when neither path is populated', async () => { + // $ifNull returns missing (not null) when all inputs are missing. + // The connector's nullable Avro key schema emits this as null on + // the wire, so the partition outcome is the same as an explicit + // null. In production deletes are ignored, so this case doesn't + // occur for consumed events. + const doc = { fullDocument: null }; + await collection.insertOne(doc); + const results = await collection.aggregate([addFieldsStage]).toArray(); + assert.strictEqual(results.length, 1); + assert.strictEqual(results[0].key, undefined); + }); + }); }); diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index 6c81e8666..53835d693 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -44,22 +44,8 @@ const connectorConfig = { }], }, 'null'], }, { - name: 'fullDocument', - type: [{ - type: 'record', - name: 'fullDocumentRecord', - fields: [{ - name: 'value', - type: [{ - type: 'record', - name: 'valueRecord', - fields: [{ - name: 'key', - type: ['string', 'null'], - }], - }, 'null'], - }], - }, 'null'], + name: 'key', + type: ['string', 'null'], }], }), 'heartbeat.interval.ms': 10000, diff --git a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js index 40d9b0012..836a28875 100644 --- a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js @@ -46,21 +46,31 @@ describe('MultipleBucketsPipelineFactory', () => { assert.strictEqual(result, '[]'); }); - it('should return the pipeline with buckets and location stripping', () => { + it('should return the pipeline with buckets, key synthesis and location stripping', () => { const buckets = ['bucket1', 'bucket2']; const result = multipleBucketsPipelineFactory.getPipeline(buckets); const pipeline = JSON.parse(result); - assert.strictEqual(pipeline.length, 2); + assert.strictEqual(pipeline.length, 3); assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1','bucket2']}}}); - assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], { + assert.deepStrictEqual(pipeline[1], { + $addFields: { + key: { + $ifNull: [ + '$fullDocument.value.key', + '$updateDescription.updatedFields.value.key', + ], + }, + }, + }); + assert.deepStrictEqual(pipeline[2].$set['fullDocument.value.location'], { $cond: { if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] }, then: '$$REMOVE', else: '$fullDocument.value.location', }, }); - assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], { + assert.deepStrictEqual(pipeline[2].$set['updateDescription.updatedFields.value.location'], { $cond: { if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] }, then: '$$REMOVE', @@ -68,6 +78,23 @@ describe('MultipleBucketsPipelineFactory', () => { }, }); }); + + it('should return the pipeline with key synthesis when location stripping is disabled', () => { + const factory = new MultipleBucketsPipelineFactory(0); + const pipeline = JSON.parse(factory.getPipeline(['bucket1'])); + assert.strictEqual(pipeline.length, 2); + assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1']}}}); + assert.deepStrictEqual(pipeline[1], { + $addFields: { + key: { + $ifNull: [ + '$fullDocument.value.key', + '$updateDescription.updatedFields.value.key', + ], + }, + }, + }); + }); }); describe('getOldConnectorBucketList', () => { diff --git a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js index 34d849561..22fdf6003 100644 --- a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js @@ -33,21 +33,31 @@ describe('WildcardPipelineFactory', () => { }); describe('getPipeline', () => { - it('should return the pipeline with buckets and location stripping', () => { + it('should return the pipeline with buckets, key synthesis and location stripping', () => { const buckets = ['bucket1', 'bucket2']; const result = wildcardPipelineFactory.getPipeline(buckets); const pipeline = JSON.parse(result); - assert.strictEqual(pipeline.length, 2); + assert.strictEqual(pipeline.length, 3); assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$not:{$regex:'^(mpuShadowBucket|__).*'}}}}); - assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], { + assert.deepStrictEqual(pipeline[1], { + $addFields: { + key: { + $ifNull: [ + '$fullDocument.value.key', + '$updateDescription.updatedFields.value.key', + ], + }, + }, + }); + assert.deepStrictEqual(pipeline[2].$set['fullDocument.value.location'], { $cond: { if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] }, then: '$$REMOVE', else: '$fullDocument.value.location', }, }); - assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], { + assert.deepStrictEqual(pipeline[2].$set['updateDescription.updatedFields.value.location'], { $cond: { if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] }, then: '$$REMOVE', @@ -56,14 +66,24 @@ describe('WildcardPipelineFactory', () => { }); }); - it('should return the pipeline with buckets and no location stripping if disabled', () => { + it('should return the pipeline with key synthesis and no location stripping if disabled', () => { const wildcardPipelineFactoryNoStripping = new WildcardPipelineFactory(0); const buckets = ['bucket1', 'bucket2']; const result = wildcardPipelineFactoryNoStripping.getPipeline(buckets); const pipeline = JSON.parse(result); - assert.strictEqual(pipeline.length, 1); + assert.strictEqual(pipeline.length, 2); + assert.deepStrictEqual(pipeline[1], { + $addFields: { + key: { + $ifNull: [ + '$fullDocument.value.key', + '$updateDescription.updatedFields.value.key', + ], + }, + }, + }); }); }); From f53649e8f135d002e3560a26d07258dbcf0912b5 Mon Sep 17 00:00:00 2001 From: Thomas Flament Date: Mon, 1 Jun 2026 15:21:16 +0200 Subject: [PATCH 2/2] queuePopulator: counter for oplog events processed without a synthesised key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a Prometheus counter `s3_oplog_event_missing_key_total` (labelled by operationType) that increments when a consumed oplog event reaches downstream processing in ListRecordStream but lacks the top-level 'key' field synthesised by the connector pipeline in the previous commit. Should stay at zero in steady-state. A non-zero rate signals a regression — e.g. a future write path on object MD that doesn't $set the whole 'value' subdocument and so bypasses the $ifNull coalesce. Wired as a new `KafkaLogConsumerMetrics` module mirroring the LifecycleMetrics pattern (static class, per-method try/catch + handleError so a prometheus-side failure can't propagate into the oplog read path). Per-PR review feedback. Issue: BB-768 --- .../KafkaLogConsumerMetrics.js | 38 +++++++++++++++++ .../KafkaLogConsumer/ListRecordStream.js | 5 +++ .../KafkaLogConsumerMetrics.js | 41 +++++++++++++++++++ .../kafkaLogConsumer/ListRecordStream.js | 29 +++++++++++++ 4 files changed, 113 insertions(+) create mode 100644 lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics.js create mode 100644 tests/unit/lib/queuePopulator/kafkaLogConsumer/KafkaLogConsumerMetrics.js diff --git a/lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics.js b/lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics.js new file mode 100644 index 000000000..c00d14dd5 --- /dev/null +++ b/lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics.js @@ -0,0 +1,38 @@ +const { ZenkoMetrics } = require('arsenal').metrics; + +// Counts consumed oplog events that downstream queue populators will +// process but that lack a top-level 'key' field — i.e., the synthesised +// partitioning key produced by the connector pipeline (see BB-768). +// Should stay at zero in steady-state; a non-zero rate signals that +// metadata grew a write path that doesn't $set the whole 'value' +// subdocument, regressing the per-object partitioning fix. +const oplogEventMissingKey = ZenkoMetrics.createCounter({ + name: 's3_oplog_event_missing_key_total', + help: 'Total number of oplog events processed by queue populators ' + + 'with the synthesised top-level "key" field missing or null', + labelNames: ['opType'], +}); + +class KafkaLogConsumerMetrics { + static onMissingKey(log, opType) { + try { + oplogEventMissingKey.inc({ + opType: opType || 'unknown', + }); + } catch (err) { + KafkaLogConsumerMetrics.handleError( + log, err, 'KafkaLogConsumerMetrics.onMissingKey'); + } + } + + static handleError(log, err, method) { + if (log && log.error) { + log.error('failed to update prometheus metrics', { + method, + error: err.message, + }); + } + } +} + +module.exports = KafkaLogConsumerMetrics; diff --git a/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js b/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js index bc5e65959..aef757212 100644 --- a/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js +++ b/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js @@ -1,4 +1,5 @@ const stream = require('stream'); +const KafkaLogConsumerMetrics = require('./KafkaLogConsumerMetrics'); class ListRecordStream extends stream.Transform { /** @@ -122,6 +123,10 @@ class ListRecordStream extends stream.Transform { // skipping empty events return callback(null, null); } + if (changeStreamDocument.key === null || changeStreamDocument.key === undefined) { + KafkaLogConsumerMetrics.onMissingKey( + this._logger, changeStreamDocument.operationType); + } const opType = this._getType(changeStreamDocument.operationType, objectMd); const streamObject = { // timestamp of the kafka message diff --git a/tests/unit/lib/queuePopulator/kafkaLogConsumer/KafkaLogConsumerMetrics.js b/tests/unit/lib/queuePopulator/kafkaLogConsumer/KafkaLogConsumerMetrics.js new file mode 100644 index 000000000..7fde70998 --- /dev/null +++ b/tests/unit/lib/queuePopulator/kafkaLogConsumer/KafkaLogConsumerMetrics.js @@ -0,0 +1,41 @@ +const assert = require('assert'); +const sinon = require('sinon'); +const { ZenkoMetrics } = require('arsenal').metrics; +const KafkaLogConsumerMetrics = + require('../../../../../lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics'); + +describe('KafkaLogConsumerMetrics', () => { + const log = { error: sinon.stub() }; + + afterEach(() => { + sinon.restore(); + log.error.resetHistory(); + }); + + describe('onMissingKey', () => { + it('should increment the s3_oplog_event_missing_key_total counter', () => { + const metric = ZenkoMetrics.getMetric('s3_oplog_event_missing_key_total'); + const incStub = sinon.stub(metric, 'inc'); + KafkaLogConsumerMetrics.onMissingKey(log, 'update'); + assert(incStub.calledOnceWith({ opType: 'update' })); + assert(log.error.notCalled); + }); + + it('should label as "unknown" when opType is missing', () => { + const metric = ZenkoMetrics.getMetric('s3_oplog_event_missing_key_total'); + const incStub = sinon.stub(metric, 'inc'); + KafkaLogConsumerMetrics.onMissingKey(log, undefined); + assert(incStub.calledOnceWith({ opType: 'unknown' })); + }); + + it('should swallow + log errors from inc', () => { + const metric = ZenkoMetrics.getMetric('s3_oplog_event_missing_key_total'); + sinon.stub(metric, 'inc').throws(new Error('boom')); + assert.doesNotThrow(() => KafkaLogConsumerMetrics.onMissingKey(log, 'insert')); + assert(log.error.calledOnce); + assert(log.error.calledWithMatch('failed to update prometheus metrics', { + method: 'KafkaLogConsumerMetrics.onMissingKey', + })); + }); + }); +}); diff --git a/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js b/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js index 5cebb2042..eef7194a6 100644 --- a/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js +++ b/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js @@ -1,8 +1,11 @@ const assert = require('assert'); +const sinon = require('sinon'); const werelogs = require('werelogs'); const logger = new werelogs.Logger('ListRecordStream'); const ListRecordStream = require('../../../../../lib/queuePopulator/KafkaLogConsumer/ListRecordStream'); +const KafkaLogConsumerMetrics = + require('../../../../../lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics'); const changeStreamDocument = { ns: { @@ -159,6 +162,32 @@ describe('ListRecordStream', () => { }); }); + it('should call onMissingKey when key is absent on a processed event', done => { + // changeStreamDocument has no top-level 'key' field — emulates an + // event that slipped past the $addFields stage. Should still be + // processed (objectMd is present) but the metric helper should fire. + const stub = sinon.stub(KafkaLogConsumerMetrics, 'onMissingKey'); + const kafkaMessage = getKafkaMessage(JSON.stringify(changeStreamDocument)); + listRecordStream.write(kafkaMessage); + listRecordStream.once('data', () => { + assert(stub.calledOnceWith(logger, 'insert')); + stub.restore(); + return done(); + }); + }); + + it('should NOT call onMissingKey when key is present', done => { + const stub = sinon.stub(KafkaLogConsumerMetrics, 'onMissingKey'); + const doc = { ...changeStreamDocument, key: 'example-key' }; + const kafkaMessage = getKafkaMessage(JSON.stringify(doc)); + listRecordStream.write(kafkaMessage); + listRecordStream.once('data', () => { + assert(stub.notCalled); + stub.restore(); + return done(); + }); + }); + it('should skip empty record', done => { const kafkaMessage = getKafkaMessage(JSON.stringify(changeStreamDocument)); const EmptyKafkaMessage = getKafkaMessage('{}');