diff --git a/.github/workflows/build-kafka.yaml b/.github/workflows/build-kafka.yaml
index c24c7b679e..472ad1172b 100644
--- a/.github/workflows/build-kafka.yaml
+++ b/.github/workflows/build-kafka.yaml
@@ -66,6 +66,7 @@ jobs:
KAFKA_IMAGE=${{ env.KAFKA_IMAGE }}
KAFKA_TAG=${{ env.KAFKA_TAG }}-${{ env.BUILD_TREE_HASH }}
MONGODB_CONNECTOR_TAG=${{ env.MONGODB_CONNECTOR_TAG }}
+ KAFKA_VERSION=${{ env.kafka_version }}
tags: "${{ env.KAFKA_CONNECT_IMAGE }}:${{ env.KAFKA_CONNECT_TAG }}-${{ env.BUILD_TREE_HASH }}"
cache-from: |
type=registry,ref=${{ env.KAFKA_CONNECT_IMAGE }}:${{ env.KAFKA_CONNECT_TAG }}-${{ env.BUILD_TREE_HASH }}
diff --git a/solution/kafka-connect/Dockerfile b/solution/kafka-connect/Dockerfile
index 0668dc67a2..43cabbccea 100644
--- a/solution/kafka-connect/Dockerfile
+++ b/solution/kafka-connect/Dockerfile
@@ -17,6 +17,13 @@ WORKDIR /tmp
RUN curl -LO https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/${MONGODB_CONNECTOR_TAG}/mongo-kafka-connect-${MONGODB_CONNECTOR_TAG}-all.jar && \
mv /tmp/mongo-kafka-connect-${MONGODB_CONNECTOR_TAG}-all.jar /tmp/mongo-kafka-connect.jar
+# Build Scality SMT plugin
+FROM maven:3.9-eclipse-temurin-17 AS smt-build
+ARG KAFKA_VERSION
+WORKDIR /build
+COPY smt/ ./
+RUN mvn -B -Dkafka.version=${KAFKA_VERSION} package
+
# Use Kafka base image
FROM ${KAFKA_IMAGE}:${KAFKA_TAG}
@@ -30,3 +37,7 @@ ENV KAFKA_OPTS=-javaagent:/opt/jmx-exporter/jmx_prometheus.jar=9020:/etc/jmx-exp
# mongodb ingestor
COPY --from=mongodb-connector /tmp/mongo-kafka-connect.jar /usr/local/share/kafka/plugins/
+
+# scality smt plugin
+COPY --from=smt-build /build/target/scality-kafka-connect-transforms-*.jar \
+ /usr/local/share/kafka/plugins/
diff --git a/solution/kafka-connect/smt/.gitignore b/solution/kafka-connect/smt/.gitignore
new file mode 100644
index 0000000000..2f7896d1d1
--- /dev/null
+++ b/solution/kafka-connect/smt/.gitignore
@@ -0,0 +1 @@
+target/
diff --git a/solution/kafka-connect/smt/pom.xml b/solution/kafka-connect/smt/pom.xml
new file mode 100644
index 0000000000..27f51e4878
--- /dev/null
+++ b/solution/kafka-connect/smt/pom.xml
@@ -0,0 +1,52 @@
+
+
+ 4.0.0
+
+ com.scality.kafka.connect
+ scality-kafka-connect-transforms
+ 1.0.0
+ jar
+
+
+ 11
+ 11
+ UTF-8
+ 3.1.2
+ 5.10.2
+
+
+
+
+ org.apache.kafka
+ connect-api
+ ${kafka.version}
+ provided
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.36
+ provided
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit.version}
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.2.5
+
+
+
+
diff --git a/solution/kafka-connect/smt/src/main/java/com/scality/kafka/connect/transforms/TransformObjectKey.java b/solution/kafka-connect/smt/src/main/java/com/scality/kafka/connect/transforms/TransformObjectKey.java
new file mode 100644
index 0000000000..60b083b6c4
--- /dev/null
+++ b/solution/kafka-connect/smt/src/main/java/com/scality/kafka/connect/transforms/TransformObjectKey.java
@@ -0,0 +1,127 @@
+package com.scality.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Kafka Connect SMT that rewrites the message key to the raw S3 object key,
+ * stripping the Scality master/version encoding used in the MongoDB metadata
+ * collections' _id field.
+ *
+ * Encoded _id forms produced by arsenal:
+ * - V1 master: "\x7FM" + rawKey
+ * - V1 version: "\x7FV" + rawKey + "\x00" + versionId
+ * - V0 legacy: rawKey (no prefix)
+ *
+ * This SMT collapses master and all versions of the same logical S3 object
+ * onto the same Kafka partition.
+ *
+ * Expects the connector's output.schema.key to project documentKey._id, so
+ * record.key() is a Struct with a nested documentKey.{_id}. Falls through
+ * unchanged if the key is null or of an unexpected shape; unexpected shapes
+ * are logged at DEBUG so configuration mismatches can be diagnosed.
+ */
+public class TransformObjectKey> implements Transformation {
+
+ private static final Logger log = LoggerFactory.getLogger(TransformObjectKey.class);
+ private static final ConfigDef CONFIG_DEF = new ConfigDef();
+
+ private static final char SCALITY_PREFIX_BYTE = '\u007F';
+ private static final char MASTER_TAG = 'M';
+ private static final char VERSION_TAG = 'V';
+ private static final char VERSION_SEPARATOR = '\u0000';
+
+ static String stripObjectKey(String id) {
+ if (id == null || id.length() < 2 || id.charAt(0) != SCALITY_PREFIX_BYTE) {
+ return id;
+ }
+ char tag = id.charAt(1);
+ if (tag == MASTER_TAG) {
+ return id.substring(2);
+ }
+ if (tag == VERSION_TAG) {
+ String tail = id.substring(2);
+ int sep = tail.indexOf(VERSION_SEPARATOR);
+ return sep >= 0 ? tail.substring(0, sep) : tail;
+ }
+ return id;
+ }
+
+ @Override
+ public R apply(R record) {
+ String id = extractDocumentKeyId(record.key());
+ if (id == null) {
+ return record;
+ }
+ String stripped = stripObjectKey(id);
+ // Pass null partition so Connect's partitioner re-hashes on the new key.
+ // Forwarding record.kafkaPartition() would pin the message to whatever
+ // partition the source connector chose, making this SMT a no-op for routing.
+ return record.newRecord(
+ record.topic(),
+ null,
+ Schema.STRING_SCHEMA,
+ stripped,
+ record.valueSchema(),
+ record.value(),
+ record.timestamp(),
+ record.headers());
+ }
+
+ private static String extractDocumentKeyId(Object key) {
+ if (key == null) {
+ return null;
+ }
+ if (key instanceof String) {
+ return (String) key;
+ }
+ if (!(key instanceof Struct)) {
+ log.debug("Unsupported key type {}; passing through unchanged",
+ key.getClass().getName());
+ return null;
+ }
+ Struct s = (Struct) key;
+ if (s.schema().field("documentKey") == null) {
+ log.debug("Key Struct has no documentKey field; passing through unchanged");
+ return null;
+ }
+ Object docKey = s.get("documentKey");
+ if (!(docKey instanceof Struct)) {
+ log.debug("documentKey is not a Struct ({}); passing through unchanged",
+ docKey == null ? "null" : docKey.getClass().getName());
+ return null;
+ }
+ Struct d = (Struct) docKey;
+ if (d.schema().field("_id") == null) {
+ log.debug("documentKey Struct has no _id field; passing through unchanged");
+ return null;
+ }
+ Object id = d.get("_id");
+ if (!(id instanceof String)) {
+ log.debug("documentKey._id is not a String ({}); passing through unchanged",
+ id == null ? "null" : id.getClass().getName());
+ return null;
+ }
+ return (String) id;
+ }
+
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void configure(Map configs) {
+ }
+}
diff --git a/solution/kafka-connect/smt/src/test/java/com/scality/kafka/connect/transforms/TransformObjectKeyTest.java b/solution/kafka-connect/smt/src/test/java/com/scality/kafka/connect/transforms/TransformObjectKeyTest.java
new file mode 100644
index 0000000000..3037574e24
--- /dev/null
+++ b/solution/kafka-connect/smt/src/test/java/com/scality/kafka/connect/transforms/TransformObjectKeyTest.java
@@ -0,0 +1,161 @@
+package com.scality.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+class TransformObjectKeyTest {
+
+ private static final String VID = "98765432101234567890ABCD";
+ private static final String M = "\u007FM";
+ private static final String V = "\u007FV";
+ private static final String NUL = "\u0000";
+
+ @Test
+ void stripsV1Master() {
+ assertEquals("my/object", TransformObjectKey.stripObjectKey(M + "my/object"));
+ }
+
+ @Test
+ void stripsV1Version() {
+ assertEquals("my/object",
+ TransformObjectKey.stripObjectKey(V + "my/object" + NUL + VID));
+ }
+
+ @Test
+ void passesThroughV0Legacy() {
+ assertEquals("legacy-key-no-prefix",
+ TransformObjectKey.stripObjectKey("legacy-key-no-prefix"));
+ }
+
+ @Test
+ void keepsNullInsideMasterRawKey() {
+ assertEquals("foo" + NUL + "bar",
+ TransformObjectKey.stripObjectKey(M + "foo" + NUL + "bar"));
+ }
+
+ @Test
+ void versionWithEmptyKey() {
+ assertEquals("", TransformObjectKey.stripObjectKey(V + NUL + "vidonly"));
+ }
+
+ @Test
+ void masterWithEmptyKey() {
+ assertEquals("", TransformObjectKey.stripObjectKey(M));
+ }
+
+ @Test
+ void versionWithoutSeparator() {
+ assertEquals("orphan", TransformObjectKey.stripObjectKey(V + "orphan"));
+ }
+
+ @Test
+ void unrecognizedPrefixPassesThrough() {
+ assertEquals("NotMV-passthrough",
+ TransformObjectKey.stripObjectKey("NotMV-passthrough"));
+ }
+
+ @Test
+ void unicodeRawKey() {
+ assertEquals("ünîçødé/path",
+ TransformObjectKey.stripObjectKey(M + "ünîçødé/path"));
+ }
+
+ @Test
+ void nullId() {
+ assertNull(TransformObjectKey.stripObjectKey(null));
+ }
+
+ @Test
+ void applyRewritesStructKey() {
+ Schema docKeySchema = SchemaBuilder.struct()
+ .field("_id", Schema.STRING_SCHEMA).build();
+ Schema keySchema = SchemaBuilder.struct()
+ .field("documentKey", docKeySchema).build();
+ Struct docKey = new Struct(docKeySchema).put("_id", M + "bucket/obj");
+ Struct key = new Struct(keySchema).put("documentKey", docKey);
+
+ SourceRecord in = sourceRecord(keySchema, key);
+
+ try (TransformObjectKey smt = new TransformObjectKey<>()) {
+ smt.configure(Collections.emptyMap());
+ SourceRecord out = smt.apply(in);
+ assertEquals("bucket/obj", out.key());
+ assertEquals(Schema.STRING_SCHEMA, out.keySchema());
+ assertSame(in.value(), out.value());
+ }
+ }
+
+ @Test
+ void applyPassesThroughOnNullKey() {
+ SourceRecord in = sourceRecord(null, null);
+ try (TransformObjectKey smt = new TransformObjectKey<>()) {
+ assertSame(in, smt.apply(in));
+ }
+ }
+
+ @Test
+ void applyPassesThroughOnStructWithoutDocumentKey() {
+ Schema keySchema = SchemaBuilder.struct()
+ .field("other", Schema.STRING_SCHEMA).build();
+ Struct key = new Struct(keySchema).put("other", "irrelevant");
+ SourceRecord in = sourceRecord(keySchema, key);
+ try (TransformObjectKey smt = new TransformObjectKey<>()) {
+ assertSame(in, smt.apply(in));
+ }
+ }
+
+ @Test
+ void applyPassesThroughOnDocumentKeyWithoutId() {
+ Schema docKeySchema = SchemaBuilder.struct()
+ .field("other", Schema.STRING_SCHEMA).build();
+ Schema keySchema = SchemaBuilder.struct()
+ .field("documentKey", docKeySchema).build();
+ Struct docKey = new Struct(docKeySchema).put("other", "x");
+ Struct key = new Struct(keySchema).put("documentKey", docKey);
+ SourceRecord in = sourceRecord(keySchema, key);
+ try (TransformObjectKey smt = new TransformObjectKey<>()) {
+ assertSame(in, smt.apply(in));
+ }
+ }
+
+ @Test
+ void applyPassesThroughOnNonStringId() {
+ Schema docKeySchema = SchemaBuilder.struct()
+ .field("_id", Schema.INT64_SCHEMA).build();
+ Schema keySchema = SchemaBuilder.struct()
+ .field("documentKey", docKeySchema).build();
+ Struct docKey = new Struct(docKeySchema).put("_id", 42L);
+ Struct key = new Struct(keySchema).put("documentKey", docKey);
+ SourceRecord in = sourceRecord(keySchema, key);
+ try (TransformObjectKey smt = new TransformObjectKey<>()) {
+ assertSame(in, smt.apply(in));
+ }
+ }
+
+ @Test
+ void applyTreatsRawStringKeyAsId() {
+ SourceRecord in = sourceRecord(Schema.STRING_SCHEMA, M + "bucket/obj");
+ try (TransformObjectKey smt = new TransformObjectKey<>()) {
+ SourceRecord out = smt.apply(in);
+ assertEquals("bucket/obj", out.key());
+ assertEquals(Schema.STRING_SCHEMA, out.keySchema());
+ }
+ }
+
+ private static SourceRecord sourceRecord(Schema keySchema, Object key) {
+ return new SourceRecord(
+ Collections.emptyMap(), Collections.emptyMap(),
+ "topic", 0,
+ keySchema, key,
+ Schema.STRING_SCHEMA, "value");
+ }
+}