Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
dfc901d
Add OnConflictOptions support
justinp-tt Feb 13, 2025
85cff62
Add OnConflictOptions support
justinp-tt Feb 13, 2025
1a6be22
Add OnConflictOptions support
justinp-tt Feb 14, 2025
5f08c48
Add OnConflictOptions support
justinp-tt Feb 14, 2025
5bfb3e5
Add OnConflictOptions support
justinp-tt Feb 14, 2025
47cd098
Add OnConflictOptions support
justinp-tt Feb 19, 2025
3896f3d
Add OnConflictOptions support
justinp-tt Feb 19, 2025
6d6345e
Update temporal-test-server/src/main/java/io/temporal/internal/testse…
justinp-tt Feb 21, 2025
ca8f3d5
Add OnConflictOptions support
justinp-tt Feb 24, 2025
b56db56
Add OnConflictOptions support
justinp-tt Feb 24, 2025
9151216
Merge branch 'master' into on-conflict-options
justinp-tt Feb 24, 2025
6c25aec
Add OnConflictOptions support
justinp-tt Feb 24, 2025
74aada4
Add OnConflictOptions support
justinp-tt Feb 24, 2025
0d5b919
Add OnConflictOptions support
justinp-tt Feb 24, 2025
81f9ec4
Add OnConflictOptions support
justinp-tt Feb 24, 2025
c6b9925
Add OnConflictOptions support
justinp-tt Feb 24, 2025
44f562e
fixes
rodrigozhou Feb 25, 2025
8158ab0
Merge branch 'master' into on-conflict-options
rodrigozhou Feb 27, 2025
35bbf02
tests
rodrigozhou Feb 28, 2025
86ff638
fix callbacks in describe and continue as new
rodrigozhou Feb 28, 2025
1e69607
unblock UseExisting
rodrigozhou Feb 28, 2025
a507b5e
fix tests
rodrigozhou Mar 3, 2025
e3ae76a
revert unblock UseExisting
rodrigozhou Mar 4, 2025
bec28a6
Merge branch 'master' into on-conflict-options
rodrigozhou Mar 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

package io.temporal.workflow.nexus;

import static org.junit.Assume.assumeTrue;

import io.nexusrpc.handler.HandlerException;
import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
Expand All @@ -40,7 +38,6 @@
import java.util.List;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -52,12 +49,6 @@ public class WorkflowHandleFailOnConflictTest {
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Before
public void checkRealServer() {
assumeTrue(
"Test Server doesn't support OnConflictOption yet", SDKTestWorkflowRule.useExternalService);
}

@Test
public void testOnConflictFail() {
TestWorkflows.TestWorkflow1 workflowStub =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

package io.temporal.workflow.nexus;

import static org.junit.Assume.assumeTrue;

import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
Expand Down Expand Up @@ -49,12 +47,6 @@ public class WorkflowHandleUseExistingOnConflictCancelTest {
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Before
public void checkRealServer() {
assumeTrue(
"Test Server doesn't support OnConflictOption yet", SDKTestWorkflowRule.useExternalService);
}

@Test
public void testUseExistingCancel() {
TestWorkflows.TestWorkflow1 workflowStub =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

package io.temporal.workflow.nexus;

import static org.junit.Assume.assumeTrue;

import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
Expand All @@ -47,12 +45,6 @@ public class WorkflowHandleUseExistingOnConflictTest {
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Before
public void checkRealServer() {
assumeTrue(
"Test Server doesn't support OnConflictOption yet", SDKTestWorkflowRule.useExternalService);
}

@Test
public void testOnConflictUseExisting() {
TestWorkflows.TestWorkflow1 workflowStub =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@
import java.io.IOException;
import java.util.Objects;

final class ExecutionId {
public final class ExecutionId {

private final String namespace;
private final WorkflowExecution execution;

ExecutionId(String namespace, WorkflowExecution execution) {
public ExecutionId(String namespace, WorkflowExecution execution) {
this.namespace = Objects.requireNonNull(namespace);
this.execution = Objects.requireNonNull(execution);
}

ExecutionId(String namespace, String workflowId, String runId) {
public ExecutionId(String namespace, String workflowId, String runId) {
this(
namespace,
WorkflowExecution.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,17 @@

package io.temporal.internal.testservice;

import static io.temporal.internal.common.LinkConverter.*;
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
import static io.temporal.internal.testservice.StateMachines.Action.FAIL;
import static io.temporal.internal.testservice.StateMachines.Action.INITIATE;
import static io.temporal.internal.testservice.StateMachines.Action.QUERY;
import static io.temporal.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
import static io.temporal.internal.testservice.StateMachines.Action.START;
import static io.temporal.internal.testservice.StateMachines.Action.TERMINATE;
import static io.temporal.internal.testservice.StateMachines.Action.TIME_OUT;
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE;
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE_WORKFLOW_EXECUTION;
import static io.temporal.internal.testservice.StateMachines.State.CANCELED;
import static io.temporal.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
import static io.temporal.internal.testservice.StateMachines.State.COMPLETED;
import static io.temporal.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
import static io.temporal.internal.testservice.StateMachines.State.FAILED;
import static io.temporal.internal.testservice.StateMachines.State.INITIATED;
import static io.temporal.internal.testservice.StateMachines.State.NONE;
import static io.temporal.internal.testservice.StateMachines.State.STARTED;
import static io.temporal.internal.testservice.StateMachines.State.TERMINATED;
import static io.temporal.internal.testservice.StateMachines.State.TIMED_OUT;
import static io.temporal.internal.common.LinkConverter.nexusLinkToWorkflowEvent;
import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
import static io.temporal.internal.testservice.StateMachines.Action.*;
import static io.temporal.internal.testservice.StateMachines.State.*;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.*;
import com.google.protobuf.Any;
import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
import io.grpc.Status;
Expand All @@ -65,7 +49,8 @@
import io.temporal.api.query.v1.WorkflowQueryResult;
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.update.v1.*;
import io.temporal.api.update.v1.Acceptance;
import io.temporal.api.update.v1.Outcome;
import io.temporal.api.update.v1.Request;
import io.temporal.api.update.v1.Response;
import io.temporal.api.workflowservice.v1.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import io.grpc.Deadline;
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.Callback;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
Expand All @@ -32,6 +33,7 @@
import io.temporal.api.nexus.v1.StartOperationResponse;
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
import io.temporal.api.workflowservice.v1.*;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import javax.annotation.Nullable;
Expand All @@ -49,6 +51,8 @@ void startWorkflowTask(

void completeWorkflowTask(int historySize, RespondWorkflowTaskCompletedRequest request);

void applyOnConflictOptions(StartWorkflowExecutionRequest request);

void reportCancelRequested(ExternalWorkflowExecutionCancelRequestedEventAttributes a);

void completeSignalExternalWorkflowExecution(String signalId, String runId);
Expand Down Expand Up @@ -143,4 +147,8 @@ PollWorkflowExecutionUpdateResponse pollUpdateWorkflowExecution(
Optional<TestWorkflowMutableState> getParent();

boolean isTerminalState();

boolean isRequestIdAttached(String requestId);

List<Callback> getCompletionCallbacks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -136,6 +137,8 @@ private interface UpdateProcedure {
new ConcurrentHashMap<>();
public StickyExecutionAttributes stickyExecutionAttributes;
private Map<String, Payload> currentMemo;
private final Set<String> attachedRequestIds = new HashSet<>();
private final List<Callback> completionCallbacks = new ArrayList<>();

/**
* @param retryState present if workflow is a retry
Expand Down Expand Up @@ -184,6 +187,7 @@ private interface UpdateProcedure {
this.workflow = StateMachines.newWorkflowStateMachine(data);
this.workflowTaskStateMachine = StateMachines.newWorkflowTaskStateMachine(store, startRequest);
this.currentMemo = new HashMap(startRequest.getMemo().getFieldsMap());
this.completionCallbacks.addAll(startRequest.getCompletionCallbacksList());
}

/** Based on overrideStartWorkflowExecutionRequest from historyEngine.go */
Expand Down Expand Up @@ -613,6 +617,29 @@ public void completeWorkflowTask(
request.hasStickyAttributes() ? request.getStickyAttributes() : null);
}

@Override
public void applyOnConflictOptions(@Nonnull StartWorkflowExecutionRequest request) {
update(
ctx -> {
OnConflictOptions options = request.getOnConflictOptions();
String requestId = null;
List<Callback> completionCallbacks = null;
List<Link> links = null;

if (options.getAttachRequestId()) {
requestId = request.getRequestId();
}
if (options.getAttachCompletionCallbacks()) {
completionCallbacks = request.getCompletionCallbacksList();
}
if (options.getAttachLinks()) {
links = request.getLinksList();
}

addWorkflowExecutionOptionsUpdatedEvent(ctx, requestId, completionCallbacks, links);
});
}

private void failWorkflowTaskWithAReason(
WorkflowTaskFailedCause failedCause,
ServerFailure eventAttributesFailure,
Expand Down Expand Up @@ -1476,6 +1503,7 @@ private void processFailWorkflowExecution(
identity,
getExecutionId(),
workflow.getData().firstExecutionRunId,
this,
parent,
parentChildInitiatedEventId);
return;
Expand Down Expand Up @@ -1608,6 +1636,7 @@ private void startNewCronRun(
identity,
getExecutionId(),
workflow.getData().firstExecutionRunId,
this,
parent,
parentChildInitiatedEventId);
}
Expand Down Expand Up @@ -1665,6 +1694,7 @@ private void processContinueAsNewWorkflowExecution(
identity,
getExecutionId(),
workflow.getData().firstExecutionRunId,
this,
parent,
parentChildInitiatedEventId);
}
Expand Down Expand Up @@ -1696,7 +1726,7 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) {
}
});

for (Callback cb : startRequest.getCompletionCallbacksList()) {
for (Callback cb : completionCallbacks) {
if (!cb.hasNexus()) {
// test server only supports nexus callbacks currently
log.warn("skipping non-nexus completion callback");
Expand All @@ -1718,8 +1748,16 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) {
.build())
.build());

service.completeNexusOperation(
ref, ctx.getExecution().getWorkflowId(), startLink, completionEvent.get());
try {
service.completeNexusOperation(
ref, ctx.getExecution().getWorkflowId(), startLink, completionEvent.get());
} catch (StatusRuntimeException e) {
// Callback destination not found should not block processing the callbacks nor
// completing the workflow.
if (e.getStatus().getCode() != Status.Code.NOT_FOUND) {
throw e;
}
}
Comment on lines +1751 to +1760
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pdoerner This is what I did to address callback "not found".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we do get NOT_FOUND (or really any error) we should log an error and/or store the failure somewhere so that it can be returned as part of the DescribeWorkflow response which has fields for callback's last failure, attempt time, etc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we can also address that in a followup, doesn't necessarily have to be in this PR

}
}

Expand Down Expand Up @@ -1985,6 +2023,11 @@ public boolean isTerminalState() {
return isTerminalState(workflowState);
}

@Override
public boolean isRequestIdAttached(@Nonnull String requestId) {
return attachedRequestIds.contains(requestId);
}

private void updateHeartbeatTimer(
RequestContext ctx,
long activityId,
Expand Down Expand Up @@ -3121,7 +3164,7 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock()
.setParentExecution(p.getExecutionId().getExecution()));

List<CallbackInfo> callbacks =
this.startRequest.getCompletionCallbacksList().stream()
this.completionCallbacks.stream()
.map(TestWorkflowMutableStateImpl::constructCallbackInfo)
.collect(Collectors.toList());

Expand Down Expand Up @@ -3429,6 +3472,31 @@ private void addExecutionSignaledByExternalEvent(
ctx.addEvent(executionSignaled);
}

private void addWorkflowExecutionOptionsUpdatedEvent(
RequestContext ctx, String requestId, List<Callback> completionCallbacks, List<Link> links) {
WorkflowExecutionOptionsUpdatedEventAttributes.Builder attrs =
WorkflowExecutionOptionsUpdatedEventAttributes.newBuilder();
if (requestId != null) {
attrs.setAttachedRequestId(requestId);
this.attachedRequestIds.add(requestId);
}
if (completionCallbacks != null) {
attrs.addAllAttachedCompletionCallbacks(completionCallbacks);
this.completionCallbacks.addAll(completionCallbacks);
}

HistoryEvent.Builder event =
HistoryEvent.newBuilder()
.setWorkerMayIgnore(true)
.setEventType(EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED)
.setWorkflowExecutionOptionsUpdatedEventAttributes(attrs);
if (links != null) {
event.addAllLinks(links);
}

ctx.addEvent(event.build());
}

private StateMachine<ActivityTaskData> getPendingActivityById(String activityId) {
Long scheduledEventId = activityById.get(activityId);
if (scheduledEventId == null) {
Expand Down Expand Up @@ -3555,4 +3623,9 @@ private boolean isTerminalState(State workflowState) {
|| workflowState == State.TERMINATED
|| workflowState == State.CONTINUED_AS_NEW;
}

@Override
public List<Callback> getCompletionCallbacks() {
return completionCallbacks;
}
}
Loading
Loading