Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,64 +253,83 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
WorkflowId workflowId = new WorkflowId(namespace, requestWorkflowId);
WorkflowIdReusePolicy reusePolicy = startRequest.getWorkflowIdReusePolicy();
WorkflowIdConflictPolicy conflictPolicy = startRequest.getWorkflowIdConflictPolicy();
if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
&& reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
throw createInvalidArgument(
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
}

validateWorkflowIdReusePolicy(reusePolicy, conflictPolicy);
validateOnConflictOptions(startRequest);

// Backwards compatibility: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING is deprecated
if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
conflictPolicy = WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING;
reusePolicy = WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE;
}
if (conflictPolicy == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED) {
conflictPolicy = WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL;
}
if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_UNSPECIFIED) {
reusePolicy = WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE;
}

TestWorkflowMutableState existing;
lock.lock();
try {
String newRunId = UUID.randomUUID().toString();
existing = executionsByWorkflowId.get(workflowId);
if (existing != null) {
WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
StartWorkflowExecutionResponse dedupedResponse = dedupeRequest(startRequest, existing);
if (dedupedResponse != null) {
return dedupedResponse;
}

WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
if (status == WORKFLOW_EXECUTION_STATUS_RUNNING) {
StartWorkflowExecutionResponse dedupedResponse = dedupeRequest(startRequest, existing);
if (dedupedResponse != null) {
return dedupedResponse;
switch (conflictPolicy) {
case WORKFLOW_ID_CONFLICT_POLICY_FAIL:
return throwDuplicatedWorkflow(startRequest, existing);
case WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING:
if (startRequest.hasOnConflictOptions()) {
existing.applyOnConflictOptions(startRequest);
}
return StartWorkflowExecutionResponse.newBuilder()
.setStarted(false)
.setRunId(existing.getExecutionId().getExecution().getRunId())
.build();
case WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING:
existing.terminateWorkflowExecution(
TerminateWorkflowExecutionRequest.newBuilder()
.setNamespace(startRequest.getNamespace())
.setWorkflowExecution(existing.getExecutionId().getExecution())
.setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
.setIdentity("history-service")
.setDetails(
Payloads.newBuilder()
.addPayloads(
Payload.newBuilder()
.setData(
ByteString.copyFromUtf8(
String.format(
"terminated by new runID: %s", newRunId)))
.build())
.build())
.build());
break;
}
}

if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
|| conflictPolicy
== WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING) {
existing.terminateWorkflowExecution(
TerminateWorkflowExecutionRequest.newBuilder()
.setNamespace(startRequest.getNamespace())
.setWorkflowExecution(existing.getExecutionId().getExecution())
.setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
.setIdentity("history-service")
.setDetails(
Payloads.newBuilder()
.addPayloads(
Payload.newBuilder()
.setData(
ByteString.copyFromUtf8(
String.format("terminated by new runID: %s", newRunId)))
.build())
.build())
.build());
} else if (conflictPolicy
== WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) {
if (startRequest.hasOnConflictOptions()) {
existing.applyOnConflictOptions(startRequest);
// Status of existing workflow could have changed to TERMINATED.
status = existing.getWorkflowExecutionStatus();

// At this point, the existing workflow already completed or was terminated.
switch (reusePolicy) {
case WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE:
break;
case WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY:
if (status == WORKFLOW_EXECUTION_STATUS_COMPLETED
|| status == WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW) {
return throwDuplicatedWorkflow(startRequest, existing);
}
return StartWorkflowExecutionResponse.newBuilder()
.setStarted(false)
.setRunId(existing.getExecutionId().getExecution().getRunId())
.build();
} else {
break;
case WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE:
return throwDuplicatedWorkflow(startRequest, existing);
}
} else if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE
|| (reusePolicy == WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
&& (status == WORKFLOW_EXECUTION_STATUS_COMPLETED
|| status == WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW))) {
return throwDuplicatedWorkflow(startRequest, existing);
}
}

Expand Down Expand Up @@ -373,14 +392,28 @@ private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
WorkflowExecutionAlreadyStartedFailure.getDescriptor());
}

private void validateWorkflowIdReusePolicy(
WorkflowIdReusePolicy reusePolicy, WorkflowIdConflictPolicy conflictPolicy) {
if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
&& reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
throw createInvalidArgument(
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
}
if (conflictPolicy == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
&& reusePolicy == WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
throw createInvalidArgument(
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE cannot be used together with WorkflowIdConflictPolicy WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING");
}
}

private void validateOnConflictOptions(StartWorkflowExecutionRequest startRequest) {
if (!startRequest.hasOnConflictOptions()) {
return;
}
OnConflictOptions options = startRequest.getOnConflictOptions();
if (options.getAttachCompletionCallbacks() && !options.getAttachRequestId()) {
throw createInvalidArgument(
"Invalid OnConflictOptions: AttachCompletionCallbacks cannot be 'true' if AttachRequestId is 'false'.");
"Invalid OnConflictOptions: AttachCompletionCallbacks cannot be 'true' if AttachRequestId is 'false'.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
import io.temporal.client.*;
Expand Down Expand Up @@ -118,6 +119,59 @@ public void secondWorkflowTerminatesFirst() {
describe(execution2).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
}

@Test
public void deduplicateRequestWorkflowStillRunning() {
String workflowId = "deduplicate-request-1";
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setRequestId("request-id-1")
.build();

WorkflowExecution execution1 = startForeverWorkflow(options);
describe(execution1).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);

WorkflowExecution execution2 = startForeverWorkflow(options);
describe(execution2).assertExecutionId(execution1);
}

@Test
public void deduplicateRequestWorkflowAlreadyCompleted() {
String workflowId = "deduplicate-request-2";
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setRequestId("request-id-2")
.build();

WorkflowExecution execution1 = runFailingWorkflow(options);
describe(execution1).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);

WorkflowExecution execution2 = startForeverWorkflow(options);
describe(execution2).assertExecutionId(execution1);
}

@Test
public void invalidWorkflowIdReusePolicy() {
String workflowId = "invalid-workflow-id-reuse-policy";
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setWorkflowIdReusePolicy(
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
.setWorkflowIdConflictPolicy(
WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING)
.build();

Assert.assertThrows(WorkflowServiceException.class, () -> startForeverWorkflow(options));
}

private WorkflowExecution startForeverWorkflow(WorkflowOptions options) {
TestWorkflows.PrimitiveWorkflow workflowStub =
testWorkflowRule
Expand Down
Loading