Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.wayang.api.sql.calcite.converter;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.calcite.rel.core.AggregateCall;

import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.wayang.api.sql.calcite.converter.functions.AggregateAddCols;
import org.apache.wayang.api.sql.calcite.converter.functions.AggregateFunction;
import org.apache.wayang.api.sql.calcite.converter.functions.AggregateKeyExtractor;
Expand Down Expand Up @@ -57,15 +59,40 @@ Operator visit(final WayangAggregate wayangRelNode) {
Record.class);
childOp.connectTo(0, mapOperator, 0);

final Operator aggregateOperator = wayangRelNode.getGroupCount() > 0 ? new ReduceByOperator<>(
new TransformationDescriptor<>(new AggregateKeyExtractor(groupingFields), Record.class, Object.class),
new ReduceDescriptor<>(new AggregateFunction(aggregateCalls),
DataUnitType.createGrouped(Record.class),
DataUnitType.createBasicUnchecked(Record.class)))
: new GlobalReduceOperator<>(
new ReduceDescriptor<>(new AggregateFunction(aggregateCalls),
DataUnitType.createGrouped(Record.class),
DataUnitType.createBasicUnchecked(Record.class)));

final Operator aggregateOperator;

if (wayangRelNode.getGroupCount() > 0) {
aggregateOperator = new ReduceByOperator<>(
new TransformationDescriptor<>(
new AggregateKeyExtractor(groupingFields), Record.class,
Object.class),
new ReduceDescriptor<>(new AggregateFunction(aggregateCalls),
DataUnitType.createGrouped(Record.class),
DataUnitType.createBasicUnchecked(Record.class)));
} else {
final List<String> reductionFunctions = wayangRelNode.getNamedAggCalls().stream()
.map(agg -> agg.left.getAggregation().getName()).toList();

final List<String> fields = wayangRelNode.getInput().getRowType().getFieldList().stream()
.map(RelDataTypeField::getName).toList();

final List<String> aliases = wayangRelNode.getRowType().getFieldList().stream()
.map(RelDataTypeField::getName).toList();

final String[] reductionStatements = new String[reductionFunctions.size()];

for (int i = 0; i < reductionStatements.length; i++) {
reductionStatements[i] = reductionFunctions.get(i) + "(" + fields.get(i) + ") AS " + aliases.get(i);
}

aggregateOperator = new GlobalReduceOperator<>(
new ReduceDescriptor<>(new AggregateFunction(aggregateCalls),
DataUnitType.createGrouped(Record.class),
DataUnitType.createBasicUnchecked(Record.class))
.withSqlImplementation(
Arrays.stream(reductionStatements).collect(Collectors.joining(","))));
}

mapOperator.connectTo(0, aggregateOperator, 0);

Expand All @@ -74,6 +101,7 @@ Operator visit(final WayangAggregate wayangRelNode) {
Record.class,
Record.class);
aggregateOperator.connectTo(0, mapOperator2, 0);

return mapOperator2;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@

package org.apache.wayang.core.function;

import java.util.function.BinaryOperator;

import org.apache.wayang.core.optimizer.costs.LoadEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator;
import org.apache.wayang.core.types.BasicDataUnitType;
import org.apache.wayang.core.types.DataUnitGroupType;
import org.apache.wayang.core.types.DataUnitType;

import java.util.function.BinaryOperator;

/**
* This descriptor pertains to functions that take multiple data units and aggregate them into a single data unit
* by means of a tree-like fold, i.e., using a commutative, associative function..
* This descriptor pertains to functions that take multiple data units and
* aggregate them into a single data unit by means of a tree-like fold, i.e.,
* using a commutative, associative function..
*/
public class ReduceDescriptor<Type> extends FunctionDescriptor {

Expand All @@ -39,43 +40,67 @@ public class ReduceDescriptor<Type> extends FunctionDescriptor {

private final SerializableBinaryOperator<Type> javaImplementation;

public ReduceDescriptor(SerializableBinaryOperator<Type> javaImplementation,
DataUnitGroupType<Type> inputType,
BasicDataUnitType<Type> outputType) {
/**
* sql implementation of the reduce operator
*/
private String sqlImplementation;

public ReduceDescriptor(final SerializableBinaryOperator<Type> javaImplementation, final DataUnitGroupType<Type> inputType,
final BasicDataUnitType<Type> outputType) {
this(javaImplementation, inputType, outputType, new NestableLoadProfileEstimator(
LoadEstimator.createFallback(1, 1),
LoadEstimator.createFallback(1, 1)
));
LoadEstimator.createFallback(1, 1), LoadEstimator.createFallback(1, 1)));
}

public ReduceDescriptor(SerializableBinaryOperator<Type> javaImplementation,
DataUnitGroupType<Type> inputType, BasicDataUnitType<Type> outputType,
LoadProfileEstimator loadProfileEstimator) {
public ReduceDescriptor(final SerializableBinaryOperator<Type> javaImplementation, final DataUnitGroupType<Type> inputType,
final BasicDataUnitType<Type> outputType, final LoadProfileEstimator loadProfileEstimator) {
super(loadProfileEstimator);
this.inputType = inputType;
this.outputType = outputType;
this.javaImplementation = javaImplementation;
}

public ReduceDescriptor(SerializableBinaryOperator<Type> javaImplementation, Class<Type> inputType) {
public ReduceDescriptor(final SerializableBinaryOperator<Type> javaImplementation, final Class<Type> inputType) {
this(javaImplementation, DataUnitType.createGroupedUnchecked(inputType),
DataUnitType.createBasicUnchecked(inputType)
);
DataUnitType.createBasicUnchecked(inputType));
}

/**
* This is function is not built to last. It is thought to help out devising
* programs while we are still figuring
* out how to express functions in a platform-independent way.
*
* @return a function that can perform the reduce
*/
public String getSqlImplementation() {
return this.sqlImplementation;
}

/**
* This is function is not built to last. It is thought to help out devising programs while we are still figuring
* This is function is not built to last. It is thought to help out devising
* programs while we are still figuring
* out how to express functions in a platform-independent way.
*
* @return a function that can perform the reduce
*/
public ReduceDescriptor<Type> withSqlImplementation(final String sqlImplementation) {
this.sqlImplementation = sqlImplementation;
return this;
}

/**
* This is function is not built to last. It is thought to help out devising
* programs while we are still figuring out how to express functions in a
* platform-independent way.
*
* @return a function that can perform the reduce
*/
public BinaryOperator<Type> getJavaImplementation() {
return this.javaImplementation;
}

/**
* In generic code, we do not have the type parameter values of operators, functions etc. This method avoids casting issues.
* In generic code, we do not have the type parameter values of operators,
* functions etc. This method avoids casting issues.
*
* @return this instance with type parameters set to {@link Object}
*/
Expand Down
Loading
Loading