diff --git a/extensions/oplogPopulator/OplogPopulator.js b/extensions/oplogPopulator/OplogPopulator.js index b7b4d0305..0713e1152 100644 --- a/extensions/oplogPopulator/OplogPopulator.js +++ b/extensions/oplogPopulator/OplogPopulator.js @@ -277,6 +277,7 @@ class OplogPopulator { cronRule: this._config.connectorsUpdateCronRule, prefix: this._config.prefix, heartbeatIntervalMs: this._config.heartbeatIntervalMs, + transformObjectKey: this._config.transformObjectKey, kafkaConnectHost: this._config.kafkaConnectHost, kafkaConnectPort: this._config.kafkaConnectPort, metricsHandler: this._metricsHandler, diff --git a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js index c65d5dbee..a2cf43ddd 100644 --- a/extensions/oplogPopulator/OplogPopulatorConfigValidator.js +++ b/extensions/oplogPopulator/OplogPopulatorConfigValidator.js @@ -11,6 +11,10 @@ const joiSchema = joi.object({ probeServer: probeServerJoi.default(), connectorsUpdateCronRule: joi.string().default('*/1 * * * * *'), heartbeatIntervalMs: joi.number().default(10000), + // When true, oplog source connectors use the TransformObjectKey SMT to + // key messages by the raw S3 object key (BB-768). Enable only once the + // Kafka Connect image ships the TransformObjectKey plugin. + transformObjectKey: joi.boolean().default(false), }); function configValidator(backbeatConfig, extConfig) { diff --git a/extensions/oplogPopulator/constants.js b/extensions/oplogPopulator/constants.js index 63b3aeb9b..4918ac5eb 100644 --- a/extensions/oplogPopulator/constants.js +++ b/extensions/oplogPopulator/constants.js @@ -1,3 +1,94 @@ +const transformObjectKeyClass = + 'com.scality.kafka.connect.transforms.TransformObjectKey'; + +const defaultConnectorConfig = { + 'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector', + 'pipeline': '[]', + 'collection': '', + // If no timestamp is provided, the startup mode will be equivalent + // to 'latest' which will pick up from the latest event in the oplog + 'startup.mode': 'timestamp', + // JSON output converter config + // Using a string converter to avoid getting an over-stringified + // JSON that is returned by default + 'output.format.value': 'json', + 'value.converter.schemas.enable': false, + 'value.converter': 'org.apache.kafka.connect.storage.StringConverter', + // Kafka message key config (legacy). + // The key schema projects {ns.coll, fullDocument.value.key}. fullDocument + // is null on update/delete events (BB-355 removed updateLookup), so for + // those op types the key collapses to hash({ns, null}) and lands on a + // different partition than insert events for the same S3 object. See + // BB-768. This is overridden by smtKeyConfig when the oplogPopulator + // 'transformObjectKey' flag is enabled (and the TransformObjectKey SMT + // is available in the Kafka Connect plugin path). + 'output.format.key': 'schema', + 'output.schema.key': JSON.stringify({ + type: 'record', + name: 'keySchema', + fields: [{ + name: 'ns', + type: [{ + name: 'ns', + type: 'record', + fields: [{ + name: 'coll', + type: ['string', 'null'], + }], + }, 'null'], + }, { + name: 'fullDocument', + type: [{ + type: 'record', + name: 'fullDocumentRecord', + fields: [{ + name: 'value', + type: [{ + type: 'record', + name: 'valueRecord', + fields: [{ + name: 'key', + type: ['string', 'null'], + }], + }, 'null'], + }], + }, 'null'], + }], + }), +}; + +// Key-config overrides applied on top of defaultConnectorConfig when the +// 'transformObjectKey' flag is set. Projects documentKey._id (always +// populated on every change-stream event) and adds the TransformObjectKey +// SMT, which strips the arsenal master/version encoding so master and all +// versions of the same S3 object hash to the same partition. The SMT +// rewrites the projected Struct into a plain string, serialised by +// key.converter=StringConverter. +// +// The keys here that are absent from defaultConnectorConfig (transforms*, +// key.converter) are the ones to drop when reverting a connector to the +// legacy schema; output.schema.key is restored from defaultConnectorConfig. +const smtKeyConfig = { + 'output.schema.key': JSON.stringify({ + type: 'record', + name: 'keySchema', + fields: [{ + name: 'documentKey', + type: [{ + type: 'record', + name: 'documentKeyRecord', + fields: [{ + name: '_id', + type: ['string', 'null'], + }], + }, 'null'], + }], + }), + 'key.converter': 'org.apache.kafka.connect.storage.StringConverter', + 'transforms': 'stripObjectKey', + 'transforms.stripObjectKey.type': transformObjectKeyClass, +}; + const constants = { bucketMetastore: '__metastore', defaultConnectorName: 'source-connector', @@ -10,57 +101,9 @@ const constants = { connectorUpdatedEvent: 'connector-updated', bucketRemovedFromConnectorEvent: 'bucket-removed', connectorsReconciledEvent: 'connectors-reconciled', - defaultConnectorConfig: { - 'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector', - 'pipeline': '[]', - 'collection': '', - // If no timestamp is provided, the startup mode will be equivalent - // to 'latest' which will pick up from the latest event in the oplog - 'startup.mode': 'timestamp', - // JSON output converter config - // Using a string converter to avoid getting an over-stringified - // JSON that is returned by default - 'output.format.value': 'json', - 'value.converter.schemas.enable': false, - 'value.converter': 'org.apache.kafka.connect.storage.StringConverter', - // Kafka message key config - // The message key is set to only contain the bucket where the event happend. - // This will make events of the same bucket always land in the same partition - // as they will have the same key - 'output.format.key': 'schema', - 'output.schema.key': JSON.stringify({ - type: 'record', - name: 'keySchema', - fields: [{ - name: 'ns', - type: [{ - name: 'ns', - type: 'record', - fields: [{ - name: 'coll', - type: ['string', 'null'], - }], - }, 'null'], - }, { - name: 'fullDocument', - type: [{ - type: 'record', - name: 'fullDocumentRecord', - fields: [{ - name: 'value', - type: [{ - type: 'record', - name: 'valueRecord', - fields: [{ - name: 'key', - type: ['string', 'null'], - }], - }, 'null'], - }], - }, 'null'], - }], - }), - }, + transformObjectKeyClass, + defaultConnectorConfig, + smtKeyConfig, }; module.exports = constants; diff --git a/extensions/oplogPopulator/modules/ConnectorsManager.js b/extensions/oplogPopulator/modules/ConnectorsManager.js index 8c50ef331..74f57bc7d 100644 --- a/extensions/oplogPopulator/modules/ConnectorsManager.js +++ b/extensions/oplogPopulator/modules/ConnectorsManager.js @@ -21,6 +21,7 @@ const paramsJoi = joi.object({ cronRule: joi.string().required(), prefix: joi.string(), heartbeatIntervalMs: joi.number().required(), + transformObjectKey: joi.boolean().default(false), kafkaConnectHost: joi.string().required(), kafkaConnectPort: joi.number().required(), metricsHandler: joi.object() @@ -81,6 +82,11 @@ class ConnectorsManager extends EventEmitter { this._oldConnectors = []; this._allocationStrategy = params.allocationStrategy; this._pipelineFactory = params.pipelineFactory; + // When true, connectors are configured with the TransformObjectKey + // SMT so the oplog message key is the raw S3 object key. Enabled via + // the oplogPopulator 'transformObjectKey' config flag, set by the + // operator once Kafka Connect ships the TransformObjectKey plugin. + this._transformObjectKey = params.transformObjectKey || false; } /** @@ -102,6 +108,7 @@ class ConnectorsManager extends EventEmitter { }; return { ...constants.defaultConnectorConfig, + ...(this._transformObjectKey ? constants.smtKeyConfig : {}), ...connectorConfig }; } @@ -165,12 +172,26 @@ class ConnectorsManager extends EventEmitter { } // generating a new config as the old config can be outdated (wrong topic for example) const config = this._getDefaultConnectorConfiguration(connectorName); + // update existing connector config while leaving in fields that were + // added manually like 'offset.topic.name' + const mergedConfig = { ...oldConfig, ...config }; + // When the SMT is disabled, scrub any SMT-only keys that may + // linger in oldConfig from a previous run with it enabled. + // Otherwise the connector keeps referencing the missing + // TransformObjectKey class and fails to start. The SMT-only + // keys are those in smtKeyConfig that the legacy base does not + // also define (output.schema.key is restored, not removed). + if (!this._transformObjectKey) { + for (const k of Object.keys(constants.smtKeyConfig)) { + if (!(k in constants.defaultConnectorConfig)) { + delete mergedConfig[k]; + } + } + } // initializing connector const connector = new Connector({ name: connectorName, - // update existing connector config while leaving in fields that were - // added manually like 'offset.topic.name' - config: { ...oldConfig, ...config }, + config: mergedConfig, buckets, getPipeline: this._pipelineFactory.getPipeline, isRunning: true, diff --git a/tests/unit/oplogPopulator/ConnectorsManager.js b/tests/unit/oplogPopulator/ConnectorsManager.js index 6c81e8666..8f95655d2 100644 --- a/tests/unit/oplogPopulator/ConnectorsManager.js +++ b/tests/unit/oplogPopulator/ConnectorsManager.js @@ -65,6 +65,28 @@ const connectorConfig = { 'heartbeat.interval.ms': 10000, }; +const connectorConfigSMT = { + ...connectorConfig, + 'output.schema.key': JSON.stringify({ + type: 'record', + name: 'keySchema', + fields: [{ + name: 'documentKey', + type: [{ + type: 'record', + name: 'documentKeyRecord', + fields: [{ + name: '_id', + type: ['string', 'null'], + }], + }, 'null'], + }], + }), + 'key.converter': 'org.apache.kafka.connect.storage.StringConverter', + 'transforms': 'stripObjectKey', + 'transforms.stripObjectKey.type': constants.transformObjectKeyClass, +}; + describe('ConnectorsManager', () => { let connectorsManager; let connector1; @@ -121,11 +143,18 @@ describe('ConnectorsManager', () => { }); describe('_getDefaultConnectorConfiguration', () => { - it('should return default configuration', () => { + it('should return legacy configuration when transformObjectKey is disabled', () => { const config = connectorsManager._getDefaultConnectorConfiguration( 'source-connector'); assert.deepEqual(config, connectorConfig); }); + + it('should return SMT configuration when transformObjectKey is enabled', () => { + connectorsManager._transformObjectKey = true; + const config = connectorsManager._getDefaultConnectorConfiguration( + 'source-connector'); + assert.deepEqual(config, connectorConfigSMT); + }); }); describe('_generateConnectorName', () => { @@ -186,6 +215,52 @@ describe('ConnectorsManager', () => { assert.strictEqual(connectors[0].isRunning, true); }); + it('should strip stale SMT keys from oldConfig when the SMT is unavailable', async () => { + const config = { + ...connectorConfig, + // Simulate a previous run where the SMT was active: the + // stored connector config still references the SMT class + + // converter. We expect _processOldConnectors to drop those + // keys when transformObjectKey is now disabled. + 'transforms': 'stripObjectKey', + 'transforms.stripObjectKey.type': constants.transformObjectKeyClass, + 'key.converter': 'org.apache.kafka.connect.storage.StringConverter', + }; + sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig') + .resolves(config); + sinon.stub(connectorsManager._pipelineFactory, 'getOldConnectorBucketList') + .returns(['bucket1']); + sinon.stub(connectorsManager._kafkaConnect, 'deleteConnector'); + connectorsManager._transformObjectKey = false; + const connectors = await connectorsManager._processOldConnectors(['source-connector']); + assert.strictEqual(connectors.length, 1); + assert.strictEqual(connectors[0].config['transforms'], undefined); + assert.strictEqual(connectors[0].config['transforms.stripObjectKey.type'], undefined); + assert.strictEqual(connectors[0].config['key.converter'], undefined); + // legacy key schema is restored, not dropped + assert.strictEqual(connectors[0].config['output.schema.key'], + constants.defaultConnectorConfig['output.schema.key']); + }); + + it('should keep SMT keys on oldConfig when transformObjectKey is enabled', async () => { + const config = { + ...connectorConfig, + 'transforms': 'stripObjectKey', + 'transforms.stripObjectKey.type': constants.transformObjectKeyClass, + 'key.converter': 'org.apache.kafka.connect.storage.StringConverter', + }; + sinon.stub(connectorsManager._kafkaConnect, 'getConnectorConfig') + .resolves(config); + sinon.stub(connectorsManager._pipelineFactory, 'getOldConnectorBucketList') + .returns(['bucket1']); + sinon.stub(connectorsManager._kafkaConnect, 'deleteConnector'); + connectorsManager._transformObjectKey = true; + const connectors = await connectorsManager._processOldConnectors(['source-connector']); + assert.strictEqual(connectors[0].config['transforms'], 'stripObjectKey'); + assert.strictEqual(connectors[0].config['transforms.stripObjectKey.type'], + constants.transformObjectKeyClass); + }); + it('should warn when the number of retrieved bucket in a connector exceeds the limit', async () => { const config = { ...connectorConfig }; config['topic.namespace.map'] = 'outdated-topic'; @@ -455,13 +530,16 @@ describe('ConnectorsManager', () => { sinon.restore(); }); - it('should schedule connector updates', () => { - const updateConnectorsStub = sinon.stub(connectorsManager, '_updateConnectors'); + it('should schedule connector updates', async () => { + const updateConnectorsStub = sinon.stub(connectorsManager, '_updateConnectors') + .resolves(); + let scheduledCb; sinon.stub(schedule, 'scheduleJob').callsFake((rule, cb) => { - cb(); + scheduledCb = cb; }); connectorsManager.scheduleConnectorUpdates(); - assert(updateConnectorsStub.called); + await scheduledCb(); + assert(updateConnectorsStub.calledOnce); }); }); }); diff --git a/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js b/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js index 1b5d423f4..f3aba27d1 100644 --- a/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js +++ b/tests/unit/oplogPopulator/OplogPopulatorConfigValidator.js @@ -24,4 +24,29 @@ describe('OplogPopulatorConfigValidator', () => { assert.strictEqual(result.value.locationStrippingBytesThreshold, 100 * 1000000); }); }); + + describe('transformObjectKey validation', () => { + it('should default to false when not provided', () => { + const result = OplogPopulatorConfigJoiSchema.validate(defaultConfig); + assert.ifError(result.error); + assert.strictEqual(result.value.transformObjectKey, false); + }); + + it('should accept an explicit boolean', () => { + const result = OplogPopulatorConfigJoiSchema.validate({ + ...defaultConfig, + transformObjectKey: true, + }); + assert.ifError(result.error); + assert.strictEqual(result.value.transformObjectKey, true); + }); + + it('should reject a non-boolean', () => { + const result = OplogPopulatorConfigJoiSchema.validate({ + ...defaultConfig, + transformObjectKey: 'yes', + }); + assert(result.error); + }); + }); });