From 33c110dceb492fc8fb5944db34b385f9bef070bd Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 7 Feb 2022 10:10:18 +0100 Subject: [PATCH] Pulsar HerdDB Source - initial commit --- herddb-thirdparty/pom.xml | 1 + herddb-thirdparty/pulsar-source/pom.xml | 46 +++++ .../main/java/herddb/pulsar/HerdDBSource.java | 192 ++++++++++++++++++ .../herddb/pulsar/HerdDBSourceConfig.java | 23 +++ .../META-INF/services/pulsar-io.yaml | 23 +++ 5 files changed, 285 insertions(+) create mode 100644 herddb-thirdparty/pulsar-source/pom.xml create mode 100644 herddb-thirdparty/pulsar-source/src/main/java/herddb/pulsar/HerdDBSource.java create mode 100644 herddb-thirdparty/pulsar-source/src/main/java/herddb/pulsar/HerdDBSourceConfig.java create mode 100644 herddb-thirdparty/pulsar-source/src/main/resources/META-INF/services/pulsar-io.yaml diff --git a/herddb-thirdparty/pom.xml b/herddb-thirdparty/pom.xml index 18370b439..5a2d23e48 100644 --- a/herddb-thirdparty/pom.xml +++ b/herddb-thirdparty/pom.xml @@ -32,6 +32,7 @@ ${maven.build.timestamp} + pulsar-source openjpa openjpa-test diff --git a/herddb-thirdparty/pulsar-source/pom.xml b/herddb-thirdparty/pulsar-source/pom.xml new file mode 100644 index 000000000..cf94d25b5 --- /dev/null +++ b/herddb-thirdparty/pulsar-source/pom.xml @@ -0,0 +1,46 @@ + + + + + org.herddb + herddb-thirdparty + 0.25.0-SNAPSHOT + ../pom.xml + + 4.0.0 + HerdDB Pulsar Source + herddb-pulsar-source + jar + + ${maven.build.timestamp} + + + + org.apache.pulsar + pulsar-io-core + 2.9.1 + + + ${project.groupId} + herddb-core + ${project.version} + + + diff --git a/herddb-thirdparty/pulsar-source/src/main/java/herddb/pulsar/HerdDBSource.java b/herddb-thirdparty/pulsar-source/src/main/java/herddb/pulsar/HerdDBSource.java new file mode 100644 index 000000000..3fa48918d --- /dev/null +++ b/herddb-thirdparty/pulsar-source/src/main/java/herddb/pulsar/HerdDBSource.java @@ -0,0 +1,192 @@ +/* + * Licensed to Diennea S.r.l. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Diennea S.r.l. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package herddb.pulsar; + +import herddb.cdc.ChangeDataCapture; +import herddb.client.ClientConfiguration; +import herddb.codec.DataAccessorForFullRecord; +import herddb.log.LogSequenceNumber; +import herddb.model.Column; +import herddb.model.ColumnTypes; +import herddb.model.Table; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.*; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.functions.api.KVRecord; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.PushSource; +import org.apache.pulsar.io.core.SourceContext; + +import java.util.Collections; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class HerdDBSource extends PushSource> + implements ChangeDataCapture.MutationListener { + + private static final Logger LOG = Logger.getLogger(HerdDBSource.class.getName()); + + private ChangeDataCapture changeDataCapture; + + @Override + public void open(Map config, SourceContext sourceContext) throws Exception { + String tableSpaceUUID = (String) config.get("tableSpaceUUID"); + String url = (String) config.get("url"); + ClientConfiguration clientConfig = new ClientConfiguration(); + clientConfig.readJdbcUrl(url); + // TODO: support checkpoints + LogSequenceNumber startingPosition = LogSequenceNumber.START_OF_TIME; + changeDataCapture = new ChangeDataCapture(tableSpaceUUID, clientConfig, this, startingPosition, new InMemoryTableHistoryStorage()); + } + + @Override + public void close() throws Exception { + if (changeDataCapture != null) { + changeDataCapture.close(); + } + } + + @Override + public void accept(ChangeDataCapture.Mutation mutation) { + Record> record = buildRecord(mutation); + this.consume(record); + } + + private Record> buildRecord(ChangeDataCapture.Mutation mutation) { + LogSequenceNumber logSequenceNumber = mutation.getLogSequenceNumber(); + Table table = mutation.getTable(); + ChangeDataCapture.MutationType mutationType = mutation.getMutationType(); + DataAccessorForFullRecord record = mutation.getRecord(); + long timestamp = mutation.getTimestamp(); + LOG.log(Level.INFO, "buildRecord for {0}", mutation); + + KeyValueSchema schema = buildSchema(table); + KeyValue pulsarRecord = buildRecord(schema, table, mutationType, record); + return new KVRecord() { + + @Override + public KeyValue getValue() { + return pulsarRecord; + } + + @Override + public Schema getKeySchema() { + return schema.getKeySchema(); + } + + @Override + public Schema getValueSchema() { + return schema.getValueSchema(); + } + + @Override + public KeyValueEncodingType getKeyValueEncodingType() { + return KeyValueEncodingType.SEPARATED; + } + }; + } + + private KeyValue buildRecord(KeyValueSchema schema, + Table table, ChangeDataCapture.MutationType mutationType, + DataAccessorForFullRecord record) { + GenericRecordBuilder keyBuilder = ((GenericSchema) schema.getKeySchema()).newRecordBuilder(); + GenericRecordBuilder valueBuilder = ((GenericSchema) schema.getValueSchema()).newRecordBuilder(); + for (Column col : table.columns) { + boolean isPk = table.isPrimaryKeyColumn(col.name); + GenericRecordBuilder builder = isPk ? keyBuilder : valueBuilder; + Object value = record.get(col.name); + builder.set(col.name, value); + } + return new KeyValue(keyBuilder.build(), valueBuilder.build()); + } + + private KeyValueSchema buildSchema(Table table) { + Schema keySchema = buildSchema(table.name+"Key", table.primaryKey, table); + String[] otherColumns = new String[table.columns.length - table.primaryKey.length]; + int pos = 0; + for (int i = 0; i < table.columns.length; i++) { + Column column = table.columns[i]; + if (!table.isPrimaryKeyColumn(column.name)) { + otherColumns[pos++] = column.name; + } + } + Schema valueSchema = buildSchema(table.name+"Value", otherColumns, table); + return (KeyValueSchema) + Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.SEPARATED); + } + + private Schema buildSchema(String name, String[] columns, Table table) { + RecordSchemaBuilder builder = SchemaBuilder.record(name); + for (String column : columns) { + Column col = table.getColumn(column); + FieldSchemaBuilder field = builder.field(col.name) + .type(convertType(col.type)); + if (ColumnTypes.isNotNullDataType(col.type) || table.isPrimaryKeyColumn(col.name)) { + field.required(); + } + } + SchemaInfo build = builder.build(SchemaType.JSON); + return GenericSchema.of(build); + } + + private static SchemaType convertType(int type) { + switch (type) { + case ColumnTypes.INTEGER: + case ColumnTypes.NOTNULL_INTEGER: + return SchemaType.INT32; + case ColumnTypes.STRING: + case ColumnTypes.NOTNULL_STRING: + return SchemaType.STRING; + default: + throw new IllegalArgumentException("Type " +type + " )" + ColumnTypes.typeToString(type) + + ") is not supported yet"); + } + } + + private static class InMemoryTableHistoryStorage implements ChangeDataCapture.TableSchemaHistoryStorage { + + private Map> definitions = new ConcurrentHashMap<>(); + + @Override + public void storeSchema(LogSequenceNumber lsn, Table table) { + LOG.log(Level.INFO, "storeSchema {0} {1}", new Object[] {lsn, table.name}); + SortedMap tableHistory = definitions.computeIfAbsent(table.name, (n)-> Collections.synchronizedSortedMap(new TreeMap<>())); + tableHistory.put(lsn, table); + } + + @Override + public Table fetchSchema(LogSequenceNumber lsn, String tableName) { + LOG.log(Level.INFO, "fetchSchema {0} {1}", new Object[] {lsn, tableName}); + SortedMap tableHistory = definitions.computeIfAbsent(tableName, (n)-> Collections.synchronizedSortedMap(new TreeMap<>())); + SortedMap after = tableHistory.headMap(lsn); + if (after.isEmpty()) { + return after.get(tableHistory.lastKey()); + } + return after.values().iterator().next(); + } + } +} \ No newline at end of file diff --git a/herddb-thirdparty/pulsar-source/src/main/java/herddb/pulsar/HerdDBSourceConfig.java b/herddb-thirdparty/pulsar-source/src/main/java/herddb/pulsar/HerdDBSourceConfig.java new file mode 100644 index 000000000..471dc124f --- /dev/null +++ b/herddb-thirdparty/pulsar-source/src/main/java/herddb/pulsar/HerdDBSourceConfig.java @@ -0,0 +1,23 @@ +/* + * Licensed to Diennea S.r.l. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Diennea S.r.l. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package herddb.pulsar; + +public class HerdDBSourceConfig { +} diff --git a/herddb-thirdparty/pulsar-source/src/main/resources/META-INF/services/pulsar-io.yaml b/herddb-thirdparty/pulsar-source/src/main/resources/META-INF/services/pulsar-io.yaml new file mode 100644 index 000000000..c9c9114bd --- /dev/null +++ b/herddb-thirdparty/pulsar-source/src/main/resources/META-INF/services/pulsar-io.yaml @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +name: herddb +description: HerdDB source +sourceClass: herddb.pulsar.HerdDBSource +sourceConfigClass: herddb.pulsar.HerdDBSourceConfig