Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,21 @@
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
import java.util.Optional;
import java.util.function.Consumer;
import io.serverlessworkflow.impl.executors.CallableTaskFactory;

public class JavaConsumerCallExecutorBuilder<T>
implements CallableTaskBuilder<CallJava.CallJavaConsumer<T>> {

private Consumer<T> consumer;
private Optional<Class<T>> inputClass;

public void init(
public CallableTaskFactory init(
CallJava.CallJavaConsumer<T> task,
WorkflowDefinition definition,
WorkflowMutablePosition position) {
consumer = task.consumer();
inputClass = task.inputClass();
return () -> new JavaConsumerCallExecutor<T>(task.inputClass(), task.consumer());
}

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
return CallJava.CallJavaConsumer.class.isAssignableFrom(clazz);
}

@Override
public CallableTask build() {
return new JavaConsumerCallExecutor<T>(inputClass, consumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,26 @@
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.CallJava.CallJavaContextFunction;
import io.serverlessworkflow.api.types.func.ContextFunction;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
import java.util.Optional;
import io.serverlessworkflow.impl.executors.CallableTaskFactory;

public class JavaContextFunctionCallExecutorBuilder<T, V>
implements CallableTaskBuilder<CallJava.CallJavaContextFunction<T, V>> {

protected ContextFunction<T, V> function;
protected Optional<Class<T>> inputClass;
protected Optional<Class<V>> outputClass;

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
return CallJava.CallJavaContextFunction.class.isAssignableFrom(clazz);
}

@Override
public void init(
public CallableTaskFactory init(
CallJavaContextFunction<T, V> task,
WorkflowDefinition definition,
WorkflowMutablePosition position) {
this.function = task.function();
this.inputClass = task.inputClass();
this.outputClass = task.outputClass();
}

@Override
public CallableTask build() {
return new JavaContextFunctionCallExecutor<T, V>(inputClass, outputClass, function);
return () ->
new JavaContextFunctionCallExecutor<T, V>(
task.inputClass(), task.outputClass(), task.function());
Comment thread
fjtirado marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,26 @@
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.CallJava.CallJavaFilterFunction;
import io.serverlessworkflow.api.types.func.FilterFunction;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
import java.util.Optional;
import io.serverlessworkflow.impl.executors.CallableTaskFactory;

public class JavaFilterFunctionCallExecutorBuilder<T, V>
implements CallableTaskBuilder<CallJava.CallJavaFilterFunction<T, V>> {

private FilterFunction<T, V> function;
private Optional<Class<T>> inputClass;
private Optional<Class<V>> outputClass;

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
return CallJava.CallJavaFilterFunction.class.isAssignableFrom(clazz);
}

@Override
public void init(
public CallableTaskFactory init(
CallJavaFilterFunction<T, V> task,
WorkflowDefinition definition,
WorkflowMutablePosition position) {
this.function = task.function();
this.inputClass = task.inputClass();
this.outputClass = task.outputClass();
}

@Override
public CallableTask build() {
return new JavaFilterFunctionCallExecutor<>(inputClass, outputClass, function);
return () ->
new JavaFilterFunctionCallExecutor<>(
task.inputClass(), task.outputClass(), task.function());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,23 @@
import io.serverlessworkflow.api.types.func.CallJava.CallJavaFunction;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
import java.util.Optional;
import java.util.function.Function;
import io.serverlessworkflow.impl.executors.CallableTaskFactory;

public class JavaFunctionCallExecutorBuilder<T, V>
implements CallableTaskBuilder<CallJava.CallJavaFunction<T, V>> {

protected Function<T, V> function;
protected Optional<Class<T>> inputClass;
protected Optional<Class<V>> outputClass;

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
return CallJava.CallJavaFunction.class.isAssignableFrom(clazz);
}

@Override
public void init(
public CallableTaskFactory init(
CallJavaFunction<T, V> task,
WorkflowDefinition definition,
WorkflowMutablePosition position) {
function = task.function();
inputClass = task.inputClass();
outputClass = task.outputClass();
}

@Override
public CallableTask build() {
return new JavaFunctionCallExecutor<>(inputClass, outputClass, function);
return () ->
new JavaFunctionCallExecutor<>(task.inputClass(), task.outputClass(), task.function());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,22 @@
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.CallJava.CallJavaLoopFunction;
import io.serverlessworkflow.api.types.func.LoopFunction;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
import io.serverlessworkflow.impl.executors.CallableTaskFactory;

public class JavaLoopFunctionCallExecutorBuilder
implements CallableTaskBuilder<CallJava.CallJavaLoopFunction> {

private LoopFunction function;
private String varName;

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
return CallJava.CallJavaLoopFunction.class.isAssignableFrom(clazz);
}

@Override
public void init(
public CallableTaskFactory init(
CallJavaLoopFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) {
function = task.function();
varName = task.varName();
}

@Override
public CallableTask build() {
return new JavaLoopFunctionCallExecutor<>(function, varName);
return () -> new JavaLoopFunctionCallExecutor<>(task.function(), task.varName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,25 @@
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.func.CallJava;
import io.serverlessworkflow.api.types.func.CallJava.CallJavaLoopFunctionIndex;
import io.serverlessworkflow.api.types.func.LoopFunctionIndex;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.executors.CallableTask;
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
import io.serverlessworkflow.impl.executors.CallableTaskFactory;

public class JavaLoopFunctionIndexCallExecutorBuilder
implements CallableTaskBuilder<CallJava.CallJavaLoopFunctionIndex> {

private LoopFunctionIndex function;
private String varName;
private String indexName;

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
return CallJava.CallJavaLoopFunctionIndex.class.isAssignableFrom(clazz);
}

@Override
public void init(
public CallableTaskFactory init(
CallJavaLoopFunctionIndex task,
WorkflowDefinition definition,
WorkflowMutablePosition position) {
function = task.function();
varName = task.varName();
indexName = task.indexName();
}

@Override
public CallableTask build() {
return new JavaLoopFunctionIndexCallExecutor<>(function, varName, indexName);
return () ->
new JavaLoopFunctionIndexCallExecutor<>(task.function(), task.varName(), task.indexName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,8 @@

public class CallFunctionExecutorBuilder implements CallableTaskBuilder<CallFunction> {

private TaskExecutorBuilder<? extends TaskBase> executorBuilder;
private WorkflowValueResolver<Map<String, Object>> args;

@Override
public void init(
public CallableTaskFactory init(
CallFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) {
String functionName = task.getCall();
Use use = definition.workflow().getUse();
Expand Down Expand Up @@ -81,14 +78,15 @@ catalogEndpoint, pathFromFunctionName(functionName.substring(0, indexOf))),
function =
definition.resourceLoader().loadURI(URI.create(functionName), h -> from(definition, h));
}
executorBuilder =
TaskExecutorBuilder<? extends TaskBase> executorBuilder =
definition.application().taskFactory().getTaskExecutor(position, function, definition);
FunctionArguments functionArgs = task.getWith();
args =
WorkflowValueResolver<?> args =
functionArgs != null
? WorkflowUtils.buildMapResolver(
definition.application(), functionArgs.getAdditionalProperties())
: (w, t, m) -> Map.of();
return () -> this.build(executorBuilder, args);
}

private String pathFromFunctionName(String functionName) {
Expand Down Expand Up @@ -122,8 +120,8 @@ public boolean accept(Class<? extends TaskBase> clazz) {
return clazz.equals(CallFunction.class);
}

@Override
public CallableTask build() {
private CallableTask build(
TaskExecutorBuilder<? extends TaskBase> executorBuilder, WorkflowValueResolver<?> args) {
TaskExecutor<? extends TaskBase> executor = executorBuilder.build();
return (w, t, m) ->
executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class CallTaskExecutor<T extends TaskBase> extends RegularTaskExecutor<T>

public static class CallTaskExecutorBuilder<T extends TaskBase>
extends RegularTaskExecutorBuilder<T> {
private CallableTaskBuilder<T> callableBuilder;
private CallableTaskFactory callableFactory;
private List<CallableTaskProxyBuilder> callableProxyBuilders;
private CallableTask callable;

Expand All @@ -44,13 +44,12 @@ protected CallTaskExecutorBuilder(
definition.application().callableProxyBuilders().stream()
.filter(t -> t.accept(task))
.toList();
callableBuilder.init(task, definition, position);
this.callableBuilder = callableBuilder;
this.callableFactory = callableBuilder.init(task, definition, position);
}

@Override
public CallTaskExecutor<T> buildInstance() {
this.callable = callableBuilder.build();
this.callable = callableFactory.get();
for (CallableTaskProxyBuilder callableBuilder : callableProxyBuilders) {
this.callable = callableBuilder.build(callable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,5 @@ public interface CallableTaskBuilder<T extends TaskBase> extends ServicePriority

boolean accept(Class<? extends TaskBase> clazz);

void init(T task, WorkflowDefinition definition, WorkflowMutablePosition position);

CallableTask build();
CallableTaskFactory init(T task, WorkflowDefinition definition, WorkflowMutablePosition position);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors;

import java.util.function.Supplier;

@FunctionalInterface
public interface CallableTaskFactory extends Supplier<CallableTask> {}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
import io.serverlessworkflow.impl.executors.TryExecutor.TryExecutorBuilder;
import io.serverlessworkflow.impl.executors.WaitExecutor.WaitExecutorBuilder;
import java.util.Collection;
import java.util.ServiceLoader;
import java.util.ServiceLoader.Provider;

Expand All @@ -45,8 +46,8 @@ public static TaskExecutorFactory get() {

protected DefaultTaskExecutorFactory() {}

private ServiceLoader<CallableTaskBuilder> callTasks =
ServiceLoader.load(CallableTaskBuilder.class);
private Collection<CallableTaskBuilder> callTasks =
ServiceLoader.load(CallableTaskBuilder.class).stream().map(Provider::get).toList();
Comment thread
fjtirado marked this conversation as resolved.

@Override
public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
Expand Down Expand Up @@ -88,10 +89,11 @@ public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
private <T extends TaskBase> CallableTaskBuilder<T> findCallTask(Class<T> clazz) {
return (CallableTaskBuilder<T>)
callTasks.stream()
.map(Provider::get)
.filter(s -> s.accept(clazz))
.findAny()
.orElseThrow(
() -> new UnsupportedOperationException(clazz.getName() + " not supported yet"));
() ->
new UnsupportedOperationException(
clazz.getName() + " not accepted by any of these builders " + callTasks));
}
}
Loading
Loading