Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 2 additions & 20 deletions extensions/oplogPopulator/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ const constants = {
'output.format.value': 'json',
'value.converter.schemas.enable': false,
'value.converter': 'org.apache.kafka.connect.storage.StringConverter',
// Kafka message key config
// The message key is set to only contain the bucket where the event happend.
// This will make events of the same bucket always land in the same partition
// as they will have the same key
'output.format.key': 'schema',
'output.schema.key': JSON.stringify({
type: 'record',
Expand All @@ -42,22 +38,8 @@ const constants = {
}],
}, 'null'],
}, {
name: 'fullDocument',
type: [{
type: 'record',
name: 'fullDocumentRecord',
fields: [{
name: 'value',
type: [{
type: 'record',
name: 'valueRecord',
fields: [{
name: 'key',
type: ['string', 'null'],
}],
}, 'null'],
}],
}, 'null'],
name: 'key',
type: ['string', 'null'],
Comment thread
delthas marked this conversation as resolved.
}],
}),
},
Expand Down
15 changes: 15 additions & 0 deletions extensions/oplogPopulator/pipeline/PipelineFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ class PipelineFactory {
}
const pipeline = [
stage,
// Synthesise a top-level 'key' from fullDocument.value.key
// (insert/replace) or updateDescription.updatedFields.value.key
// (update). Relies on object MD writes always $set-ing the whole
// 'value' subdocument; a partial dotted $set would mis-partition.
// See BB-768 (superseded SMT alternative in PR #2741).
{
$addFields: {
key: {
$ifNull: [
'$fullDocument.value.key',
'$updateDescription.updatedFields.value.key',
],
},
},
},
];
if (this._locationStrippingBytesThreshold > 0) {
pipeline.push({
Expand Down
45 changes: 44 additions & 1 deletion tests/functional/oplogPopulator/PipelineFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ describe('PipelineFactory', function () {
const collectionName = 'test-pipeline-stripping';
let collection;
let setStage;
let addFieldsStage;

before(async () => {
await client.connect();
collection = db.collection(collectionName);

const factory = new MultipleBucketsPipelineFactory(THRESHOLD);
const pipeline = JSON.parse(factory.getPipeline(['test-bucket']));
setStage = pipeline[1];
addFieldsStage = pipeline[1];
setStage = pipeline[2];
});

afterEach(async () => {
Expand Down Expand Up @@ -147,4 +149,45 @@ describe('PipelineFactory', function () {
);
});
});

describe('key synthesis ($addFields)', () => {
it('should populate key from fullDocument.value.key on insert-shaped docs', async () => {
const doc = { fullDocument: { value: { key: 'my/object' } } };
await collection.insertOne(doc);
const results = await collection.aggregate([addFieldsStage]).toArray();
assert.strictEqual(results.length, 1);
assert.strictEqual(results[0].key, 'my/object');
});

it('should populate key from updateDescription.updatedFields.value.key on update-shaped docs', async () => {
const doc = { updateDescription: { updatedFields: { value: { key: 'my/object' } } } };
await collection.insertOne(doc);
const results = await collection.aggregate([addFieldsStage]).toArray();
assert.strictEqual(results.length, 1);
assert.strictEqual(results[0].key, 'my/object');
});

it('should prefer fullDocument.value.key when both are present', async () => {
const doc = {
fullDocument: { value: { key: 'from-full' } },
updateDescription: { updatedFields: { value: { key: 'from-update' } } },
};
await collection.insertOne(doc);
const results = await collection.aggregate([addFieldsStage]).toArray();
assert.strictEqual(results[0].key, 'from-full');
});

it('should leave key absent when neither path is populated', async () => {
// $ifNull returns missing (not null) when all inputs are missing.
// The connector's nullable Avro key schema emits this as null on
// the wire, so the partition outcome is the same as an explicit
// null. In production deletes are ignored, so this case doesn't
// occur for consumed events.
const doc = { fullDocument: null };
await collection.insertOne(doc);
const results = await collection.aggregate([addFieldsStage]).toArray();
assert.strictEqual(results.length, 1);
assert.strictEqual(results[0].key, undefined);
});
});
});
18 changes: 2 additions & 16 deletions tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,8 @@ const connectorConfig = {
}],
}, 'null'],
}, {
name: 'fullDocument',
type: [{
type: 'record',
name: 'fullDocumentRecord',
fields: [{
name: 'value',
type: [{
type: 'record',
name: 'valueRecord',
fields: [{
name: 'key',
type: ['string', 'null'],
}],
}, 'null'],
}],
}, 'null'],
name: 'key',
type: ['string', 'null'],
}],
}),
'heartbeat.interval.ms': 10000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,55 @@ describe('MultipleBucketsPipelineFactory', () => {
assert.strictEqual(result, '[]');
});

it('should return the pipeline with buckets and location stripping', () => {
it('should return the pipeline with buckets, key synthesis and location stripping', () => {
const buckets = ['bucket1', 'bucket2'];
const result = multipleBucketsPipelineFactory.getPipeline(buckets);
const pipeline = JSON.parse(result);

assert.strictEqual(pipeline.length, 2);
assert.strictEqual(pipeline.length, 3);
assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1','bucket2']}}});
assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], {
assert.deepStrictEqual(pipeline[1], {
$addFields: {
key: {
$ifNull: [
'$fullDocument.value.key',
'$updateDescription.updatedFields.value.key',
],
},
},
});
assert.deepStrictEqual(pipeline[2].$set['fullDocument.value.location'], {
$cond: {
if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] },
then: '$$REMOVE',
else: '$fullDocument.value.location',
},
});
assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], {
assert.deepStrictEqual(pipeline[2].$set['updateDescription.updatedFields.value.location'], {
$cond: {
if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] },
then: '$$REMOVE',
else: '$updateDescription.updatedFields.value.location',
},
});
});

it('should return the pipeline with key synthesis when location stripping is disabled', () => {
const factory = new MultipleBucketsPipelineFactory(0);
const pipeline = JSON.parse(factory.getPipeline(['bucket1']));
assert.strictEqual(pipeline.length, 2);
assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$in:['bucket1']}}});
assert.deepStrictEqual(pipeline[1], {
$addFields: {
key: {
$ifNull: [
'$fullDocument.value.key',
'$updateDescription.updatedFields.value.key',
],
},
},
});
});
});

describe('getOldConnectorBucketList', () => {
Expand Down
32 changes: 26 additions & 6 deletions tests/unit/oplogPopulator/pipeline/WildcardPipelineFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,31 @@ describe('WildcardPipelineFactory', () => {
});

describe('getPipeline', () => {
it('should return the pipeline with buckets and location stripping', () => {
it('should return the pipeline with buckets, key synthesis and location stripping', () => {
const buckets = ['bucket1', 'bucket2'];
const result = wildcardPipelineFactory.getPipeline(buckets);
const pipeline = JSON.parse(result);

assert.strictEqual(pipeline.length, 2);
assert.strictEqual(pipeline.length, 3);
assert.deepStrictEqual(pipeline[0], {$match:{'ns.coll':{$not:{$regex:'^(mpuShadowBucket|__).*'}}}});
assert.deepStrictEqual(pipeline[1].$set['fullDocument.value.location'], {
assert.deepStrictEqual(pipeline[1], {
$addFields: {
key: {
$ifNull: [
'$fullDocument.value.key',
'$updateDescription.updatedFields.value.key',
],
},
},
});
assert.deepStrictEqual(pipeline[2].$set['fullDocument.value.location'], {
$cond: {
if: { $gte: ['$fullDocument.value.content-length', thresholdBytes] },
then: '$$REMOVE',
else: '$fullDocument.value.location',
},
});
assert.deepStrictEqual(pipeline[1].$set['updateDescription.updatedFields.value.location'], {
assert.deepStrictEqual(pipeline[2].$set['updateDescription.updatedFields.value.location'], {
$cond: {
if: { $gte: ['$updateDescription.updatedFields.value.content-length', thresholdBytes] },
then: '$$REMOVE',
Expand All @@ -56,14 +66,24 @@ describe('WildcardPipelineFactory', () => {
});
});

it('should return the pipeline with buckets and no location stripping if disabled', () => {
it('should return the pipeline with key synthesis and no location stripping if disabled', () => {
const wildcardPipelineFactoryNoStripping = new WildcardPipelineFactory(0);

const buckets = ['bucket1', 'bucket2'];
const result = wildcardPipelineFactoryNoStripping.getPipeline(buckets);
const pipeline = JSON.parse(result);

assert.strictEqual(pipeline.length, 1);
assert.strictEqual(pipeline.length, 2);
assert.deepStrictEqual(pipeline[1], {
$addFields: {
key: {
$ifNull: [
'$fullDocument.value.key',
'$updateDescription.updatedFields.value.key',
],
},
},
});
});
});

Expand Down
Loading