-
Notifications
You must be signed in to change notification settings - Fork 198
Add OnConflictOptions Support #2415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
dfc901d
85cff62
1a6be22
5f08c48
5bfb3e5
47cd098
3896f3d
6d6345e
ca8f3d5
b56db56
9151216
6c25aec
74aada4
0d5b919
81f9ec4
c6b9925
44f562e
8158ab0
35bbf02
86ff638
1e69607
a507b5e
e3ae76a
bec28a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,6 +73,7 @@ | |
| import java.util.function.LongSupplier; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.IntStream; | ||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.Nullable; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
@@ -136,6 +137,8 @@ private interface UpdateProcedure { | |
| new ConcurrentHashMap<>(); | ||
| public StickyExecutionAttributes stickyExecutionAttributes; | ||
| private Map<String, Payload> currentMemo; | ||
| private final Set<String> attachedRequestIds = new HashSet<>(); | ||
| private final List<Callback> completionCallbacks = new ArrayList<>(); | ||
|
|
||
| /** | ||
| * @param retryState present if workflow is a retry | ||
|
|
@@ -184,6 +187,7 @@ private interface UpdateProcedure { | |
| this.workflow = StateMachines.newWorkflowStateMachine(data); | ||
| this.workflowTaskStateMachine = StateMachines.newWorkflowTaskStateMachine(store, startRequest); | ||
| this.currentMemo = new HashMap(startRequest.getMemo().getFieldsMap()); | ||
| this.completionCallbacks.addAll(startRequest.getCompletionCallbacksList()); | ||
| } | ||
|
|
||
| /** Based on overrideStartWorkflowExecutionRequest from historyEngine.go */ | ||
|
|
@@ -613,6 +617,29 @@ public void completeWorkflowTask( | |
| request.hasStickyAttributes() ? request.getStickyAttributes() : null); | ||
| } | ||
|
|
||
| @Override | ||
| public void applyOnConflictOptions(@Nonnull StartWorkflowExecutionRequest request) { | ||
justinp-tt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| update( | ||
| ctx -> { | ||
| OnConflictOptions options = request.getOnConflictOptions(); | ||
| String requestId = null; | ||
| List<Callback> completionCallbacks = null; | ||
| List<Link> links = null; | ||
|
|
||
| if (options.getAttachRequestId()) { | ||
| requestId = request.getRequestId(); | ||
| } | ||
| if (options.getAttachCompletionCallbacks()) { | ||
| completionCallbacks = request.getCompletionCallbacksList(); | ||
| } | ||
| if (options.getAttachLinks()) { | ||
| links = request.getLinksList(); | ||
| } | ||
|
|
||
| addWorkflowExecutionOptionsUpdatedEvent(ctx, requestId, completionCallbacks, links); | ||
| }); | ||
| } | ||
|
|
||
| private void failWorkflowTaskWithAReason( | ||
| WorkflowTaskFailedCause failedCause, | ||
| ServerFailure eventAttributesFailure, | ||
|
|
@@ -1476,6 +1503,7 @@ private void processFailWorkflowExecution( | |
| identity, | ||
| getExecutionId(), | ||
| workflow.getData().firstExecutionRunId, | ||
| this, | ||
| parent, | ||
| parentChildInitiatedEventId); | ||
| return; | ||
|
|
@@ -1608,6 +1636,7 @@ private void startNewCronRun( | |
| identity, | ||
| getExecutionId(), | ||
| workflow.getData().firstExecutionRunId, | ||
| this, | ||
| parent, | ||
| parentChildInitiatedEventId); | ||
| } | ||
|
|
@@ -1665,6 +1694,7 @@ private void processContinueAsNewWorkflowExecution( | |
| identity, | ||
| getExecutionId(), | ||
| workflow.getData().firstExecutionRunId, | ||
| this, | ||
| parent, | ||
| parentChildInitiatedEventId); | ||
| } | ||
|
|
@@ -1696,7 +1726,7 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) { | |
| } | ||
| }); | ||
|
|
||
| for (Callback cb : startRequest.getCompletionCallbacksList()) { | ||
| for (Callback cb : completionCallbacks) { | ||
| if (!cb.hasNexus()) { | ||
| // test server only supports nexus callbacks currently | ||
| log.warn("skipping non-nexus completion callback"); | ||
|
|
@@ -1718,8 +1748,16 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) { | |
| .build()) | ||
| .build()); | ||
|
|
||
| service.completeNexusOperation( | ||
| ref, ctx.getExecution().getWorkflowId(), startLink, completionEvent.get()); | ||
| try { | ||
| service.completeNexusOperation( | ||
| ref, ctx.getExecution().getWorkflowId(), startLink, completionEvent.get()); | ||
| } catch (StatusRuntimeException e) { | ||
| // Callback destination not found should not block processing the callbacks nor | ||
| // completing the workflow. | ||
| if (e.getStatus().getCode() != Status.Code.NOT_FOUND) { | ||
| throw e; | ||
| } | ||
| } | ||
|
Comment on lines
+1751
to
+1760
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @pdoerner This is what I did to address callback "not found".
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think if we do get NOT_FOUND (or really any error) we should log an error and/or store the failure somewhere so that it can be returned as part of the DescribeWorkflow response which has fields for callback's last failure, attempt time, etc
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But we can also address that in a followup, doesn't necessarily have to be in this PR |
||
| } | ||
| } | ||
|
|
||
|
|
@@ -1985,6 +2023,11 @@ public boolean isTerminalState() { | |
| return isTerminalState(workflowState); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isRequestIdAttached(@Nonnull String requestId) { | ||
| return attachedRequestIds.contains(requestId); | ||
| } | ||
|
|
||
| private void updateHeartbeatTimer( | ||
| RequestContext ctx, | ||
| long activityId, | ||
|
|
@@ -3121,7 +3164,7 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock() | |
| .setParentExecution(p.getExecutionId().getExecution())); | ||
|
|
||
| List<CallbackInfo> callbacks = | ||
| this.startRequest.getCompletionCallbacksList().stream() | ||
| this.completionCallbacks.stream() | ||
| .map(TestWorkflowMutableStateImpl::constructCallbackInfo) | ||
| .collect(Collectors.toList()); | ||
|
|
||
|
|
@@ -3429,6 +3472,31 @@ private void addExecutionSignaledByExternalEvent( | |
| ctx.addEvent(executionSignaled); | ||
| } | ||
|
|
||
| private void addWorkflowExecutionOptionsUpdatedEvent( | ||
| RequestContext ctx, String requestId, List<Callback> completionCallbacks, List<Link> links) { | ||
| WorkflowExecutionOptionsUpdatedEventAttributes.Builder attrs = | ||
| WorkflowExecutionOptionsUpdatedEventAttributes.newBuilder(); | ||
| if (requestId != null) { | ||
| attrs.setAttachedRequestId(requestId); | ||
| this.attachedRequestIds.add(requestId); | ||
| } | ||
| if (completionCallbacks != null) { | ||
| attrs.addAllAttachedCompletionCallbacks(completionCallbacks); | ||
| this.completionCallbacks.addAll(completionCallbacks); | ||
| } | ||
|
|
||
| HistoryEvent.Builder event = | ||
| HistoryEvent.newBuilder() | ||
| .setWorkerMayIgnore(true) | ||
| .setEventType(EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED) | ||
| .setWorkflowExecutionOptionsUpdatedEventAttributes(attrs); | ||
| if (links != null) { | ||
| event.addAllLinks(links); | ||
| } | ||
|
|
||
| ctx.addEvent(event.build()); | ||
| } | ||
|
|
||
| private StateMachine<ActivityTaskData> getPendingActivityById(String activityId) { | ||
| Long scheduledEventId = activityById.get(activityId); | ||
| if (scheduledEventId == null) { | ||
|
|
@@ -3555,4 +3623,9 @@ private boolean isTerminalState(State workflowState) { | |
| || workflowState == State.TERMINATED | ||
| || workflowState == State.CONTINUED_AS_NEW; | ||
| } | ||
|
|
||
| @Override | ||
| public List<Callback> getCompletionCallbacks() { | ||
| return completionCallbacks; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.