diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java index 9f508b01d..df93fb15b 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java @@ -18,11 +18,13 @@ package org.apache.wayang.api.sql.calcite.converter; +import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.stream.Collectors; import org.apache.calcite.rel.core.AggregateCall; - +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.wayang.api.sql.calcite.converter.functions.AggregateAddCols; import org.apache.wayang.api.sql.calcite.converter.functions.AggregateFunction; import org.apache.wayang.api.sql.calcite.converter.functions.AggregateKeyExtractor; @@ -57,15 +59,40 @@ Operator visit(final WayangAggregate wayangRelNode) { Record.class); childOp.connectTo(0, mapOperator, 0); - final Operator aggregateOperator = wayangRelNode.getGroupCount() > 0 ? 
new ReduceByOperator<>( - new TransformationDescriptor<>(new AggregateKeyExtractor(groupingFields), Record.class, Object.class), - new ReduceDescriptor<>(new AggregateFunction(aggregateCalls), - DataUnitType.createGrouped(Record.class), - DataUnitType.createBasicUnchecked(Record.class))) - : new GlobalReduceOperator<>( - new ReduceDescriptor<>(new AggregateFunction(aggregateCalls), - DataUnitType.createGrouped(Record.class), - DataUnitType.createBasicUnchecked(Record.class))); + + final Operator aggregateOperator; + + if (wayangRelNode.getGroupCount() > 0) { + aggregateOperator = new ReduceByOperator<>( + new TransformationDescriptor<>( + new AggregateKeyExtractor(groupingFields), Record.class, + Object.class), + new ReduceDescriptor<>(new AggregateFunction(aggregateCalls), + DataUnitType.createGrouped(Record.class), + DataUnitType.createBasicUnchecked(Record.class))); + } else { + final List reductionFunctions = wayangRelNode.getNamedAggCalls().stream() + .map(agg -> agg.left.getAggregation().getName()).toList(); + + final List fields = wayangRelNode.getInput().getRowType().getFieldList().stream() + .map(RelDataTypeField::getName).toList(); + + final List aliases = wayangRelNode.getRowType().getFieldList().stream() + .map(RelDataTypeField::getName).toList(); + + final String[] reductionStatements = new String[reductionFunctions.size()]; + + for (int i = 0; i < reductionStatements.length; i++) { + reductionStatements[i] = reductionFunctions.get(i) + "(" + fields.get(i) + ") AS " + aliases.get(i); + } + + aggregateOperator = new GlobalReduceOperator<>( + new ReduceDescriptor<>(new AggregateFunction(aggregateCalls), + DataUnitType.createGrouped(Record.class), + DataUnitType.createBasicUnchecked(Record.class)) + .withSqlImplementation( + Arrays.stream(reductionStatements).collect(Collectors.joining(",")))); + } mapOperator.connectTo(0, aggregateOperator, 0); @@ -74,6 +101,7 @@ Operator visit(final WayangAggregate wayangRelNode) { Record.class, Record.class); 
aggregateOperator.connectTo(0, mapOperator2, 0); + return mapOperator2; } } \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java index 9bde24477..77537826d 100755 --- a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java +++ b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; + import static org.mockito.Mockito.mock; import java.io.ByteArrayInputStream; @@ -31,6 +32,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -65,9 +67,12 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.tools.RuleSet; import org.apache.calcite.tools.RuleSets; + import org.apache.wayang.api.sql.calcite.convention.WayangConvention; import org.apache.wayang.api.sql.calcite.converter.functions.FilterPredicateImpl; import org.apache.wayang.api.sql.calcite.converter.functions.ProjectMapFuncImpl; @@ -87,10 +92,12 @@ import org.apache.wayang.core.util.Tuple; import org.apache.wayang.java.Java; import org.apache.wayang.jdbc.execution.JdbcExecutor; +import org.apache.wayang.jdbc.operators.JdbcGlobalReduceOperator; import org.apache.wayang.jdbc.operators.JdbcProjectionOperator; import org.apache.wayang.jdbc.operators.JdbcTableSource; import 
org.apache.wayang.postgres.mapping.ProjectionMapping; import org.apache.wayang.spark.Spark; + import org.json.simple.parser.ParseException; import org.junit.jupiter.api.Test; @@ -98,26 +105,64 @@ import com.fasterxml.jackson.databind.ObjectMapper; class SqlToWayangRelTest { + @Test + public void serializeFilter() throws Exception { + // create filterPredicateImpl for serialisation + final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(); + final RexBuilder rb = new RexBuilder(typeFactory); + final RexNode leftOperand = rb.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0); + final RexNode rightOperand = rb.makeLiteral("test"); + final RexNode cond = rb.makeCall(SqlStdOperatorTable.EQUALS, leftOperand, rightOperand); + final SerializablePredicate fpImpl = new FilterPredicateImpl(cond); + + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); + objectOutputStream.writeObject(fpImpl); + objectOutputStream.close(); + + final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); + final ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); + final Object deserializedObject = objectInputStream.readObject(); + objectInputStream.close(); + + assertTrue(((FilterPredicateImpl) deserializedObject).test(new Record("test"))); + } + + @Test + void sqlApiReduceTest() throws Exception { + final JavaTypeFactoryImpl typeFactory = new JavaTypeFactoryImpl(); + + final VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.empty()); + planner.addRelTraitDef(ConventionTraitDef.INSTANCE); + + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final RelDataType rowType = new Builder(typeFactory).add("ID", typeFactory.createJavaType(Integer.class)) + .add("NAME", typeFactory.createJavaType(String.class)).build(); + + 
rootSchema.add("T1", new AbstractTable() { + @Override + public RelDataType getRowType(final RelDataTypeFactory typeFactory) { + return rowType; + } + }); + + final FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(rootSchema) + .costFactory(RelOptCostImpl.FACTORY).build(); + + final RelBuilder relBuilder = RelBuilder.create(config); + + final RelNode relTree = relBuilder.scan("T1").aggregate(relBuilder.groupKey(), relBuilder.count()).build(); + + final SqlDialect dialect = SqlDialect.DatabaseProduct.CALCITE.getDialect(); + final RelToSqlConverter converter = new RelToSqlConverter(dialect); + final SqlNode sqlNode = converter.visitRoot(relTree).asStatement(); - /** - * Method for building {@link WayangPlan}s useful for testing, benchmarking and - * other usages where you want to handle the intermediate {@link WayangPlan} - * - * @param sql sql query string with the {@code ;} cut off - * @param udfJars - * @return a {@link WayangPlan} of a given sql string - * @throws SqlParseException - * @throws SQLException - */ - private Tuple2, WayangPlan> buildCollectorAndWayangPlan(final SqlContext context, - final String sql, final String... 
udfJars) throws SqlParseException, SQLException { final Properties configProperties = Optimizer.ConfigProperties.getDefaults(); final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl(); - final Optimizer optimizer = Optimizer.create(SchemaUtils.getSchema(context.getConfiguration()), - configProperties, relDataTypeFactory); + final Optimizer optimizer = Optimizer.create(CalciteSchema.from(rootSchema), configProperties, + relDataTypeFactory); - final SqlNode sqlNode = optimizer.parseSql(sql); final SqlNode validatedSqlNode = optimizer.validate(sqlNode); final RelNode relNode = optimizer.convert(validatedSqlNode); @@ -129,11 +174,36 @@ private Tuple2, WayangPlan> buildCollectorAndWayangPlan(final final RelNode wayangRel = optimizer.optimize(relNode, relNode.getTraitSet().plus(WayangConvention.INSTANCE), rules); - final Collection collector = new ArrayList<>(); + final WayangPlan plan = Optimizer.convert(wayangRel, new ArrayList()); - final WayangPlan wayangPlan = Optimizer.convertWithConfig(wayangRel, context.getConfiguration(), collector); + final ProjectionMapping projectionMapping = new ProjectionMapping(); + final PlanTransformation projectionTransformation = projectionMapping.getTransformations().iterator().next() + .thatReplaces(); - return new Tuple2<>(collector, wayangPlan); + final org.apache.wayang.postgres.mapping.GlobalReduceMapping globalReduceMapping = new org.apache.wayang.postgres.mapping.GlobalReduceMapping(); + final PlanTransformation globalReduceTransformation = globalReduceMapping.getTransformations().iterator().next() + .thatReplaces(); + + plan.applyTransformations(List.of(projectionTransformation, globalReduceTransformation)); + + final Collection operators = PlanTraversal.upstream().traverse(plan.getSinks()).getTraversedNodes(); + + final JdbcTableSource table = operators.stream().filter(op -> op instanceof JdbcTableSource) + .map(JdbcTableSource.class::cast).findFirst().orElseThrow(); + final JdbcGlobalReduceOperator 
globalReduce = operators.stream() + .filter(op -> op instanceof JdbcGlobalReduceOperator).map(JdbcGlobalReduceOperator.class::cast) + .findFirst().orElseThrow(); + + final HashMap> edges = new HashMap<>(); + + edges.put(globalReduce, List.of(table)); + edges.put(table, List.of()); + + final JdbcExecutor jdbcExecutor = mock(); + final StringBuilder query = JdbcExecutor.createSqlString(jdbcExecutor, table, Arrays.asList(), null, + globalReduce, null, null, Arrays.asList()); + + assertTrue(query.toString().contains("COUNT"), "expected query to contain 'count', got: " + query); } @Test @@ -154,7 +224,6 @@ void javaFilterWithCastUsingNumbers() throws Exception { assertTrue(result.stream().allMatch(field -> field.getField(1).equals(1))); } - @Test void javaFilterWithCast() throws Exception { final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); @@ -232,12 +301,15 @@ public RelDataType getRowType(final RelDataTypeFactory typeFactory) { final Collection operators = PlanTraversal.upstream().traverse(plan.getSinks()).getTraversedNodes(); final JdbcTableSource table = operators.stream().filter(op -> op instanceof JdbcTableSource) - .map(JdbcTableSource.class::cast).findFirst().orElseThrow(() -> new RuntimeException("Table not found")); + .map(JdbcTableSource.class::cast).findFirst() + .orElseThrow(() -> new RuntimeException("Table not found")); final JdbcProjectionOperator projection = operators.stream().filter(op -> op instanceof JdbcProjectionOperator) - .map(JdbcProjectionOperator.class::cast).findFirst().orElseThrow(() -> new RuntimeException("Projection not found")); + .map(JdbcProjectionOperator.class::cast).findFirst() + .orElseThrow(() -> new RuntimeException("Projection not found")); final JdbcExecutor jdbcExecutor = mock(); - final StringBuilder query = JdbcExecutor.createSqlString(jdbcExecutor, table, Arrays.asList(), projection, Arrays.asList()); + final StringBuilder query = JdbcExecutor.createSqlString(jdbcExecutor, table, 
Arrays.asList(), projection, null, + null, null, Arrays.asList()); assertEquals("SELECT ID, NAME FROM T1;", query.toString()); } @@ -344,7 +416,8 @@ void javaAverage() throws Exception { sqlContext.execute(wayangPlan); assertEquals(1, result.size()); - assertEquals(0.875f, result.stream().findFirst().orElseThrow(() -> new RuntimeException("No record found")).getDouble(0)); + assertEquals(0.875f, + result.stream().findFirst().orElseThrow(() -> new RuntimeException("No record found")).getDouble(0)); } @Test @@ -676,29 +749,6 @@ void serializeProjection() throws Exception { assertEquals(impl.apply(testRecord), deserializedImpl.apply(testRecord)); } - @Test - public void serializeFilter() throws Exception { - // create filterPredicateImpl for serialisation - final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(); - final RexBuilder rb = new RexBuilder(typeFactory); - final RexNode leftOperand = rb.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0); - final RexNode rightOperand = rb.makeLiteral("test"); - final RexNode cond = rb.makeCall(SqlStdOperatorTable.EQUALS, leftOperand, rightOperand); - final SerializablePredicate fpImpl = new FilterPredicateImpl(cond); - - final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - final ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); - objectOutputStream.writeObject(fpImpl); - objectOutputStream.close(); - - final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); - final ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); - final Object deserializedObject = objectInputStream.readObject(); - objectInputStream.close(); - - assertTrue(((FilterPredicateImpl) deserializedObject).test(new Record("test"))); - } - @Test void exampleFilterTableRefToTableRef() throws Exception { final SqlContext sqlContext = createSqlContext("/data/exampleRefToRef.csv"); @@ -725,7 
+775,8 @@ void exampleMinWithStrings() throws Exception { final WayangPlan wayangPlan = t.field1; sqlContext.execute(wayangPlan); - assertEquals("AA", result.stream().findAny().orElseThrow(() -> new RuntimeException("No record found")).getString(0)); + assertEquals("AA", + result.stream().findAny().orElseThrow(() -> new RuntimeException("No record found")).getString(0)); } @Test @@ -793,14 +844,12 @@ void sqlApiMultiConditionJoinGeneratesJdbcSql() throws Exception { final RelDataType ordersRowType = new Builder(typeFactory) .add("order_id", typeFactory.createJavaType(Integer.class)) .add("customer_id", typeFactory.createJavaType(Integer.class)) - .add("product_id", typeFactory.createJavaType(Integer.class)) - .build(); + .add("product_id", typeFactory.createJavaType(Integer.class)).build(); final RelDataType shipmentsRowType = new Builder(typeFactory) .add("order_id", typeFactory.createJavaType(Integer.class)) .add("customer_id", typeFactory.createJavaType(Integer.class)) - .add("ship_date", typeFactory.createJavaType(String.class)) - .build(); + .add("ship_date", typeFactory.createJavaType(String.class)).build(); rootSchema.add("orders", new AbstractTable() { @Override @@ -845,30 +894,27 @@ public RelDataType getRowType(final RelDataTypeFactory typeFactory) { final Collection operators = PlanTraversal.upstream().traverse(plan.getSinks()).getTraversedNodes(); - final JdbcTableSource ordersTable = operators.stream() - .filter(op -> op instanceof JdbcTableSource) - .map(JdbcTableSource.class::cast) - .filter(table -> table.getTableName().equals("orders")) - .findFirst().orElseThrow(() -> new RuntimeException("Orders table not found")); + final JdbcTableSource ordersTable = operators.stream().filter(op -> op instanceof JdbcTableSource) + .map(JdbcTableSource.class::cast).filter(table -> table.getTableName().equals("orders")).findFirst() + .orElseThrow(() -> new RuntimeException("Orders table not found")); - final JdbcTableSource shipmentsTable = operators.stream() - 
.filter(op -> op instanceof JdbcTableSource) - .map(JdbcTableSource.class::cast) - .filter(table -> table.getTableName().equals("shipments")) - .findFirst().orElseThrow(() -> new RuntimeException("Shipments table not found")); + final JdbcTableSource shipmentsTable = operators.stream().filter(op -> op instanceof JdbcTableSource) + .map(JdbcTableSource.class::cast).filter(table -> table.getTableName().equals("shipments")).findFirst() + .orElseThrow(() -> new RuntimeException("Shipments table not found")); assertNotNull(ordersTable, "orders table should be present"); assertNotNull(shipmentsTable, "shipments table should be present"); final org.apache.wayang.basic.operators.JoinOperator joinOp = operators.stream() .filter(op -> op instanceof org.apache.wayang.basic.operators.JoinOperator) - .map(op -> (org.apache.wayang.basic.operators.JoinOperator) op) - .findFirst().orElseThrow(() -> new RuntimeException("Join operator not found")); + .map(op -> (org.apache.wayang.basic.operators.JoinOperator) op).findFirst() + .orElseThrow(() -> new RuntimeException("Join operator not found")); assertNotNull(joinOp, "Join operator should be present"); // Verify the join operator has SQL implementations with correct field names - // This validates that WayangMultiConditionJoinVisitor called withSqlImplementation() + // This validates that WayangMultiConditionJoinVisitor called + // withSqlImplementation() final Tuple leftSqlImpl = joinOp.getKeyDescriptor0().getSqlImplementation(); final Tuple rightSqlImpl = joinOp.getKeyDescriptor1().getSqlImplementation(); @@ -882,17 +928,54 @@ public RelDataType getRowType(final RelDataTypeFactory typeFactory) { // Verify field names are comma-separated (multi-condition) final String leftFields = leftSqlImpl.field1; final String rightFields = rightSqlImpl.field1; - + assertTrue(leftFields.contains("order_id") && leftFields.contains("customer_id"), "Left SQL implementation should contain both order_id and customer_id, got: " + leftFields); 
assertTrue(rightFields.contains("order_id") && rightFields.contains("customer_id"), "Right SQL implementation should contain both order_id and customer_id, got: " + rightFields); - + // Verify comma-separated format assertTrue(leftFields.contains(","), "Left fields should be comma-separated for multi-condition join"); assertTrue(rightFields.contains(","), "Right fields should be comma-separated for multi-condition join"); } + /** + * Method for building {@link WayangPlan}s useful for testing, benchmarking and + * other usages where you want to handle the intermediate {@link WayangPlan} + * + * @param sql sql query string with the {@code ;} cut off + * @param udfJars + * @return a {@link WayangPlan} of a given sql string + * @throws SqlParseException + * @throws SQLException + */ + private Tuple2, WayangPlan> buildCollectorAndWayangPlan(final SqlContext context, + final String sql, final String... udfJars) throws SqlParseException, SQLException { + final Properties configProperties = Optimizer.ConfigProperties.getDefaults(); + final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl(); + + final Optimizer optimizer = Optimizer.create(SchemaUtils.getSchema(context.getConfiguration()), + configProperties, relDataTypeFactory); + + final SqlNode sqlNode = optimizer.parseSql(sql); + final SqlNode validatedSqlNode = optimizer.validate(sqlNode); + final RelNode relNode = optimizer.convert(validatedSqlNode); + + final RuleSet rules = RuleSets.ofList(CoreRules.FILTER_INTO_JOIN, WayangRules.WAYANG_TABLESCAN_RULE, + WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, WayangRules.WAYANG_PROJECT_RULE, + WayangRules.WAYANG_FILTER_RULE, WayangRules.WAYANG_JOIN_RULE, WayangRules.WAYANG_AGGREGATE_RULE, + WayangRules.WAYANG_SORT_RULE); + + final RelNode wayangRel = optimizer.optimize(relNode, relNode.getTraitSet().plus(WayangConvention.INSTANCE), + rules); + + final Collection collector = new ArrayList<>(); + + final WayangPlan wayangPlan = 
Optimizer.convertWithConfig(wayangRel, context.getConfiguration(), collector); + + return new Tuple2<>(collector, wayangPlan); + } + private SqlContext createSqlContext(final String tableResourceName) throws IOException, ParseException, SQLException { final String calciteModel = "{\r\n" + " \"calcite\": {\r\n" + " \"version\": \"1.0\",\r\n" diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/ReduceDescriptor.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/ReduceDescriptor.java index 0bd385347..a41e4bd97 100644 --- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/ReduceDescriptor.java +++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/function/ReduceDescriptor.java @@ -18,6 +18,8 @@ package org.apache.wayang.core.function; +import java.util.function.BinaryOperator; + import org.apache.wayang.core.optimizer.costs.LoadEstimator; import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator; import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator; @@ -25,11 +27,10 @@ import org.apache.wayang.core.types.DataUnitGroupType; import org.apache.wayang.core.types.DataUnitType; -import java.util.function.BinaryOperator; - /** - * This descriptor pertains to functions that take multiple data units and aggregate them into a single data unit - * by means of a tree-like fold, i.e., using a commutative, associative function.. + * This descriptor pertains to functions that take multiple data units and + * aggregate them into a single data unit by means of a tree-like fold, i.e., + * using a commutative, associative function.. 
*/ public class ReduceDescriptor extends FunctionDescriptor { @@ -39,43 +40,67 @@ public class ReduceDescriptor extends FunctionDescriptor { private final SerializableBinaryOperator javaImplementation; - public ReduceDescriptor(SerializableBinaryOperator javaImplementation, - DataUnitGroupType inputType, - BasicDataUnitType outputType) { + /** + * sql implementation of the reduce operator + */ + private String sqlImplementation; + + public ReduceDescriptor(final SerializableBinaryOperator javaImplementation, final DataUnitGroupType inputType, + final BasicDataUnitType outputType) { this(javaImplementation, inputType, outputType, new NestableLoadProfileEstimator( - LoadEstimator.createFallback(1, 1), - LoadEstimator.createFallback(1, 1) - )); + LoadEstimator.createFallback(1, 1), LoadEstimator.createFallback(1, 1))); } - public ReduceDescriptor(SerializableBinaryOperator javaImplementation, - DataUnitGroupType inputType, BasicDataUnitType outputType, - LoadProfileEstimator loadProfileEstimator) { + public ReduceDescriptor(final SerializableBinaryOperator javaImplementation, final DataUnitGroupType inputType, + final BasicDataUnitType outputType, final LoadProfileEstimator loadProfileEstimator) { super(loadProfileEstimator); this.inputType = inputType; this.outputType = outputType; this.javaImplementation = javaImplementation; } - public ReduceDescriptor(SerializableBinaryOperator javaImplementation, Class inputType) { + public ReduceDescriptor(final SerializableBinaryOperator javaImplementation, final Class inputType) { this(javaImplementation, DataUnitType.createGroupedUnchecked(inputType), - DataUnitType.createBasicUnchecked(inputType) - ); + DataUnitType.createBasicUnchecked(inputType)); } + /** + * This is function is not built to last. It is thought to help out devising + * programs while we are still figuring + * out how to express functions in a platform-independent way. 
+ * + * @return the SQL implementation set via {@link #withSqlImplementation(String)}, if any + */ + public String getSqlImplementation() { + return this.sqlImplementation; + } /** - * This is function is not built to last. It is thought to help out devising programs while we are still figuring + * This function is not built to last. It is thought to help out devising + * programs while we are still figuring * out how to express functions in a platform-independent way. * - * @return a function that can perform the reduce + * @return this descriptor, so that calls can be chained */ + public ReduceDescriptor withSqlImplementation(final String sqlImplementation) { + this.sqlImplementation = sqlImplementation; + return this; + } + + /** + * This function is not built to last. It is thought to help out devising + * programs while we are still figuring out how to express functions in a + * platform-independent way. + * + * @return a function that can perform the reduce + */ public BinaryOperator getJavaImplementation() { return this.javaImplementation; } /** - * In generic code, we do not have the type parameter values of operators, functions etc. This method avoids casting issues. + * In generic code, we do not have the type parameter values of operators, + * functions etc. This method avoids casting issues. 
* * @return this instance with type parameters set to {@link Object} */ diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java index c193704a3..4f0d196b8 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java @@ -18,12 +18,21 @@ package org.apache.wayang.jdbc.execution; -import org.apache.wayang.basic.channels.FileChannel; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.wayang.basic.data.Tuple2; -import org.apache.wayang.basic.operators.SpatialFilterOperator; -import org.apache.wayang.basic.operators.SpatialJoinOperator; import org.apache.wayang.basic.operators.FilterOperator; import org.apache.wayang.basic.operators.JoinOperator; +import org.apache.wayang.basic.operators.SpatialFilterOperator; +import org.apache.wayang.basic.operators.SpatialJoinOperator; import org.apache.wayang.basic.operators.TableSource; import org.apache.wayang.core.api.Job; import org.apache.wayang.core.api.exception.WayangException; @@ -31,89 +40,163 @@ import org.apache.wayang.core.plan.executionplan.Channel; import org.apache.wayang.core.plan.executionplan.ExecutionStage; import org.apache.wayang.core.plan.executionplan.ExecutionTask; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; import org.apache.wayang.core.platform.ExecutionState; import org.apache.wayang.core.platform.Executor; import org.apache.wayang.core.platform.ExecutorTemplate; import 
org.apache.wayang.core.platform.Platform; import org.apache.wayang.core.util.WayangCollections; -import org.apache.wayang.core.util.fs.FileSystem; -import org.apache.wayang.core.util.fs.FileSystems; import org.apache.wayang.jdbc.channels.SqlQueryChannel; import org.apache.wayang.jdbc.compiler.FunctionCompiler; import org.apache.wayang.jdbc.operators.JdbcExecutionOperator; import org.apache.wayang.jdbc.operators.JdbcFilterOperator; +import org.apache.wayang.jdbc.operators.JdbcGlobalReduceOperator; import org.apache.wayang.jdbc.operators.JdbcJoinOperator; import org.apache.wayang.jdbc.operators.JdbcProjectionOperator; +import org.apache.wayang.jdbc.operators.JdbcReduceByOperator; +import org.apache.wayang.jdbc.operators.JdbcSortOperator; import org.apache.wayang.jdbc.operators.JdbcTableSinkOperator; import org.apache.wayang.jdbc.operators.JdbcTableSource; import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.UncheckedIOException; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Set; -import java.util.stream.Collectors; /** * {@link Executor} implementation for the {@link JdbcPlatformTemplate}. 
*/ public class JdbcExecutor extends ExecutorTemplate { + public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, final JdbcTableSource tableOp, + final Collection filterTasks, final JdbcProjectionOperator projectionTask, final JdbcGlobalReduceOperator globalReduceTask, final JdbcReduceByOperator reduceByTask, final JdbcSortOperator sortTask, + final Collection joinTasks) { + final String tableName = tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); + final Collection conditions = filterTasks.stream() + .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) + .collect(Collectors.toList()); + final Collection joins = joinTasks.stream() + .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) + .collect(Collectors.toList()); + + final String selectClause; + if (globalReduceTask != null) { + selectClause = globalReduceTask.createSqlClause( + jdbcExecutor.connection, + jdbcExecutor.functionCompiler + ); + } else if (reduceByTask != null) { + selectClause = reduceByTask.createSqlClause( + jdbcExecutor.connection, + jdbcExecutor.functionCompiler + ); + } else if (projectionTask != null) { + selectClause = projectionTask.createSqlClause( + jdbcExecutor.connection, + jdbcExecutor.functionCompiler + ); + } else { + selectClause = "*"; + } - private final JdbcPlatformTemplate platform; + final StringBuilder sb = new StringBuilder(1000); + sb.append("SELECT ").append(selectClause).append(" FROM ").append(tableName); + if (!joins.isEmpty()) { + final String separator = " "; + for (final String join : joins) { + sb.append(separator).append(join); + } + } + if (!conditions.isEmpty()) { + sb.append(" WHERE "); + sb.append(String.join(" AND ", conditions)); + } + if (reduceByTask != null) { + sb.append(" GROUP BY " + reduceByTask.getKeyDescriptor().getSqlImplementation().getField0()); + } + if (sortTask != null) { + sb.append(sortTask.createSqlClause( + 
jdbcExecutor.connection, + jdbcExecutor.functionCompiler + )); + } - private final Connection connection; + sb.append(';'); + return sb; + } - private final Logger logger = LogManager.getLogger(this.getClass()); + /** + * Creates a query channel and the sql statement + * + * @param stage the execution stage whose tasks are composed into one query + * @param context the optimization context used to instantiate the channels + * @return a tuple of the sql statement and the last instantiated query channel + */ + protected static Tuple2 createSqlQuery(final ExecutionStage stage, + final OptimizationContext context, final JdbcExecutor jdbcExecutor) { + final Collection startTasks = stage.getStartTasks(); - private final FunctionCompiler functionCompiler = new FunctionCompiler(); + // Verify that we can handle this instance. + final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0]; + assert startTask.getOperator() instanceof TableSource + : "Invalid JDBC stage: Start task has to be a TableSource"; - public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) { - super(job.getCrossPlatformExecutor()); - this.platform = platform; - this.connection = this.platform.createDatabaseDescriptor(job.getConfiguration()).createJdbcConnection(); - } + // Extract the different types of ExecutionOperators from the stage. + final JdbcTableSource tableOp = (JdbcTableSource) startTask.getOperator(); + SqlQueryChannel.Instance tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(startTask, context, + jdbcExecutor); + final Collection filterTasks = new ArrayList<>(4); + JdbcProjectionOperator projectionTask = null; + JdbcGlobalReduceOperator globalReduceTask = null; + JdbcReduceByOperator reduceByTask = null; + JdbcSortOperator sortTask = null; + final Collection joinTasks = new ArrayList<>(); + final Set allTasks = stage.getAllTasks(); + assert allTasks.size() <= 3; + ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage); + while (nextTask != null) { + // Evaluate the nextTask. 
+ final ExecutionOperator operator = nextTask.getOperator(); + if (operator instanceof FilterOperator || operator instanceof SpatialFilterOperator) { + filterTasks.add((JdbcExecutionOperator) operator); + } else if (operator instanceof JdbcProjectionOperator) { + assert projectionTask == null; // Allow one projection operator per stage for now. + projectionTask = (JdbcProjectionOperator) operator; + } else if (operator instanceof final JdbcGlobalReduceOperator globalReduce) { + assert globalReduceTask == null; // Allow one global reduce operator per stage for now. + globalReduceTask = globalReduce; + } else if (operator instanceof final JdbcReduceByOperator reduceBy) { + assert reduceByTask == null; // Allow one reduce-by operator per stage for now. + reduceByTask = reduceBy; + } else if (operator instanceof final JdbcSortOperator sort) { + assert sortTask == null; // Allow one sort operator per stage for now. + sortTask = sort; + } else if (operator instanceof JoinOperator || (operator instanceof SpatialJoinOperator)) { + joinTasks.add((JdbcExecutionOperator) operator); + } else { + throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString())); + } - @Override - public void execute(final ExecutionStage stage, final OptimizationContext optimizationContext, final ExecutionState executionState) { - // Check if this stage ends with a sink operator - final Collection termTasks = stage.getTerminalTasks(); - assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported."; - final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0]; + // Move the tipChannelInstance. 
+ tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(nextTask, context, tipChannelInstance, + jdbcExecutor); - if (termTask.getOperator() instanceof JdbcTableSinkOperator) { - // If it is a sink stage: compose and execute SQL directly within the database - JdbcExecutor.executeSinkStage(stage, optimizationContext, this); - } else { - //If it is normal stage: compose SQL and store in channel for downstream consumption - final Tuple2 pair = JdbcExecutor.createSqlQuery(stage, optimizationContext, this); - final String query = pair.field0; - final SqlQueryChannel.Instance queryChannel = pair.field1; - queryChannel.setSqlQuery(query); - executionState.register(queryChannel); + // Go to the next nextTask. + nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage); } + + // Create the SQL query. + final StringBuilder query = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, globalReduceTask, reduceByTask, sortTask, joinTasks); + + return new Tuple2<>(query.toString(), tipChannelInstance); } /** * Handles execution stages that end with a {@link JdbcTableSinkOperator}. - * Composes a SQL query from the stage's operators and executes it directly - * on the database connection, keeping all data within the database. + * Composes a SQL query from the stage's operators and executes it directly on + * the database connection, keeping all data within the database. 
* * @param stage the execution stage ending with a sink * @param optimizationContext provides optimization information * @param jdbcExecutor the executor with the database connection */ - private static void executeSinkStage(final ExecutionStage stage, - final OptimizationContext optimizationContext, - final JdbcExecutor jdbcExecutor) { + private static void executeSinkStage(final ExecutionStage stage, final OptimizationContext optimizationContext, + final JdbcExecutor jdbcExecutor) { final Collection startTasks = stage.getStartTasks(); final Collection termTasks = stage.getTerminalTasks(); @@ -138,10 +221,10 @@ private static void executeSinkStage(final ExecutionStage stage, while (nextTask != null && !(nextTask.getOperator() instanceof JdbcTableSinkOperator)) { if (nextTask.getOperator() instanceof final JdbcFilterOperator filterOperator) { filterTasks.add(filterOperator); - } else if (nextTask.getOperator() instanceof JdbcProjectionOperator projectionOperator) { + } else if (nextTask.getOperator() instanceof final JdbcProjectionOperator projectionOperator) { assert projectionTask == null; projectionTask = projectionOperator; - } else if (nextTask.getOperator() instanceof JdbcJoinOperator joinOperator) { + } else if (nextTask.getOperator() instanceof final JdbcJoinOperator joinOperator) { joinTasks.add(joinOperator); } else { throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString())); @@ -150,7 +233,8 @@ private static void executeSinkStage(final ExecutionStage stage, } // Compose the SELECT query - final StringBuilder selectQuery = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks); + final StringBuilder selectQuery = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, null, null, null, + joinTasks); // Remove trailing semicolon from SELECT String selectSql = selectQuery.toString(); @@ -167,30 +251,34 @@ private static void executeSinkStage(final ExecutionStage stage, if 
("overwrite".equals(sinkOp.getMode())) { stmt.execute("DROP TABLE IF EXISTS " + sinkOp.getTableName()); } - // Execute the composed query: CREATE TABLE x AS SELECT ... or INSERT INTO x SELECT ... + // Execute the composed query: CREATE TABLE x AS SELECT ... or INSERT INTO x + // SELECT ... final String fullSql = sinkClause + " " + selectSql + sinkOp.createSqlSuffix(); stmt.execute(fullSql); jdbcExecutor.logger.info("Executed SQL sink: {}", fullSql); - } catch (SQLException e) { + System.out.println("Executed sql sink: " + fullSql); + } catch (final SQLException e) { throw new WayangException("Failed to execute SQL sink on table: " + sinkOp.getTableName(), e); } } /** * Retrieves the follow-up {@link ExecutionTask} of the given {@code task} - * unless it is not comprising a - * {@link JdbcExecutionOperator} and/or not in the given {@link ExecutionStage}. + * unless it is not comprising a {@link JdbcExecutionOperator} and/or not in the + * given {@link ExecutionStage}. * * @param task whose follow-up {@link ExecutionTask} is requested; should have * a single follower * @param stage in which the follow-up {@link ExecutionTask} should be * @return the said follow-up {@link ExecutionTask} or {@code null} if none */ - private static ExecutionTask findJdbcExecutionOperatorTaskInStage(final ExecutionTask task, final ExecutionStage stage) { + private static ExecutionTask findJdbcExecutionOperatorTaskInStage(final ExecutionTask task, + final ExecutionStage stage) { assert task.getNumOuputChannels() == 1; final Channel outputChannel = task.getOutputChannel(0); final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers()); - return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator ? consumer + return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator + ? 
consumer : null; } @@ -228,95 +316,48 @@ private static SqlQueryChannel.Instance instantiateOutboundChannel(final Executi * @return the {@link SqlQueryChannel.Instance} */ private static SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task, - final OptimizationContext optimizationContext, - final SqlQueryChannel.Instance predecessorChannelInstance, final JdbcExecutor jdbcExecutor) { - final SqlQueryChannel.Instance newInstance = JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, jdbcExecutor); + final OptimizationContext optimizationContext, final SqlQueryChannel.Instance predecessorChannelInstance, + final JdbcExecutor jdbcExecutor) { + final SqlQueryChannel.Instance newInstance = JdbcExecutor.instantiateOutboundChannel(task, optimizationContext, + jdbcExecutor); newInstance.getLineage().addPredecessor(predecessorChannelInstance.getLineage()); return newInstance; } - /** - * Creates a query channel and the sql statement - * - * @param stage - * @param context - * @return a tuple containing the sql statement - */ - protected static Tuple2 createSqlQuery(final ExecutionStage stage, - final OptimizationContext context, final JdbcExecutor jdbcExecutor) { - final Collection startTasks = stage.getStartTasks(); - final Collection termTasks = stage.getTerminalTasks(); - - // Verify that we can handle this instance. - assert startTasks.size() == 1 : "Invalid jdbc stage: multiple sources are not currently supported"; - final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0]; - assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported."; - final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0]; - assert startTask.getOperator() instanceof TableSource - : "Invalid JDBC stage: Start task has to be a TableSource"; + private final JdbcPlatformTemplate platform; - // Extract the different types of ExecutionOperators from the stage. 
- final JdbcTableSource tableOp = (JdbcTableSource) startTask.getOperator(); - SqlQueryChannel.Instance tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(startTask, context, jdbcExecutor); - final Collection filterTasks = new ArrayList<>(4); - JdbcProjectionOperator projectionTask = null; - final Collection joinTasks = new ArrayList<>(); - final Set allTasks = stage.getAllTasks(); - assert allTasks.size() <= 3; - ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage); - while (nextTask != null) { - // Evaluate the nextTask. - final var operator = nextTask.getOperator(); - if (operator instanceof FilterOperator || operator instanceof SpatialFilterOperator) { - filterTasks.add((JdbcExecutionOperator) operator); - } else if (operator instanceof JdbcProjectionOperator) { - assert projectionTask == null; // Allow one projection operator per stage for now. - projectionTask = (JdbcProjectionOperator) operator; - } else if (operator instanceof JoinOperator || (operator instanceof SpatialJoinOperator)) { - joinTasks.add((JdbcExecutionOperator) operator); - } else { - throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString())); - } + private final Connection connection; - // Move the tipChannelInstance. - tipChannelInstance = JdbcExecutor.instantiateOutboundChannel(nextTask, context, tipChannelInstance, jdbcExecutor); + private final Logger logger = LogManager.getLogger(this.getClass()); - // Go to the next nextTask. - nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage); - } + private final FunctionCompiler functionCompiler = new FunctionCompiler(); - // Create the SQL query. 
- final StringBuilder query = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks); - return new Tuple2<>(query.toString(), tipChannelInstance); + public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) { + super(job.getCrossPlatformExecutor()); + this.platform = platform; + this.connection = this.platform.createDatabaseDescriptor(job.getConfiguration()).createJdbcConnection(); } - public static StringBuilder createSqlString(final JdbcExecutor jdbcExecutor, final JdbcTableSource tableOp, - final Collection filterTasks, - JdbcProjectionOperator projectionTask, - final Collection joinTasks) { - final String tableName = tableOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); - final Collection conditions = filterTasks.stream() - .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) - .collect(Collectors.toList()); - final String projection = projectionTask == null ? "*" : projectionTask.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler); - final Collection joins = joinTasks.stream() - .map(op -> op.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler)) - .collect(Collectors.toList()); + @Override + public void execute(final ExecutionStage stage, final OptimizationContext optimizationContext, + final ExecutionState executionState) { + // Check if this stage ends with a sink operator + final Collection termTasks = stage.getTerminalTasks(); + assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported."; + final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0]; - final StringBuilder sb = new StringBuilder(1000); - sb.append("SELECT ").append(projection).append(" FROM ").append(tableName); - if (!joins.isEmpty()) { - final String separator = " "; - for (final String join : joins) { - sb.append(separator).append(join); - } - } - if (!conditions.isEmpty()) { - sb.append(" WHERE 
"); - sb.append(String.join(" AND ", conditions)); + if (termTask.getOperator() instanceof JdbcTableSinkOperator) { + JdbcExecutor.executeSinkStage(stage, optimizationContext, this); + } else { + // If it is normal stage: compose SQL and store in channel for downstream + // consumption + final Tuple2 pair = JdbcExecutor.createSqlQuery(stage, + optimizationContext, this); + final String query = pair.field0; + final SqlQueryChannel.Instance queryChannel = pair.field1; + queryChannel.setSqlQuery(query); + executionState.register(queryChannel); } - sb.append(';'); - return sb; } @Override @@ -332,28 +373,4 @@ public void dispose() { public Platform getPlatform() { return this.platform; } - - private void saveResult(final FileChannel.Instance outputFileChannelInstance, final ResultSet rs) - throws IOException, SQLException { - // Output results. - final FileSystem outFs = FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get(); - try (final OutputStreamWriter writer = new OutputStreamWriter( - outFs.create(outputFileChannelInstance.getSinglePath()))) { - while (rs.next()) { - // System.out.println(rs.getInt(1) + " " + rs.getString(2)); - final ResultSetMetaData rsmd = rs.getMetaData(); - for (int i = 1; i <= rsmd.getColumnCount(); i++) { - writer.write(rs.getString(i)); - if (i < rsmd.getColumnCount()) { - writer.write('\t'); - } - } - if (!rs.isLast()) { - writer.write('\n'); - } - } - } catch (final UncheckedIOException e) { - throw e.getCause(); - } - } } diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperator.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperator.java new file mode 100644 index 000000000..990290a9b --- /dev/null +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.jdbc.operators; + +import java.sql.Connection; +import java.util.Optional; + +import org.apache.wayang.basic.operators.GlobalReduceOperator; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.jdbc.compiler.FunctionCompiler; + +public abstract class JdbcGlobalReduceOperator extends GlobalReduceOperator + implements JdbcExecutionOperator { + + public JdbcGlobalReduceOperator(final GlobalReduceOperator globalReduceOperator) { + super(globalReduceOperator); + } + + public JdbcGlobalReduceOperator(final ReduceDescriptor reduceDescriptor) { + super(reduceDescriptor, DataSetType.createDefault(Record.class)); + } + + @Override + public String getLoadProfileEstimatorConfigurationKey() { + return String.format("wayang.%s.globalreduce.load", this.getPlatform().getPlatformId()); + } + + @Override + public String createSqlClause(final Connection connection, final FunctionCompiler compiler) { + 
return reduceDescriptor.getSqlImplementation(); + } + + @Override + public Optional createLoadProfileEstimator(final Configuration configuration) { + final Optional optEstimator = JdbcExecutionOperator.super.createLoadProfileEstimator( + configuration); + LoadProfileEstimators.nestUdfEstimator(optEstimator, this.reduceDescriptor, configuration); + return optEstimator; + } +} diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperator.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperator.java new file mode 100644 index 000000000..5b06e3fe5 --- /dev/null +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.jdbc.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.ReduceByOperator; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.jdbc.compiler.FunctionCompiler; + +import java.sql.Connection; +import java.util.Optional; + +public abstract class JdbcReduceByOperator extends ReduceByOperator implements JdbcExecutionOperator { + public JdbcReduceByOperator(final TransformationDescriptor keyDescriptor, + final ReduceDescriptor reduceDescriptor) { + super(keyDescriptor, reduceDescriptor, DataSetType.createDefault(Record.class)); + } + + public JdbcReduceByOperator(final ReduceByOperator that) { + super(that); + } + + @Override + public String getLoadProfileEstimatorConfigurationKey() { + return String.format("wayang.%s.reduceby.load", this.getPlatform().getPlatformId()); + } + + @Override + public String createSqlClause(final Connection connection, final FunctionCompiler compiler) { + return keyDescriptor.getSqlImplementation().getField0() + "," + reduceDescriptor.getSqlImplementation(); + } + + @Override + public Optional createLoadProfileEstimator(final Configuration configuration) { + final Optional optEstimator = JdbcExecutionOperator.super.createLoadProfileEstimator( + configuration); + LoadProfileEstimators.nestUdfEstimator(optEstimator, this.reduceDescriptor, configuration); + return optEstimator; + } +} diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcSortOperator.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcSortOperator.java new file mode 100644 
index 000000000..84f4996e4 --- /dev/null +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/operators/JdbcSortOperator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.jdbc.operators; + +import java.sql.Connection; +import java.util.Optional; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.SortOperator; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators; +import org.apache.wayang.jdbc.compiler.FunctionCompiler; + + +public abstract class JdbcSortOperator extends SortOperator implements JdbcExecutionOperator { + public JdbcSortOperator(final TransformationDescriptor keyDescriptor) { + super(keyDescriptor); + } + + public JdbcSortOperator(final SortOperator that) { + super(that); + } + + @Override + public String getLoadProfileEstimatorConfigurationKey() { + return String.format("wayang.%s.sort.load", this.getPlatform().getPlatformId()); + } + + @Override + public String createSqlClause(final 
Connection connection, final FunctionCompiler compiler) { + return " ORDER BY " + keyDescriptor.getSqlImplementation().field0 + " " + keyDescriptor.getSqlImplementation().field1; + } + + @Override + public Optional createLoadProfileEstimator(final Configuration configuration) { + final Optional optEstimator = + JdbcExecutionOperator.super.createLoadProfileEstimator(configuration); + LoadProfileEstimators.nestUdfEstimator(optEstimator, this.getKeyDescriptor(), configuration); + return optEstimator; + } +} diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperatorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperatorTest.java new file mode 100644 index 000000000..a14598ac1 --- /dev/null +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcGlobalReduceOperatorTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.jdbc.operators; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; + +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.Job; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.core.optimizer.DefaultOptimizationContext; +import org.apache.wayang.core.plan.executionplan.ExecutionStage; +import org.apache.wayang.core.plan.executionplan.ExecutionTask; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.CrossPlatformExecutor; +import org.apache.wayang.core.profiling.NoInstrumentationStrategy; +import org.apache.wayang.jdbc.channels.SqlQueryChannel; +import org.apache.wayang.jdbc.execution.JdbcExecutor; +import org.apache.wayang.jdbc.test.HsqldbGlobalReduceOperator; +import org.apache.wayang.jdbc.test.HsqldbPlatform; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.jdbc.test.HsqldbTableSource; +import org.junit.jupiter.api.Test; + +/** + * Test suite for {@link JdbcGlobalReduceOperator}. 
+ */ +class JdbcGlobalReduceOperatorTest extends OperatorTestBase { + @Test + void testWithHsqldb() throws SQLException { + final Configuration configuration = new Configuration(); + + final Job job = mock(Job.class); + when(job.getConfiguration()).thenReturn(configuration); + when(job.getCrossPlatformExecutor()) + .thenReturn(new CrossPlatformExecutor(job, new NoInstrumentationStrategy())); + final SqlQueryChannel.Descriptor sqlChannelDescriptor = HsqldbPlatform.getInstance().getSqlQueryChannelDescriptor(); + + final ExecutionStage sqlStage = mock(ExecutionStage.class); + + final JdbcTableSource tableSourceA = new HsqldbTableSource("testA"); + + final ExecutionTask tableSourceATask = new ExecutionTask(tableSourceA); + tableSourceATask.setOutputChannel(0, new SqlQueryChannel(sqlChannelDescriptor, tableSourceA.getOutput(0))); + tableSourceATask.setStage(sqlStage); + + final ExecutionOperator globalReduceOperator = new HsqldbGlobalReduceOperator( + new ReduceDescriptor(null, Record.class).withSqlImplementation("COUNT(*)")); + + final ExecutionTask globalReduceTask = new ExecutionTask(globalReduceOperator); + tableSourceATask.getOutputChannel(0).addConsumer(globalReduceTask, 0); + globalReduceTask.setOutputChannel(0, + new SqlQueryChannel(sqlChannelDescriptor, globalReduceOperator.getOutput(0))); + globalReduceTask.setStage(sqlStage); + + when(sqlStage.getStartTasks()).thenReturn(Collections.singleton(tableSourceATask)); + when(sqlStage.getTerminalTasks()).thenReturn(Collections.singleton(globalReduceTask)); + + final ExecutionStage nextStage = mock(ExecutionStage.class); + + final SqlToStreamOperator sqlToStreamOperator = new SqlToStreamOperator(HsqldbPlatform.getInstance()); + final ExecutionTask sqlToStreamTask = new ExecutionTask(sqlToStreamOperator); + globalReduceTask.getOutputChannel(0).addConsumer(sqlToStreamTask, 0); + sqlToStreamTask.setStage(nextStage); + + final JdbcExecutor executor = new JdbcExecutor(HsqldbPlatform.getInstance(), job); + 
executor.execute(sqlStage, new DefaultOptimizationContext(job), job.getCrossPlatformExecutor()); + + final SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor() + .getChannelInstance(sqlToStreamTask.getInputChannel(0)); + + final HsqldbPlatform hsqldbPlatform = new HsqldbPlatform(); + + try (Connection jdbcConnection = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) { + final Statement statement = jdbcConnection.createStatement(); + final java.sql.ResultSet resultSet = statement.executeQuery(sqlQueryChannelInstance.getSqlQuery()); + resultSet.next(); + final int count = resultSet.getInt(1); + + assertTrue(count > 0); + } + + assertEquals("SELECT COUNT(*) FROM testA;", sqlQueryChannelInstance.getSqlQuery()); + } +} diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperatorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperatorTest.java new file mode 100644 index 000000000..f00f4020e --- /dev/null +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcReduceByOperatorTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.jdbc.operators; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.sql.SQLException; +import java.util.Collections; + +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.Job; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.optimizer.DefaultOptimizationContext; +import org.apache.wayang.core.plan.executionplan.ExecutionStage; +import org.apache.wayang.core.plan.executionplan.ExecutionTask; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.CrossPlatformExecutor; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.profiling.NoInstrumentationStrategy; +import org.apache.wayang.jdbc.channels.SqlQueryChannel; +import org.apache.wayang.jdbc.execution.JdbcExecutor; +import org.apache.wayang.jdbc.test.HsqldbPlatform; +import org.apache.wayang.jdbc.test.HsqldbReduceByOperator; +import org.apache.wayang.jdbc.test.HsqldbTableSource; +import org.junit.jupiter.api.Test; + +/** + * Test suite for {@link JdbcReduceByOperator}. 
+ */ +public class JdbcReduceByOperatorTest extends OperatorTestBase { + @Test + void testWithHsqldb() throws SQLException { + final Configuration configuration = new Configuration(); + + final Job job = mock(Job.class); + when(job.getConfiguration()).thenReturn(configuration); + when(job.getCrossPlatformExecutor()) + .thenReturn(new CrossPlatformExecutor(job, new NoInstrumentationStrategy())); + final SqlQueryChannel.Descriptor sqlChannelDescriptor = HsqldbPlatform.getInstance().getSqlQueryChannelDescriptor(); + + final ExecutionStage sqlStage = mock(ExecutionStage.class); + + final JdbcTableSource tableSourceA = new HsqldbTableSource("testA"); + + final ExecutionTask tableSourceATask = new ExecutionTask(tableSourceA); + tableSourceATask.setOutputChannel(0, new SqlQueryChannel(sqlChannelDescriptor, tableSourceA.getOutput(0))); + tableSourceATask.setStage(sqlStage); + + final TransformationDescriptor keyDescriptor = new TransformationDescriptor().withSqlImplementation("col0", "col0"); + final ReduceDescriptor reduceDescriptor = new ReduceDescriptor(null, Record.class).withSqlImplementation("COUNT(*)"); + final ExecutionOperator reducyByOperator = new HsqldbReduceByOperator(keyDescriptor, reduceDescriptor); + + final ExecutionTask reducyByTask = new ExecutionTask(reducyByOperator); + tableSourceATask.getOutputChannel(0).addConsumer(reducyByTask, 0); + reducyByTask.setOutputChannel(0, + new SqlQueryChannel(sqlChannelDescriptor, reducyByOperator.getOutput(0))); + reducyByTask.setStage(sqlStage); + + when(sqlStage.getStartTasks()).thenReturn(Collections.singleton(tableSourceATask)); + when(sqlStage.getTerminalTasks()).thenReturn(Collections.singleton(reducyByTask)); + + final ExecutionStage nextStage = mock(ExecutionStage.class); + + final SqlToStreamOperator sqlToStreamOperator = new SqlToStreamOperator(HsqldbPlatform.getInstance()); + final ExecutionTask sqlToStreamTask = new ExecutionTask(sqlToStreamOperator); + 
reducyByTask.getOutputChannel(0).addConsumer(sqlToStreamTask, 0); + sqlToStreamTask.setStage(nextStage); + + final JdbcExecutor executor = new JdbcExecutor(HsqldbPlatform.getInstance(), job); + executor.execute(sqlStage, new DefaultOptimizationContext(job), job.getCrossPlatformExecutor()); + + final SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor() + .getChannelInstance(sqlToStreamTask.getInputChannel(0)); + + assertEquals("SELECT col0,COUNT(*) FROM testA GROUP BY col0;", sqlQueryChannelInstance.getSqlQuery()); + } +} diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcSortOperatorTest.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcSortOperatorTest.java new file mode 100644 index 000000000..118fb7efa --- /dev/null +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/operators/JdbcSortOperatorTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.jdbc.operators; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.sql.SQLException; +import java.util.Collections; + +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.Job; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.optimizer.DefaultOptimizationContext; +import org.apache.wayang.core.plan.executionplan.ExecutionStage; +import org.apache.wayang.core.plan.executionplan.ExecutionTask; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.CrossPlatformExecutor; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.profiling.NoInstrumentationStrategy; +import org.apache.wayang.jdbc.channels.SqlQueryChannel; +import org.apache.wayang.jdbc.execution.JdbcExecutor; +import org.apache.wayang.jdbc.test.HsqldbPlatform; +import org.apache.wayang.jdbc.test.HsqldbSortOperator; +import org.apache.wayang.jdbc.test.HsqldbTableSource; +import org.junit.jupiter.api.Test; + +/** Test suite for {@link JdbcSortOperator}: checks the SQL query produced for a {@code DESC} sort on {@code col0} over a table source. */ public class JdbcSortOperatorTest extends OperatorTestBase { + @Test + void testWithHsqldb() throws SQLException { + final Configuration configuration = new Configuration(); + + final Job job = mock(Job.class); + when(job.getConfiguration()).thenReturn(configuration); + when(job.getCrossPlatformExecutor()) + .thenReturn(new CrossPlatformExecutor(job, new NoInstrumentationStrategy())); + final SqlQueryChannel.Descriptor sqlChannelDescriptor = HsqldbPlatform.getInstance().getSqlQueryChannelDescriptor(); + + final ExecutionStage sqlStage = mock(ExecutionStage.class); + + final JdbcTableSource tableSourceA = new HsqldbTableSource("testA"); + + final ExecutionTask tableSourceATask = new ExecutionTask(tableSourceA); + tableSourceATask.setOutputChannel(0, new SqlQueryChannel(sqlChannelDescriptor, +
tableSourceA.getOutput(0))); + tableSourceATask.setStage(sqlStage); + + final TransformationDescriptor keyDescriptor = new TransformationDescriptor().withSqlImplementation("col0","DESC"); + final ExecutionOperator sortOperator = new HsqldbSortOperator(keyDescriptor); + + final ExecutionTask sortTask = new ExecutionTask(sortOperator); + tableSourceATask.getOutputChannel(0).addConsumer(sortTask, 0); + sortTask.setOutputChannel(0, + new SqlQueryChannel(sqlChannelDescriptor, sortOperator.getOutput(0))); + sortTask.setStage(sqlStage); + + when(sqlStage.getStartTasks()).thenReturn(Collections.singleton(tableSourceATask)); + when(sqlStage.getTerminalTasks()).thenReturn(Collections.singleton(sortTask)); + + final ExecutionStage nextStage = mock(ExecutionStage.class); + + final SqlToStreamOperator sqlToStreamOperator = new SqlToStreamOperator(HsqldbPlatform.getInstance()); + final ExecutionTask sqlToStreamTask = new ExecutionTask(sqlToStreamOperator); + sortTask.getOutputChannel(0).addConsumer(sqlToStreamTask, 0); + sqlToStreamTask.setStage(nextStage); + + final JdbcExecutor executor = new JdbcExecutor(HsqldbPlatform.getInstance(), job); + executor.execute(sqlStage, new DefaultOptimizationContext(job), job.getCrossPlatformExecutor()); + + final SqlQueryChannel.Instance sqlQueryChannelInstance = (SqlQueryChannel.Instance) job.getCrossPlatformExecutor() + .getChannelInstance(sqlToStreamTask.getInputChannel(0)); + + assertEquals("SELECT * FROM testA ORDER BY col0 DESC;", sqlQueryChannelInstance.getSqlQuery()); + } +} diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbGlobalReduceOperator.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbGlobalReduceOperator.java new file mode 100644 index 000000000..732d4b334 --- /dev/null +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbGlobalReduceOperator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.jdbc.test; + +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.jdbc.operators.JdbcGlobalReduceOperator; +import org.apache.wayang.basic.data.Record; + +/** + * Test implementation of {@link JdbcGlobalReduceOperator} that reports {@link HsqldbPlatform} as its platform.
+ */ +public class HsqldbGlobalReduceOperator extends JdbcGlobalReduceOperator { + + public HsqldbGlobalReduceOperator(ReduceDescriptor reduceDescriptor) { + super(reduceDescriptor); + } + + @Override + public HsqldbPlatform getPlatform() { + return HsqldbPlatform.getInstance(); + } +} diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbJoinOperator.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbJoinOperator.java index 593647770..181b2d24c 100644 --- a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbJoinOperator.java +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbJoinOperator.java @@ -18,11 +18,8 @@ package org.apache.wayang.jdbc.test; -import org.apache.wayang.core.platform.ChannelDescriptor; import org.apache.wayang.basic.data.Record; -import org.apache.wayang.jdbc.test.HsqldbPlatform; import org.apache.wayang.jdbc.operators.JdbcJoinOperator; -import org.apache.wayang.core.types.DataSetType; import org.apache.wayang.core.function.TransformationDescriptor; /** diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbReduceByOperator.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbReduceByOperator.java new file mode 100644 index 000000000..20be563f8 --- /dev/null +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbReduceByOperator.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.jdbc.test; + +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.jdbc.operators.JdbcReduceByOperator; +import org.apache.wayang.basic.data.Record; + +/** + * Test implementation of {@link JdbcReduceByOperator} that reports {@link HsqldbPlatform} as its platform. + */ +public class HsqldbReduceByOperator extends JdbcReduceByOperator { + public HsqldbReduceByOperator(final TransformationDescriptor keyDescriptor, + final ReduceDescriptor reduceDescriptor) { + super(keyDescriptor, reduceDescriptor); + } + + @Override + public HsqldbPlatform getPlatform() { + return HsqldbPlatform.getInstance(); + } +} diff --git a/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbSortOperator.java b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbSortOperator.java new file mode 100644 index 000000000..566f97120 --- /dev/null +++ b/wayang-platforms/wayang-jdbc-template/src/test/java/org/apache/wayang/jdbc/test/HsqldbSortOperator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.jdbc.test; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.jdbc.operators.JdbcFilterOperator; +import org.apache.wayang.jdbc.operators.JdbcSortOperator; +import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate; + +/** + * Test implementation of {@link JdbcSortOperator} that reports {@link HsqldbPlatform} as its platform. + */ + +public class HsqldbSortOperator extends JdbcSortOperator { + public HsqldbSortOperator(final TransformationDescriptor keyDescriptor) { + super(keyDescriptor); + } + + @Override + public JdbcPlatformTemplate getPlatform() { + return HsqldbPlatform.getInstance(); + } + +} diff --git a/wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/mapping/GlobalReduceMapping.java b/wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/mapping/GlobalReduceMapping.java new file mode 100644 index 000000000..158f7d0ff --- /dev/null +++ b/wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/mapping/GlobalReduceMapping.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.postgres.mapping; + +import org.apache.wayang.basic.operators.GlobalReduceOperator; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.postgres.operators.PostgresGlobalReduceOperator; +import org.apache.wayang.postgres.platform.PostgresPlatform; + +import java.util.Collection; +import java.util.Collections; + +/** + * Mapping from {@link GlobalReduceOperator} to {@link PostgresGlobalReduceOperator}. Only operators whose reduce descriptor has a non-null SQL implementation are matched.
+ */ +public class GlobalReduceMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + PostgresPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern> operatorPattern = new OperatorPattern<>( + "reduce", new GlobalReduceOperator(null, DataSetType.createDefault(Record.class)), false) + .withAdditionalTest(op -> op.getReduceDescriptor().getSqlImplementation() != null); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators>( + (matchedOperator, epoch) -> new PostgresGlobalReduceOperator(matchedOperator).at(epoch) + ); + } +} \ No newline at end of file diff --git a/wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/operators/PostgresGlobalReduceOperator.java b/wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/operators/PostgresGlobalReduceOperator.java new file mode 100644 index 000000000..4a2724eaa --- /dev/null +++ b/wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/operators/PostgresGlobalReduceOperator.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.postgres.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.GlobalReduceOperator; +import org.apache.wayang.core.function.ReduceDescriptor; +import org.apache.wayang.jdbc.operators.JdbcGlobalReduceOperator; + +/** Postgres variant of {@link JdbcGlobalReduceOperator}; it can be built from a {@link ReduceDescriptor} or copied from an existing {@link GlobalReduceOperator}. */ public class PostgresGlobalReduceOperator extends JdbcGlobalReduceOperator implements PostgresExecutionOperator { + public PostgresGlobalReduceOperator(final ReduceDescriptor reduceDescriptor) { + super(reduceDescriptor); + } + + public PostgresGlobalReduceOperator(GlobalReduceOperator that) { + super(that); + } + + @Override + protected PostgresGlobalReduceOperator createCopy() { + return new PostgresGlobalReduceOperator(this); + } +} diff --git a/wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/operators/PostgresJoinOperator.java b/wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/operators/PostgresJoinOperator.java index d4589ca6b..19ec9e8eb 100644 --- a/wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/operators/PostgresJoinOperator.java +++ b/wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/operators/PostgresJoinOperator.java @@ -19,7 +19,6 @@ package org.apache.wayang.postgres.operators; import org.apache.wayang.basic.data.Record; -import org.apache.wayang.basic.operators.FilterOperator; import org.apache.wayang.basic.operators.JoinOperator; import org.apache.wayang.core.function.TransformationDescriptor; import org.apache.wayang.jdbc.operators.JdbcJoinOperator;