Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extensions/oplogPopulator/OplogPopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ class OplogPopulator {
cronRule: this._config.connectorsUpdateCronRule,
prefix: this._config.prefix,
heartbeatIntervalMs: this._config.heartbeatIntervalMs,
transformObjectKey: this._config.transformObjectKey,
kafkaConnectHost: this._config.kafkaConnectHost,
kafkaConnectPort: this._config.kafkaConnectPort,
metricsHandler: this._metricsHandler,
Expand Down
4 changes: 4 additions & 0 deletions extensions/oplogPopulator/OplogPopulatorConfigValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ const joiSchema = joi.object({
probeServer: probeServerJoi.default(),
connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'),
heartbeatIntervalMs: joi.number().default(10000),
// When true, oplog source connectors use the TransformObjectKey SMT to
// key messages by the raw S3 object key (BB-768). Enable only once the
// Kafka Connect image ships the TransformObjectKey plugin.
transformObjectKey: joi.boolean().default(false),
});

function configValidator(backbeatConfig, extConfig) {
Expand Down
145 changes: 94 additions & 51 deletions extensions/oplogPopulator/constants.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,94 @@
// Fully-qualified class name of the TransformObjectKey SMT (single message
// transform), as referenced by the connector's 'transforms.*.type' setting.
const transformObjectKeyClass = 'com.scality.kafka.connect.transforms.TransformObjectKey';

const defaultConnectorConfig = {
'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
'pipeline': '[]',
'collection': '',
// If no timestamp is provided, the startup mode will be equivalent
// to 'latest' which will pick up from the latest event in the oplog
'startup.mode': 'timestamp',
// JSON output converter config
// Using a string converter to avoid getting an over-stringified
// JSON that is returned by default
'output.format.value': 'json',
'value.converter.schemas.enable': false,
'value.converter': 'org.apache.kafka.connect.storage.StringConverter',
// Kafka message key config (legacy).
// The key schema projects {ns.coll, fullDocument.value.key}. fullDocument
// is null on update/delete events (BB-355 removed updateLookup), so for
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.f. https://scality.atlassian.net/browse/BB-768?focusedCommentId=477005 :

  • we always pass the full document (AFAIK) on update, so the key should be available (but maybe not in "fullDocument")
  • the delete event is not used AFAIK (we generate an update with the previous document right before it), and may be dropped instead

→ is there a simpler (as in "less maintenance") fix?

// those op types the key collapses to hash({ns, null}) and lands on a
// different partition than insert events for the same S3 object. See
// BB-768. This is overridden by smtKeyConfig when the oplogPopulator
// 'transformObjectKey' flag is enabled (and the TransformObjectKey SMT
// is available in the Kafka Connect plugin path).
'output.format.key': 'schema',
'output.schema.key': JSON.stringify({
type: 'record',
name: 'keySchema',
fields: [{
name: 'ns',
type: [{
name: 'ns',
type: 'record',
fields: [{
name: 'coll',
type: ['string', 'null'],
}],
}, 'null'],
}, {
name: 'fullDocument',
type: [{
type: 'record',
name: 'fullDocumentRecord',
fields: [{
name: 'value',
type: [{
type: 'record',
name: 'valueRecord',
fields: [{
name: 'key',
type: ['string', 'null'],
}],
}, 'null'],
}],
}, 'null'],
}],
}),
};

// Overrides layered over defaultConnectorConfig when the oplogPopulator
// 'transformObjectKey' flag is enabled.
//
// The key schema projects documentKey._id (populated on every
// change-stream event), and the TransformObjectKey SMT then strips the
// arsenal master/version encoding so that the master and every version of
// one S3 object hash to the same partition. The SMT turns the projected
// Struct into a plain string, which key.converter=StringConverter
// serialises.
//
// Reverting a connector to the legacy schema means dropping the keys below
// that defaultConnectorConfig does not define (transforms*, key.converter);
// output.schema.key is restored from defaultConnectorConfig instead.

// Record projected out of the change-stream event's documentKey.
const smtDocumentKeyRecord = {
    type: 'record',
    name: 'documentKeyRecord',
    fields: [{ name: '_id', type: ['string', 'null'] }],
};

// Top-level key schema: a single nullable documentKey field.
const smtKeySchema = {
    type: 'record',
    name: 'keySchema',
    fields: [{ name: 'documentKey', type: [smtDocumentKeyRecord, 'null'] }],
};

const smtKeyConfig = {
    'output.schema.key': JSON.stringify(smtKeySchema),
    'key.converter': 'org.apache.kafka.connect.storage.StringConverter',
    'transforms': 'stripObjectKey',
    'transforms.stripObjectKey.type': transformObjectKeyClass,
};

const constants = {
bucketMetastore: '__metastore',
defaultConnectorName: 'source-connector',
Expand All @@ -10,57 +101,9 @@ const constants = {
connectorUpdatedEvent: 'connector-updated',
bucketRemovedFromConnectorEvent: 'bucket-removed',
connectorsReconciledEvent: 'connectors-reconciled',
defaultConnectorConfig: {
'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
'pipeline': '[]',
'collection': '',
// If no timestamp is provided, the startup mode will be equivalent
// to 'latest' which will pick up from the latest event in the oplog
'startup.mode': 'timestamp',
// JSON output converter config
// Using a string converter to avoid getting an over-stringified
// JSON that is returned by default
'output.format.value': 'json',
'value.converter.schemas.enable': false,
'value.converter': 'org.apache.kafka.connect.storage.StringConverter',
// Kafka message key config
// The message key is set to only contain the bucket where the event happened.
// This will make events of the same bucket always land in the same partition
// as they will have the same key
'output.format.key': 'schema',
'output.schema.key': JSON.stringify({
type: 'record',
name: 'keySchema',
fields: [{
name: 'ns',
type: [{
name: 'ns',
type: 'record',
fields: [{
name: 'coll',
type: ['string', 'null'],
}],
}, 'null'],
}, {
name: 'fullDocument',
type: [{
type: 'record',
name: 'fullDocumentRecord',
fields: [{
name: 'value',
type: [{
type: 'record',
name: 'valueRecord',
fields: [{
name: 'key',
type: ['string', 'null'],
}],
}, 'null'],
}],
}, 'null'],
}],
}),
},
transformObjectKeyClass,
defaultConnectorConfig,
smtKeyConfig,
};

module.exports = constants;
27 changes: 24 additions & 3 deletions extensions/oplogPopulator/modules/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const paramsJoi = joi.object({
cronRule: joi.string().required(),
prefix: joi.string(),
heartbeatIntervalMs: joi.number().required(),
transformObjectKey: joi.boolean().default(false),
kafkaConnectHost: joi.string().required(),
kafkaConnectPort: joi.number().required(),
metricsHandler: joi.object()
Expand Down Expand Up @@ -81,6 +82,11 @@ class ConnectorsManager extends EventEmitter {
this._oldConnectors = [];
this._allocationStrategy = params.allocationStrategy;
this._pipelineFactory = params.pipelineFactory;
// When true, connectors are configured with the TransformObjectKey
// SMT so the oplog message key is the raw S3 object key. Enabled via
// the oplogPopulator 'transformObjectKey' config flag, set by the
// operator once Kafka Connect ships the TransformObjectKey plugin.
this._transformObjectKey = params.transformObjectKey || false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defaulting is not needed here; isn't it already handled by joi?

}

/**
Expand All @@ -102,6 +108,7 @@ class ConnectorsManager extends EventEmitter {
};
return {
...constants.defaultConnectorConfig,
...(this._transformObjectKey ? constants.smtKeyConfig : {}),
...connectorConfig
Comment thread
delthas marked this conversation as resolved.
};
}
Expand Down Expand Up @@ -165,12 +172,26 @@ class ConnectorsManager extends EventEmitter {
}
// generating a new config as the old config can be outdated (wrong topic for example)
const config = this._getDefaultConnectorConfiguration(connectorName);
// update existing connector config while leaving in fields that were
// added manually like 'offset.topic.name'
const mergedConfig = { ...oldConfig, ...config };
// When the SMT is disabled, scrub any SMT-only keys that may
// linger in oldConfig from a previous run with it enabled.
// Otherwise the connector keeps referencing the missing
// TransformObjectKey class and fails to start. The SMT-only
// keys are those in smtKeyConfig that the legacy base does not
// also define (output.schema.key is restored, not removed).
if (!this._transformObjectKey) {
for (const k of Object.keys(constants.smtKeyConfig)) {
if (!(k in constants.defaultConnectorConfig)) {
delete mergedConfig[k];
}
}
}
// initializing connector
const connector = new Connector({
name: connectorName,
// update existing connector config while leaving in fields that were
// added manually like 'offset.topic.name'
config: { ...oldConfig, ...config },
config: mergedConfig,
buckets,
getPipeline: this._pipelineFactory.getPipeline,
isRunning: true,
Expand Down
88 changes: 83 additions & 5 deletions tests/unit/oplogPopulator/ConnectorsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,28 @@ const connectorConfig = {
'heartbeat.interval.ms': 10000,
};

// Expected connector configuration when transformObjectKey is enabled: the
// legacy expectation (connectorConfig) with the key schema replaced by the
// documentKey._id projection, plus the key converter and transform entries.
// The schema is spelled out here rather than read from constants so the
// test pins the exact serialized value.
const expectedDocumentKeyRecord = {
    type: 'record',
    name: 'documentKeyRecord',
    fields: [{ name: '_id', type: ['string', 'null'] }],
};

const expectedSmtKeySchema = {
    type: 'record',
    name: 'keySchema',
    fields: [{ name: 'documentKey', type: [expectedDocumentKeyRecord, 'null'] }],
};

const connectorConfigSMT = {
    ...connectorConfig,
    'output.schema.key': JSON.stringify(expectedSmtKeySchema),
    'key.converter': 'org.apache.kafka.connect.storage.StringConverter',
    'transforms': 'stripObjectKey',
    'transforms.stripObjectKey.type': constants.transformObjectKeyClass,
};

describe('ConnectorsManager', () => {
let connectorsManager;
let connector1;
Expand Down Expand Up @@ -121,11 +143,18 @@ describe('ConnectorsManager', () => {
});

describe('_getDefaultConnectorConfiguration', () => {
it('should return default configuration', () => {
it('should return legacy configuration when transformObjectKey is disabled', () => {
const config = connectorsManager._getDefaultConnectorConfiguration(
'source-connector');
assert.deepEqual(config, connectorConfig);
});

it('should return SMT configuration when transformObjectKey is enabled', () => {
connectorsManager._transformObjectKey = true;
const config = connectorsManager._getDefaultConnectorConfiguration(
'source-connector');
assert.deepEqual(config, connectorConfigSMT);
});
});

describe('_generateConnectorName', () => {
Expand Down Expand Up @@ -186,6 +215,52 @@ describe('ConnectorsManager', () => {
assert.strictEqual(connectors[0].isRunning, true);
});

it('should strip stale SMT keys from oldConfig when the SMT is unavailable', async () => {
const config = {
...connectorConfig,
// Simulate a previous run where the SMT was active: the
// stored connector config still references the SMT class +
// converter. We expect _processOldConnectors to drop those
// keys when transformObjectKey is now disabled.
'transforms': 'stripObjectKey',
'transforms.stripObjectKey.type': constants.transformObjectKeyClass,
'key.converter': 'org.apache.kafka.connect.storage.StringConverter',
};
sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig')
.resolves(config);
sinon.stub(connectorsManager._pipelineFactory, 'getOldConnectorBucketList')
.returns(['bucket1']);
sinon.stub(connectorsManager._kafkaConnect, 'deleteConnector');
connectorsManager._transformObjectKey = false;
const connectors = await connectorsManager._processOldConnectors(['source-connector']);
assert.strictEqual(connectors.length, 1);
assert.strictEqual(connectors[0].config['transforms'], undefined);
assert.strictEqual(connectors[0].config['transforms.stripObjectKey.type'], undefined);
assert.strictEqual(connectors[0].config['key.converter'], undefined);
// legacy key schema is restored, not dropped
assert.strictEqual(connectors[0].config['output.schema.key'],
constants.defaultConnectorConfig['output.schema.key']);
});

it('should keep SMT keys on oldConfig when transformObjectKey is enabled', async () => {
const config = {
...connectorConfig,
'transforms': 'stripObjectKey',
'transforms.stripObjectKey.type': constants.transformObjectKeyClass,
'key.converter': 'org.apache.kafka.connect.storage.StringConverter',
};
sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig')
.resolves(config);
sinon.stub(connectorsManager._pipelineFactory, 'getOldConnectorBucketList')
.returns(['bucket1']);
sinon.stub(connectorsManager._kafkaConnect, 'deleteConnector');
connectorsManager._transformObjectKey = true;
const connectors = await connectorsManager._processOldConnectors(['source-connector']);
assert.strictEqual(connectors[0].config['transforms'], 'stripObjectKey');
assert.strictEqual(connectors[0].config['transforms.stripObjectKey.type'],
constants.transformObjectKeyClass);
});

it('should warn when the number of retrieved bucket in a connector exceeds the limit', async () => {
const config = { ...connectorConfig };
config['topic.namespace.map'] = 'outdated-topic';
Expand Down Expand Up @@ -455,13 +530,16 @@ describe('ConnectorsManager', () => {
sinon.restore();
});

it('should schedule connector updates', () => {
const updateConnectorsStub = sinon.stub(connectorsManager, '_updateConnectors');
it('should schedule connector updates', async () => {
const updateConnectorsStub = sinon.stub(connectorsManager, '_updateConnectors')
.resolves();
let scheduledCb;
sinon.stub(schedule, 'scheduleJob').callsFake((rule, cb) => {
cb();
scheduledCb = cb;
});
connectorsManager.scheduleConnectorUpdates();
assert(updateConnectorsStub.called);
await scheduledCb();
assert(updateConnectorsStub.calledOnce);
});
});
});
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,29 @@ describe('OplogPopulatorConfigValidator', () => {
assert.strictEqual(result.value.locationStrippingBytesThreshold, 100 * 1000000);
});
});

describe('transformObjectKey validation', () => {
it('should default to false when not provided', () => {
const result = OplogPopulatorConfigJoiSchema.validate(defaultConfig);
assert.ifError(result.error);
assert.strictEqual(result.value.transformObjectKey, false);
});

it('should accept an explicit boolean', () => {
const result = OplogPopulatorConfigJoiSchema.validate({
...defaultConfig,
transformObjectKey: true,
});
assert.ifError(result.error);
assert.strictEqual(result.value.transformObjectKey, true);
});

it('should reject a non-boolean', () => {
const result = OplogPopulatorConfigJoiSchema.validate({
...defaultConfig,
transformObjectKey: 'yes',
});
assert(result.error);
});
});
});
Loading