Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import io.temporal.worker.WorkflowImplementationOptions;
import io.temporal.workflow.DynamicWorkflow;
import io.temporal.workflow.Functions;
import io.temporal.workflow.Functions.Func;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -91,7 +90,7 @@ public final class POJOWorkflowImplementationFactory implements ReplayWorkflowFa
workflowDefinitions = Collections.synchronizedMap(new HashMap<>());

/** Factories providing instances of workflow classes. */
private final Map<Class<?>, Functions.Func<?>> workflowInstanceFactories =
private final Map<Class<?>, Functions.Func1<EncodedValues, ?>> workflowInstanceFactories =
Collections.synchronizedMap(new HashMap<>());

/** If present then it is called for any unknown workflow type. */
Expand Down Expand Up @@ -146,15 +145,17 @@ public void registerWorkflowImplementationTypes(
*/
@SuppressWarnings("unchecked")
public <R> void addWorkflowImplementationFactory(
WorkflowImplementationOptions options, Class<R> clazz, Functions.Func<R> factory) {
WorkflowImplementationOptions options,
Class<R> clazz,
Functions.Func1<EncodedValues, R> factory) {
if (DynamicWorkflow.class.isAssignableFrom(clazz)) {
if (dynamicWorkflowImplementationFactory != null) {
throw new TypeAlreadyRegisteredException(
"DynamicWorkflow",
"An implementation of DynamicWorkflow or its factory is already registered with the worker");
}
dynamicWorkflowImplementationFactory =
(unused) -> ((Func<? extends DynamicWorkflow>) factory).apply();
(Functions.Func1<EncodedValues, ? extends DynamicWorkflow>) factory;
return;
}
workflowInstanceFactories.put(clazz, factory);
Expand Down Expand Up @@ -433,16 +434,17 @@ public WorkflowOutput execute(WorkflowInput input) {
}

protected void newInstance(Optional<Payloads> input) {
Func<?> factory = workflowInstanceFactories.get(workflowImplementationClass);
Functions.Func1<EncodedValues, ?> factory =
workflowInstanceFactories.get(workflowImplementationClass);
if (factory != null) {
workflow = factory.apply();
workflow = factory.apply(new EncodedValues(input, dataConverterWithWorkflowContext));
} else {
// Historically any exception thrown from the constructor was wrapped into Error causing a
// workflow task failure.
// This is not consistent with throwing exception from the workflow method which can
// causes a workflow failure depending on the exception type.
// To preserve backwards compatibility we only change behaviour if a constructor is
// annotated with WorkflowInit.
// annotated with @WorkflowInit.
if (ctor != null) {
try {
workflow =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.client.WorkflowClient;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
import io.temporal.internal.activity.ActivityExecutionContextFactory;
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
import io.temporal.internal.activity.LocalActivityExecutionContextFactoryImpl;
Expand All @@ -38,6 +39,7 @@
import io.temporal.worker.tuning.SlotSupplier;
import io.temporal.worker.tuning.WorkflowSlotInfo;
import io.temporal.workflow.Functions.Func;
import io.temporal.workflow.Functions.Func1;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Objects;
Expand Down Expand Up @@ -165,6 +167,11 @@ public void registerWorkflowImplementationTypes(

public <R> void registerWorkflowImplementationFactory(
WorkflowImplementationOptions options, Class<R> clazz, Func<R> factory) {
this.factory.addWorkflowImplementationFactory(options, clazz, unused -> factory.apply());
}

public <R> void registerWorkflowImplementationFactory(
WorkflowImplementationOptions options, Class<R> clazz, Func1<EncodedValues, R> factory) {
this.factory.addWorkflowImplementationFactory(options, clazz, factory);
}

Expand Down
10 changes: 10 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
import io.temporal.failure.TemporalFailure;
import io.temporal.internal.sync.WorkflowInternal;
import io.temporal.internal.sync.WorkflowThreadExecutor;
import io.temporal.internal.worker.*;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.worker.tuning.*;
import io.temporal.workflow.Functions;
import io.temporal.workflow.Functions.Func;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
Expand Down Expand Up @@ -325,6 +327,14 @@ public <R> void registerWorkflowImplementationFactory(
workflowWorker.registerWorkflowImplementationFactory(options, workflowInterface, factory);
}

@VisibleForTesting
public <R> void registerWorkflowImplementationFactory(
Class<R> workflowInterface,
Functions.Func1<EncodedValues, R> factory,
WorkflowImplementationOptions options) {
workflowWorker.registerWorkflowImplementationFactory(options, workflowInterface, factory);
}

/**
* Configures a factory to use when an instance of a workflow implementation is created.
*
Expand Down
6 changes: 6 additions & 0 deletions temporal-spring-boot-autoconfigure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ dependencies {
testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"

testImplementation "org.springframework.boot:spring-boot-starter-test"

testImplementation('org.slf4j:slf4j-api') {
version {
strictly "${slf4jVersion}"
}
}
}

tasks.test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import io.temporal.spring.boot.autoconfigure.properties.NamespaceProperties;
import io.temporal.spring.boot.autoconfigure.properties.WorkerProperties;
import io.temporal.worker.*;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -510,7 +512,6 @@ private void configureWorkflowImplementationAutoDiscovery(

@SuppressWarnings("unchecked")
private <T> void configureWorkflowImplementation(Worker worker, Class<?> clazz) {

POJOWorkflowImplMetadata workflowMetadata =
POJOWorkflowImplMetadata.newInstanceForWorkflowFactory(clazz);
List<POJOWorkflowMethodMetadata> workflowMethods = workflowMetadata.getWorkflowMethods();
Expand All @@ -527,7 +528,18 @@ private <T> void configureWorkflowImplementation(Worker worker, Class<?> clazz)

WorkerDeploymentOptions deploymentOptions = worker.getWorkerOptions().getDeploymentOptions();

for (POJOWorkflowMethodMetadata workflowMethod : workflowMetadata.getWorkflowMethods()) {
// If the workflow implementation class has a constructor annotated with @WorkflowInit,
// we need to register it as a workflow factory.
if (workflowMetadata.getWorkflowInit() != null) {
// Currently, we only support one workflow method in a class with a constructor annotated with
// @WorkflowInit.
if (workflowMethods.size() > 1) {
throw new BeanDefinitionValidationException(
"Workflow implementation class "
+ clazz
+ " has more then one workflow method and a constructor annotated with @WorkflowInit.");
}
POJOWorkflowMethodMetadata workflowMethod = workflowMetadata.getWorkflowMethods().get(0);
if (deploymentOptions != null && deploymentOptions.isUsingVersioning()) {
POJOWorkflowImplementationFactory.validateVersioningBehavior(
clazz,
Expand All @@ -538,10 +550,43 @@ private <T> void configureWorkflowImplementation(Worker worker, Class<?> clazz)

worker.registerWorkflowImplementationFactory(
(Class<T>) workflowMethod.getWorkflowInterface(),
() -> (T) beanFactory.createBean(clazz),
(encodedValues) -> {
try {
Constructor<?> ctor = workflowMetadata.getWorkflowInit();
Object[] parameters = new Object[ctor.getParameterCount()];
for (int i = 0; i < ctor.getParameterCount(); i++) {
parameters[i] =
encodedValues.get(
i, ctor.getParameterTypes()[i], ctor.getGenericParameterTypes()[i]);
}
T workflowInstance = (T) workflowMetadata.getWorkflowInit().newInstance(parameters);
beanFactory.autowireBean(workflowInstance);
return workflowInstance;
} catch (InstantiationException
| IllegalAccessException
| InvocationTargetException e) {
throw new RuntimeException(e);
}
},
workflowImplementationOptions);
addRegisteredWorkflowImpl(
worker, workflowMethod.getWorkflowInterface().getName(), workflowMetadata);
} else {
for (POJOWorkflowMethodMetadata workflowMethod : workflowMetadata.getWorkflowMethods()) {
if (deploymentOptions != null && deploymentOptions.isUsingVersioning()) {
POJOWorkflowImplementationFactory.validateVersioningBehavior(
clazz,
workflowMethod,
deploymentOptions.getDefaultVersioningBehavior(),
deploymentOptions.isUsingVersioning());
}
worker.registerWorkflowImplementationFactory(
(Class<T>) workflowMethod.getWorkflowInterface(),
() -> (T) beanFactory.createBean(clazz),
workflowImplementationOptions);
addRegisteredWorkflowImpl(
worker, workflowMethod.getWorkflowInterface().getName(), workflowMetadata);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,44 @@
import io.temporal.workflow.NexusOperationOptions;
import io.temporal.workflow.NexusServiceOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInit;
import java.time.Duration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;

@WorkflowImpl(taskQueues = {"${default-queue.name:UnitTest}"})
public class TestWorkflowImpl implements TestWorkflow {
private final TestNexusService nexusService;
private final TestActivity activity;

@Autowired private ConfigurableApplicationContext applicationContext;

@WorkflowInit
public TestWorkflowImpl(String input) {
nexusService =
Workflow.newNexusServiceStub(
TestNexusService.class,
NexusServiceOptions.newBuilder()
.setEndpoint("AutoDiscoveryByTaskQueueEndpoint")
.setOperationOptions(
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build())
.build());

activity =
Workflow.newActivityStub(
TestActivity.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(1))
.validateAndBuildWithDefaults());
}

@Override
public String execute(String input) {
if (input.equals("nexus")) {
Workflow.newNexusServiceStub(
TestNexusService.class,
NexusServiceOptions.newBuilder()
.setEndpoint("AutoDiscoveryByTaskQueueEndpoint")
.setOperationOptions(
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build())
.build())
.operation(input);
nexusService.operation(input);
}
return Workflow.newActivityStub(
TestActivity.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(1))
.validateAndBuildWithDefaults())
.execute("done");
return activity.execute("done");
}
}
Loading