Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,27 @@ public Promise<Void> getResult() {

final class CancelWorkflowInput {
private final WorkflowExecution execution;
private final @Nullable String reason;

// Kept for backward compatibility
@Deprecated
public CancelWorkflowInput(WorkflowExecution execution) {
this(execution, null);
}

public CancelWorkflowInput(WorkflowExecution execution, @Nullable String reason) {
this.execution = execution;
this.reason = reason;
}

public WorkflowExecution getExecution() {
return execution;
}

@Nullable
public String getReason() {
return reason;
}
}

final class CancelWorkflowOutput {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,13 @@ Functions.Proc1<Exception> signalExternalWorkflowExecution(
* Request cancellation of a workflow execution by WorkflowId and optionally RunId.
*
* @param execution contains WorkflowId and optional RunId of the workflow to send request to.
* @param reason optional reason for cancellation.
* @param callback callback notified about the operation result
*/
void requestCancelExternalWorkflowExecution(
WorkflowExecution execution, Functions.Proc2<Void, RuntimeException> callback);
WorkflowExecution execution,
@Nullable String reason,
Functions.Proc2<Void, RuntimeException> callback);

/**
* @return time of the {@link PollWorkflowTaskQueueResponse} start event of the workflow task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,17 @@ public Functions.Proc1<Exception> signalExternalWorkflowExecution(

@Override
public void requestCancelExternalWorkflowExecution(
WorkflowExecution execution, Functions.Proc2<Void, RuntimeException> callback) {
RequestCancelExternalWorkflowExecutionCommandAttributes attributes =
WorkflowExecution execution,
@Nullable String reason,
Functions.Proc2<Void, RuntimeException> callback) {
RequestCancelExternalWorkflowExecutionCommandAttributes.Builder attributes =
RequestCancelExternalWorkflowExecutionCommandAttributes.newBuilder()
.setWorkflowId(execution.getWorkflowId())
.setRunId(execution.getRunId())
.build();
workflowStateMachines.requestCancelExternalWorkflowExecution(attributes, callback);
.setRunId(execution.getRunId());
if (reason != null) {
attributes.setReason(reason);
}
workflowStateMachines.requestCancelExternalWorkflowExecution(attributes.build(), callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.workflow.*;
import java.util.Objects;
import javax.annotation.Nullable;

/** Dynamic implementation of a strongly typed child workflow interface. */
class ExternalWorkflowStubImpl implements ExternalWorkflowStub {
Expand Down Expand Up @@ -52,10 +53,26 @@ public void signal(String signalName, Object... args) {

@Override
public void cancel() {
cancel(null);
}

@Override
public void cancel(@Nullable String reason) {
assertReadOnly.apply("cancel external workflow");
if (reason == null) {
try {
CancellationScope currentScope = CancellationScope.current();
if (currentScope.isCancelRequested()) {
reason = currentScope.getCancellationReason();
}
} catch (IllegalStateException ignored) {
// Outside of workflow thread; leave reason as null.
}
}
Promise<Void> cancelRequested =
outboundCallsInterceptor
.cancelWorkflow(new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution))
.cancelWorkflow(
new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution, reason))
.getResult();
if (AsyncInternal.isAsync()) {
AsyncInternal.setAsyncResult(cancelRequested);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,7 @@ public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
CompletablePromise<Void> result = Workflow.newPromise();
replayContext.requestCancelExternalWorkflowExecution(
input.getExecution(),
input.getReason(),
(r, exception) -> {
if (exception == null) {
result.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,13 @@ public static void continueAsNew(
}

public static Promise<Void> cancelWorkflow(WorkflowExecution execution) {
return cancelWorkflow(execution, null);
}

public static Promise<Void> cancelWorkflow(WorkflowExecution execution, @Nullable String reason) {
assertNotReadOnly("cancel workflow");
return getWorkflowOutboundInterceptor()
.cancelWorkflow(new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution))
.cancelWorkflow(new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution, reason))
.getResult();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface CancellationScope extends Runnable {
/**
* Cancels the scope as well as all its children.
*
* @param reason human readable reason for the cancellation. Becomes message of the
* @param reason human-readable reason for the cancellation. Becomes message of the
* CanceledException thrown.
*/
void cancel(String reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.internal.sync.StubMarker;
import javax.annotation.Nullable;

/**
* Supports signalling and cancelling any workflows by the workflow type and their id. This is
Expand Down Expand Up @@ -49,5 +50,13 @@ static <T> ExternalWorkflowStub fromTyped(T typed) {
*/
void signal(String signalName, Object... args);

/** Request cancellation of the workflow execution. */
void cancel();

/**
* Request cancellation of the workflow execution with a reason.
*
* @param reason optional reason for cancellation.
*/
void cancel(@Nullable String reason);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package io.temporal.workflow.cancellationTests;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.failure.CanceledFailure;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.CancellationScope;
import io.temporal.workflow.ExternalWorkflowStub;
import io.temporal.workflow.Promise;
import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.UUID;
import org.junit.Rule;
import org.junit.Test;

public class WorkflowCancelReasonTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setUseTimeskipping(false)
.setWorkflowTypes(
CancellationAwareWorkflow.class,
CancelExternalWorkflowImpl.class,
CascadingCancelWorkflowImpl.class)
.build();

@Test
public void testCancellationReasonFromClient() {
String workflowId = "client-cancel-" + UUID.randomUUID();
CancellationReasonWorkflow workflow =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(
CancellationReasonWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(testWorkflowRule.getTaskQueue())
.build());

WorkflowClient.start(workflow::execute);

WorkflowStub stub = WorkflowStub.fromTyped(workflow);
String reason = "client-cancel-reason";
stub.cancel(reason);

WorkflowFailedException exception =
assertThrows(WorkflowFailedException.class, () -> stub.getResult(String.class));
assertEquals(CanceledFailure.class, exception.getCause().getClass());

assertEquals(expectedResult(reason), workflow.getRecordedReason());
}

@Test
public void testCancellationReasonFromCommand() {
String targetWorkflowId = "command-cancel-" + UUID.randomUUID();
CancellationReasonWorkflow targetWorkflow =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(
CancellationReasonWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(targetWorkflowId)
.setTaskQueue(testWorkflowRule.getTaskQueue())
.build());

WorkflowClient.start(targetWorkflow::execute);

CancelExternalWorkflow canceller =
testWorkflowRule.newWorkflowStub(CancelExternalWorkflow.class);
String reason = "command-cancel-reason";
WorkflowClient.start(canceller::execute, targetWorkflowId, reason);

WorkflowStub targetStub = WorkflowStub.fromTyped(targetWorkflow);
WorkflowFailedException exception =
assertThrows(WorkflowFailedException.class, () -> targetStub.getResult(String.class));
assertEquals(CanceledFailure.class, exception.getCause().getClass());

assertEquals(expectedResult(reason), targetWorkflow.getRecordedReason());
}

@Test
public void testCancellationReasonAbsent() {
String workflowId = "client-cancel-null-" + UUID.randomUUID();
CancellationReasonWorkflow workflow =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(
CancellationReasonWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(testWorkflowRule.getTaskQueue())
.build());

WorkflowClient.start(workflow::execute);

WorkflowStub stub = WorkflowStub.fromTyped(workflow);
stub.cancel();

WorkflowFailedException exception =
assertThrows(WorkflowFailedException.class, () -> stub.getResult(String.class));
assertEquals(CanceledFailure.class, exception.getCause().getClass());

assertEquals(expectedResult(""), workflow.getRecordedReason());
}

@Test
public void testCancellationReasonDerivedFromContext() {
String targetWorkflowId = "context-derived-" + UUID.randomUUID();
CancellationReasonWorkflow targetWorkflow =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(
CancellationReasonWorkflow.class,
WorkflowOptions.newBuilder()
.setWorkflowId(targetWorkflowId)
.setTaskQueue(testWorkflowRule.getTaskQueue())
.build());

WorkflowClient.start(targetWorkflow::execute);

CascadingCancelWorkflow cascaderWorkflow =
testWorkflowRule.newWorkflowStub(CascadingCancelWorkflow.class);

WorkflowClient.start(cascaderWorkflow::execute, targetWorkflowId);

WorkflowStub cascaderStub = WorkflowStub.fromTyped(cascaderWorkflow);
String reason = "context-derived-reason";
cascaderStub.cancel(reason);

WorkflowStub targetStub = WorkflowStub.fromTyped(targetWorkflow);
WorkflowFailedException exception =
assertThrows(WorkflowFailedException.class, () -> targetStub.getResult(String.class));
assertEquals(CanceledFailure.class, exception.getCause().getClass());

assertEquals(expectedResult(reason), targetWorkflow.getRecordedReason());
}

private static String expectedResult(String reason) {
String part = String.valueOf(reason);
return part + "|" + part;
}

@WorkflowInterface
public interface CancellationReasonWorkflow {
@WorkflowMethod
String execute();

@QueryMethod
String getRecordedReason();
}

public static class CancellationAwareWorkflow implements CancellationReasonWorkflow {

private String recordedReason;

@Override
public String execute() {
Promise<String> cancellationRequest = CancellationScope.current().getCancellationRequest();
String requestedReason = cancellationRequest.get();
String scopeReason = CancellationScope.current().getCancellationReason();
recordedReason = expectedResultFromWorkflow(requestedReason, scopeReason);
System.out.println(recordedReason);
return recordedReason;
}

@Override
public String getRecordedReason() {
return recordedReason;
}

private String expectedResultFromWorkflow(String requestedReason, String scopeReason) {
return requestedReason + "|" + scopeReason;
}
}

@WorkflowInterface
public interface CancelExternalWorkflow {
@WorkflowMethod
void execute(String workflowId, String reason);
}

public static class CancelExternalWorkflowImpl implements CancelExternalWorkflow {

@Override
public void execute(String workflowId, String reason) {
ExternalWorkflowStub externalWorkflow = Workflow.newUntypedExternalWorkflowStub(workflowId);
externalWorkflow.cancel(reason);
}
}

@WorkflowInterface
public interface CascadingCancelWorkflow {
@WorkflowMethod
void execute(String workflowId);
}

public static class CascadingCancelWorkflowImpl implements CascadingCancelWorkflow {

@Override
public void execute(String workflowId) {
ExternalWorkflowStub externalWorkflow = Workflow.newUntypedExternalWorkflowStub(workflowId);
CancellationScope.current().getCancellationRequest().get();
externalWorkflow.cancel();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,7 @@ private void processRequestCancelExternalWorkflowExecution(
.setWorkflowExecution(
WorkflowExecution.newBuilder().setWorkflowId(attr.getWorkflowId()))
.setNamespace(ctx.getNamespace())
.setReason(attr.getReason())
.build();
CancelExternalWorkflowExecutionCallerInfo info =
new CancelExternalWorkflowExecutionCallerInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ public Functions.Proc1<Exception> signalExternalWorkflowExecution(

@Override
public void requestCancelExternalWorkflowExecution(
WorkflowExecution execution, Functions.Proc2<Void, RuntimeException> callback) {
WorkflowExecution execution,
@Nullable String reason,
Functions.Proc2<Void, RuntimeException> callback) {
throw new UnsupportedOperationException("not implemented");
}

Expand Down
Loading