diff --git a/extensions/oplogPopulator/constants.js b/extensions/oplogPopulator/constants.js index 63b3aeb9b..e397ff269 100644 --- a/extensions/oplogPopulator/constants.js +++ b/extensions/oplogPopulator/constants.js @@ -23,10 +23,6 @@ const constants = { 'output.format.value': 'json', 'value.converter.schemas.enable': false, 'value.converter': 'org.apache.kafka.connect.storage.StringConverter', - // Kafka message key config - // The message key is set to only contain the bucket where the event happend. - // This will make events of the same bucket always land in the same partition - // as they will have the same key 'output.format.key': 'schema', 'output.schema.key': JSON.stringify({ type: 'record', @@ -42,22 +38,8 @@ const constants = { }], }, 'null'], }, { - name: 'fullDocument', - type: [{ - type: 'record', - name: 'fullDocumentRecord', - fields: [{ - name: 'value', - type: [{ - type: 'record', - name: 'valueRecord', - fields: [{ - name: 'key', - type: ['string', 'null'], - }], - }, 'null'], - }], - }, 'null'], + name: 'key', + type: ['string', 'null'], }], }), }, diff --git a/extensions/oplogPopulator/pipeline/PipelineFactory.js b/extensions/oplogPopulator/pipeline/PipelineFactory.js index 0dd2480d9..140f28386 100644 --- a/extensions/oplogPopulator/pipeline/PipelineFactory.js +++ b/extensions/oplogPopulator/pipeline/PipelineFactory.js @@ -70,6 +70,21 @@ class PipelineFactory { } const pipeline = [ stage, + // Synthesise a top-level 'key' from fullDocument.value.key + // (insert/replace) or updateDescription.updatedFields.value.key + // (update). Relies on object MD writes always $set-ing the whole + // 'value' subdocument; a partial dotted $set would mis-partition. + // See BB-768 (superseded SMT alternative in PR #2741). + { + $addFields: { + key: { + $ifNull: [ + '$fullDocument.value.key', + '$updateDescription.updatedFields.value.key', + ], + }, + }, + }, ]; if (this._locationStrippingBytesThreshold > 0) { pipeline.push({ diff --git a/lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics.js b/lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics.js new file mode 100644 index 000000000..c00d14dd5 --- /dev/null +++ b/lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics.js @@ -0,0 +1,38 @@ +const { ZenkoMetrics } = require('arsenal').metrics; + +// Counts consumed oplog events that downstream queue populators will +// process but that lack a top-level 'key' field — i.e., the synthesised +// partitioning key produced by the connector pipeline (see BB-768). +// Should stay at zero in steady-state; a non-zero rate signals that +// metadata grew a write path that doesn't $set the whole 'value' +// subdocument, regressing the per-object partitioning fix. +const oplogEventMissingKey = ZenkoMetrics.createCounter({ + name: 's3_oplog_event_missing_key_total', + help: 'Total number of oplog events processed by queue populators ' + + 'with the synthesised top-level "key" field missing or null', + labelNames: ['opType'], +}); + +class KafkaLogConsumerMetrics { + static onMissingKey(log, opType) { + try { + oplogEventMissingKey.inc({ + opType: opType || 'unknown', + }); + } catch (err) { + KafkaLogConsumerMetrics.handleError( + log, err, 'KafkaLogConsumerMetrics.onMissingKey'); + } + } + + static handleError(log, err, method) { + if (log && log.error) { + log.error('failed to update prometheus metrics', { + method, + error: err.message, + }); + } + } +} + +module.exports = KafkaLogConsumerMetrics; diff --git a/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js b/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js index bc5e65959..aef757212 100644 --- a/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js +++ b/lib/queuePopulator/KafkaLogConsumer/ListRecordStream.js @@ -1,4 +1,5 @@ const stream = require('stream'); +const KafkaLogConsumerMetrics = require('./KafkaLogConsumerMetrics'); class ListRecordStream extends stream.Transform { /** @@ -122,6 +123,10 @@ class ListRecordStream extends stream.Transform { // skipping empty events return callback(null, null); } + if (changeStreamDocument.key === null || changeStreamDocument.key === undefined) { + KafkaLogConsumerMetrics.onMissingKey( + this._logger, changeStreamDocument.operationType); + } const opType = this._getType(changeStreamDocument.operationType, objectMd); const streamObject = { // timestamp of the kafka message diff --git a/tests/functional/oplogPopulator/PipelineFactory.js b/tests/functional/oplogPopulator/PipelineFactory.js index 5fa35b2a6..3f014cdfb 100644 --- a/tests/functional/oplogPopulator/PipelineFactory.js +++ b/tests/functional/oplogPopulator/PipelineFactory.js @@ -19,6 +19,7 @@ describe('PipelineFactory', function () { const collectionName = 'test-pipeline-stripping'; let collection; let setStage; + let addFieldsStage; before(async () => { await client.connect(); @@ -26,7 +27,8 @@ describe('PipelineFactory', function () { const factory = new MultipleBucketsPipelineFactory(THRESHOLD); const pipeline = JSON.parse(factory.getPipeline(['test-bucket'])); - setStage = pipeline[1]; + addFieldsStage = pipeline[1]; + setStage = pipeline[2]; }); afterEach(async () => { @@ -147,4 +149,45 @@ describe('PipelineFactory', function () { ); }); }); + + describe('key synthesis ($addFields)', () => { + it('should populate key from fullDocument.value.key on insert-shaped docs', async () => { + const doc = { fullDocument: { value: { key: 'my/object' } } }; + await collection.insertOne(doc); + const results = await collection.aggregate([addFieldsStage]).toArray(); + assert.strictEqual(results.length, 1); + assert.strictEqual(results[0].key, 'my/object'); + }); + + it('should populate key from updateDescription.updatedFields.value.key on update-shaped docs', async () => { + const doc = { updateDescription: { updatedFields: { value: { key: 'my/object' } } } }; + await collection.insertOne(doc); + const results = await collection.aggregate([addFieldsStage]).toArray(); + assert.strictEqual(results.length, 1); + assert.strictEqual(results[0].key, 'my/object'); + }); + + it('should prefer fullDocument.value.key when both are present', async () => { + const doc = { + fullDocument: { value: { key: 'from-full' } }, + updateDescription: { updatedFields: { value: { key: 'from-update' } } }, + }; + await collection.insertOne(doc); + const results = await collection.aggregate([addFieldsStage]).toArray(); + assert.strictEqual(results[0].key, 'from-full'); + }); + + it('should leave key absent when neither path is populated', async () => { + // $ifNull returns missing (not null) when all inputs are missing. + // The connector's nullable Avro key schema emits this as null on + // the wire, so the partition outcome is the same as an explicit + // null. In production deletes are ignored, so this case doesn't + // occur for consumed events. + const doc = { fullDocument: null }; + await collection.insertOne(doc); + const results = await collection.aggregate([addFieldsStage]).toArray(); + assert.strictEqual(results.length, 1); + assert.strictEqual(results[0].key, undefined); + }); + }); }); diff --git a/tests/unit/lib/queuePopulator/kafkaLogConsumer/KafkaLogConsumerMetrics.js b/tests/unit/lib/queuePopulator/kafkaLogConsumer/KafkaLogConsumerMetrics.js new file mode 100644 index 000000000..7fde70998 --- /dev/null +++ b/tests/unit/lib/queuePopulator/kafkaLogConsumer/KafkaLogConsumerMetrics.js @@ -0,0 +1,41 @@ +const assert = require('assert'); +const sinon = require('sinon'); +const { ZenkoMetrics } = require('arsenal').metrics; +const KafkaLogConsumerMetrics = + require('../../../../../lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics'); + +describe('KafkaLogConsumerMetrics', () => { + const log = { error: sinon.stub() }; + + afterEach(() => { + sinon.restore(); + log.error.resetHistory(); + }); + + describe('onMissingKey', () => { + it('should increment the s3_oplog_event_missing_key_total counter', () => { + const metric = ZenkoMetrics.getMetric('s3_oplog_event_missing_key_total'); + const incStub = sinon.stub(metric, 'inc'); + KafkaLogConsumerMetrics.onMissingKey(log, 'update'); + assert(incStub.calledOnceWith({ opType: 'update' })); + assert(log.error.notCalled); + }); + + it('should label as "unknown" when opType is missing', () => { + const metric = ZenkoMetrics.getMetric('s3_oplog_event_missing_key_total'); + const incStub = sinon.stub(metric, 'inc'); + KafkaLogConsumerMetrics.onMissingKey(log, undefined); + assert(incStub.calledOnceWith({ opType: 'unknown' })); + }); + + it('should swallow + log errors from inc', () => { + const metric = ZenkoMetrics.getMetric('s3_oplog_event_missing_key_total'); + sinon.stub(metric, 'inc').throws(new Error('boom')); + assert.doesNotThrow(() => KafkaLogConsumerMetrics.onMissingKey(log, 'insert')); + assert(log.error.calledOnce); + assert(log.error.calledWithMatch('failed to update prometheus metrics', { + method: 'KafkaLogConsumerMetrics.onMissingKey', + })); + }); + }); +}); diff --git a/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js b/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js index 5cebb2042..eef7194a6 100644 --- a/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js +++ b/tests/unit/lib/queuePopulator/kafkaLogConsumer/ListRecordStream.js @@ -1,8 +1,11 @@ const assert = require('assert'); +const sinon = require('sinon'); const werelogs = require('werelogs'); const logger = new werelogs.Logger('ListRecordStream'); const ListRecordStream = require('../../../../../lib/queuePopulator/KafkaLogConsumer/ListRecordStream'); +const KafkaLogConsumerMetrics = + require('../../../../../lib/queuePopulator/KafkaLogConsumer/KafkaLogConsumerMetrics'); const changeStreamDocument = { ns: { @@ -159,6 +162,32 @@ describe('ListRecordStream', () => { }); }); + it('should call onMissingKey when key is absent on a processed event', done => { + // changeStreamDocument has no top-level 'key' field — emulates an + // event that slipped past the $addFields stage. Should still be + // processed (objectMd is present) but the metric helper should fire. + const stub = sinon.stub(KafkaLogConsumerMetrics, 'onMissingKey'); + const kafkaMessage = getKafkaMessage(JSON.stringify(changeStreamDocument)); + listRecordStream.write(kafkaMessage); + listRecordStream.once('data', () => { + assert(stub.calledOnceWith(logger, 'insert')); + stub.restore(); + return done(); + }); + }); + + it('should NOT call onMissingKey when key is present', done => { + const stub = sinon.stub(KafkaLogConsumerMetrics, 'onMissingKey'); + const doc = { ...changeStreamDocument, key: 'example-key' }; + const kafkaMessage = getKafkaMessage(JSON.stringify(doc)); + listRecordStream.write(kafkaMessage); + listRecordStream.once('data', () => { + assert(stub.notCalled); + stub.restore(); + return done(); + }); + }); + it('should skip empty record', done => { const kafkaMessage = getKafkaMessage(JSON.stringify(changeStreamDocument)); const EmptyKafkaMessage = getKafkaMessage('{}'); diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index 6c81e8666..53835d693 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -44,22 +44,8 @@ const connectorConfig = { }], }, 'null'], }, { - name: 'fullDocument', - type: [{ - type: 'record', - name: 'fullDocumentRecord', - fields: [{ - name: 'value', - type: [{ - type: 'record', - name: 'valueRecord', - fields: [{ - name: 'key', - type: ['string', 'null'], - }], - }, 'null'], - }], - }, 'null'], + name: 'key', + type: ['string', 'null'], }], }), 'heartbeat.interval.ms': 10000, diff --git a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js index 40d9b0012..836a28875 100644 --- a/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/MultipleBucketsPipelineFactory.js @@ -46,21 +46,31 @@ describe('MultipleBucketsPipelineFactory', () => { assert.strictEqual(result, '[]'); }); - it('should return the pipeline with buckets and location stripping', () => { + it('should return the pipeline with buckets, key synthesis and location stripping', () => { const buckets = ['bucket1', 'bucket2']; const result = multipleBucketsPipelineFactory.getPipeline(buckets); const pipeline = JSON.parse(result); - assert.strictEqual(pipeline.length, 2); + assert.strictEqual(pipeline.length, 3); assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1','bucket2']}}}); - assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], { + assert.deepStrictEqual(pipeline[1], { + $addFields: { + key: { + $ifNull: [ + '$fullDocument.value.key', + '$updateDescription.updatedFields.value.key', + ], + }, + }, + }); + assert.deepStrictEqual(pipeline[2].$set['fullDocument.value.location'], { $cond: { if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] }, then: '$$REMOVE', else: '$fullDocument.value.location', }, }); - assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], { + assert.deepStrictEqual(pipeline[2].$set['updateDescription.updatedFields.value.location'], { $cond: { if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] }, then: '$$REMOVE', @@ -68,6 +78,23 @@ describe('MultipleBucketsPipelineFactory', () => { }, }); }); + + it('should return the pipeline with key synthesis when location stripping is disabled', () => { + const factory = new MultipleBucketsPipelineFactory(0); + const pipeline = JSON.parse(factory.getPipeline(['bucket1'])); + assert.strictEqual(pipeline.length, 2); + assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1']}}}); + assert.deepStrictEqual(pipeline[1], { + $addFields: { + key: { + $ifNull: [ + '$fullDocument.value.key', + '$updateDescription.updatedFields.value.key', + ], + }, + }, + }); + }); }); describe('getOldConnectorBucketList', () => { diff --git a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js index 34d849561..22fdf6003 100644 --- a/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js +++ b/tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js @@ -33,21 +33,31 @@ describe('WildcardPipelineFactory', () => { }); describe('getPipeline', () => { - it('should return the pipeline with buckets and location stripping', () => { + it('should return the pipeline with buckets, key synthesis and location stripping', () => { const buckets = ['bucket1', 'bucket2']; const result = wildcardPipelineFactory.getPipeline(buckets); const pipeline = JSON.parse(result); - assert.strictEqual(pipeline.length, 2); + assert.strictEqual(pipeline.length, 3); assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$not:{$regex:'^(mpuShadowBucket|__).*'}}}}); - assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], { + assert.deepStrictEqual(pipeline[1], { + $addFields: { + key: { + $ifNull: [ + '$fullDocument.value.key', + '$updateDescription.updatedFields.value.key', + ], + }, + }, + }); + assert.deepStrictEqual(pipeline[2].$set['fullDocument.value.location'], { $cond: { if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] }, then: '$$REMOVE', else: '$fullDocument.value.location', }, }); - assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], { + assert.deepStrictEqual(pipeline[2].$set['updateDescription.updatedFields.value.location'], { $cond: { if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] }, then: '$$REMOVE', @@ -56,14 +66,24 @@ describe('WildcardPipelineFactory', () => { }); }); - it('should return the pipeline with buckets and no location stripping if disabled', () => { + it('should return the pipeline with key synthesis and no location stripping if disabled', () => { const wildcardPipelineFactoryNoStripping = new WildcardPipelineFactory(0); const buckets = ['bucket1', 'bucket2']; const result = wildcardPipelineFactoryNoStripping.getPipeline(buckets); const pipeline = JSON.parse(result); - assert.strictEqual(pipeline.length, 1); + assert.strictEqual(pipeline.length, 2); + assert.deepStrictEqual(pipeline[1], { + $addFields: { + key: { + $ifNull: [ + '$fullDocument.value.key', + '$updateDescription.updatedFields.value.key', + ], + }, + }, + }); }); });