-
Notifications
You must be signed in to change notification settings - Fork 198
Add support for attaching to a running workflow #2424
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
03c790c
9260e37
b46bbc6
5689f3d
3385803
97d7ebf
f4d0e0e
96957e6
fdfd08f
dc2f6e9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| /* | ||
| * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. | ||
| * | ||
| * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| * | ||
| * Modifications copyright (C) 2017 Uber Technologies, Inc. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this material except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package io.temporal.client; | ||
|
|
||
| import com.google.common.base.Preconditions; | ||
| import io.temporal.common.Experimental; | ||
| import java.util.Objects; | ||
|
|
||
| /** | ||
| * OnConflictOptions specifies the actions to be taken when using the {@link | ||
| * io.temporal.api.enums.v1.WorkflowIdConflictPolicy#WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING} | ||
| */ | ||
| @Experimental | ||
| public class OnConflictOptions { | ||
| public static OnConflictOptions.Builder newBuilder() { | ||
| return new OnConflictOptions.Builder(); | ||
| } | ||
|
|
||
| public static OnConflictOptions.Builder newBuilder(OnConflictOptions options) { | ||
| return new OnConflictOptions.Builder(options); | ||
| } | ||
|
|
||
| public static OnConflictOptions getDefaultInstance() { | ||
| return DEFAULT_INSTANCE; | ||
| } | ||
|
|
||
| private static final OnConflictOptions DEFAULT_INSTANCE; | ||
|
|
||
| static { | ||
| DEFAULT_INSTANCE = OnConflictOptions.newBuilder().build(); | ||
| } | ||
|
|
||
| private final boolean attachRequestId; | ||
| private final boolean attachCompletionCallbacks; | ||
| private final boolean attachLinks; | ||
|
|
||
| private OnConflictOptions( | ||
| boolean attachRequestId, boolean attachCompletionCallbacks, boolean attachLinks) { | ||
| this.attachRequestId = attachRequestId; | ||
| this.attachCompletionCallbacks = attachCompletionCallbacks; | ||
| this.attachLinks = attachLinks; | ||
| } | ||
|
|
||
| public boolean isAttachRequestId() { | ||
| return attachRequestId; | ||
| } | ||
|
|
||
| public boolean isAttachCompletionCallbacks() { | ||
| return attachCompletionCallbacks; | ||
| } | ||
|
|
||
| public boolean isAttachLinks() { | ||
| return attachLinks; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) return true; | ||
| if (o == null || getClass() != o.getClass()) return false; | ||
| OnConflictOptions that = (OnConflictOptions) o; | ||
| return attachRequestId == that.attachRequestId | ||
| && attachCompletionCallbacks == that.attachCompletionCallbacks | ||
| && attachLinks == that.attachLinks; | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(attachRequestId, attachCompletionCallbacks, attachLinks); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "OnConflictOptions{" | ||
| + "attachRequestId=" | ||
| + attachRequestId | ||
| + ", attachCompletionCallbacks=" | ||
| + attachCompletionCallbacks | ||
| + ", attachLinks=" | ||
| + attachLinks | ||
| + '}'; | ||
| } | ||
|
|
||
| public static final class Builder { | ||
| private boolean attachRequestId; | ||
| private boolean attachCompletionCallbacks; | ||
| private boolean attachLinks; | ||
|
|
||
| public Builder(OnConflictOptions options) { | ||
| this.attachRequestId = options.attachRequestId; | ||
| this.attachCompletionCallbacks = options.attachCompletionCallbacks; | ||
| this.attachLinks = options.attachLinks; | ||
| } | ||
|
|
||
| public Builder() {} | ||
|
|
||
| /** Attaches the request ID to the running workflow. */ | ||
| public Builder setAttachRequestId(boolean attachRequestId) { | ||
| this.attachRequestId = attachRequestId; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Attaches the completion callbacks to the running workflow. If true, AttachRequestId must be | ||
| * true. | ||
| */ | ||
| public Builder setAttachCompletionCallbacks(boolean attachCompletionCallbacks) { | ||
| this.attachCompletionCallbacks = attachCompletionCallbacks; | ||
| return this; | ||
| } | ||
|
|
||
| /** Attaches the links to the WorkflowExecutionOptionsUpdatedEvent history event. */ | ||
| public Builder setAttachLinks(boolean attachLinks) { | ||
| this.attachLinks = attachLinks; | ||
| return this; | ||
| } | ||
|
|
||
| public OnConflictOptions build() { | ||
| if (attachCompletionCallbacks) { | ||
| Preconditions.checkState( | ||
| attachRequestId, "AttachRequestId must be true if AttachCompletionCallbacks is true"); | ||
| } | ||
| return new OnConflictOptions(attachRequestId, attachCompletionCallbacks, attachLinks); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The server doesn't allow attaching callbacks without also attaching a request ID (as of yesterday). I'm wondering if we should verify that SDK side and document it on the builder setters. We definitely should in the api (@rodrigozhou). |
||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -84,6 +84,7 @@ public static WorkflowOptions merge( | |
| .setRequestId(o.getRequestId()) | ||
| .setCompletionCallbacks(o.getCompletionCallbacks()) | ||
| .setLinks(o.getLinks()) | ||
| .setOnConflictOptions(o.getOnConflictOptions()) | ||
| .validateBuildWithDefaults(); | ||
| } | ||
|
|
||
|
|
@@ -129,6 +130,8 @@ public static final class Builder { | |
|
|
||
| private List<Link> links; | ||
|
|
||
| private OnConflictOptions onConflictOptions; | ||
|
|
||
| private Builder() {} | ||
|
|
||
| private Builder(WorkflowOptions options) { | ||
|
|
@@ -155,6 +158,7 @@ private Builder(WorkflowOptions options) { | |
| this.requestId = options.requestId; | ||
| this.completionCallbacks = options.completionCallbacks; | ||
| this.links = options.links; | ||
| this.onConflictOptions = options.onConflictOptions; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -460,6 +464,20 @@ public Builder setLinks(List<Link> links) { | |
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Set workflow ID conflict options used in conjunction with conflict policy | ||
| * WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING. If onConflictOptions is set and a workflow is | ||
| * already running, the options specifies the actions to be taken on the running workflow. If | ||
| * not set or use together with any other WorkflowIDConflictPolicy, this parameter is ignored. | ||
| * | ||
| * <p>WARNING: Not intended for User Code. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wish there was a way to hide this. Maybe with a private subclass and cast?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't be private because it needs to be accessible by the |
||
| */ | ||
| @Experimental | ||
| public Builder setOnConflictOptions(OnConflictOptions onConflictOptions) { | ||
| this.onConflictOptions = onConflictOptions; | ||
| return this; | ||
| } | ||
|
|
||
| public WorkflowOptions build() { | ||
| return new WorkflowOptions( | ||
| workflowId, | ||
|
|
@@ -481,7 +499,8 @@ public WorkflowOptions build() { | |
| staticDetails, | ||
| requestId, | ||
| completionCallbacks, | ||
| links); | ||
| links, | ||
| onConflictOptions); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -508,7 +527,8 @@ public WorkflowOptions validateBuildWithDefaults() { | |
| staticDetails, | ||
| requestId, | ||
| completionCallbacks, | ||
| links); | ||
| links, | ||
| onConflictOptions); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -551,6 +571,7 @@ public WorkflowOptions validateBuildWithDefaults() { | |
| private final List<Callback> completionCallbacks; | ||
|
|
||
| private final List<Link> links; | ||
| private final OnConflictOptions onConflictOptions; | ||
|
|
||
| private WorkflowOptions( | ||
| String workflowId, | ||
|
|
@@ -572,7 +593,8 @@ private WorkflowOptions( | |
| String staticDetails, | ||
| String requestId, | ||
| List<Callback> completionCallbacks, | ||
| List<Link> links) { | ||
| List<Link> links, | ||
| OnConflictOptions onConflictOptions) { | ||
| this.workflowId = workflowId; | ||
| this.workflowIdReusePolicy = workflowIdReusePolicy; | ||
| this.workflowRunTimeout = workflowRunTimeout; | ||
|
|
@@ -593,6 +615,7 @@ private WorkflowOptions( | |
| this.requestId = requestId; | ||
| this.completionCallbacks = completionCallbacks; | ||
| this.links = links; | ||
| this.onConflictOptions = onConflictOptions; | ||
| } | ||
|
|
||
| public String getWorkflowId() { | ||
|
|
@@ -689,6 +712,11 @@ public String getStaticDetails() { | |
| return staticDetails; | ||
| } | ||
|
|
||
| @Experimental | ||
| public @Nullable OnConflictOptions getOnConflictOptions() { | ||
| return onConflictOptions; | ||
| } | ||
|
|
||
| public Builder toBuilder() { | ||
| return new Builder(this); | ||
| } | ||
|
|
@@ -717,7 +745,8 @@ public boolean equals(Object o) { | |
| && Objects.equal(staticDetails, that.staticDetails) | ||
| && Objects.equal(requestId, that.requestId) | ||
| && Objects.equal(completionCallbacks, that.completionCallbacks) | ||
| && Objects.equal(links, that.links); | ||
| && Objects.equal(links, that.links) | ||
| && Objects.equal(onConflictOptions, that.onConflictOptions); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -742,7 +771,8 @@ public int hashCode() { | |
| staticDetails, | ||
| requestId, | ||
| completionCallbacks, | ||
| links); | ||
| links, | ||
| onConflictOptions); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -791,6 +821,8 @@ public String toString() { | |
| + completionCallbacks | ||
| + ", links=" | ||
| + links | ||
| + ", onConflictOptions=" | ||
| + onConflictOptions | ||
| + '}'; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -186,46 +186,48 @@ private void cancelNexusOperationCommand() { | |
| } | ||
|
|
||
| private void notifyStarted() { | ||
| if (!async) { | ||
| if (currentEvent.getEventType() != EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED) { | ||
| startedCallback.apply(Optional.empty(), null); | ||
| } else { | ||
| async = true; | ||
| String operationToken = | ||
| currentEvent.getNexusOperationStartedEventAttributes().getOperationToken(); | ||
| String operationId = | ||
| currentEvent.getNexusOperationStartedEventAttributes().getOperationId(); | ||
| startedCallback.apply( | ||
| Optional.of(operationToken.isEmpty() ? operationId : operationToken), null); | ||
| } | ||
| } | ||
| async = true; | ||
| String operationToken = | ||
| currentEvent.getNexusOperationStartedEventAttributes().getOperationToken(); | ||
| // TODO(#2423) Remove support for operationId | ||
| String operationId = currentEvent.getNexusOperationStartedEventAttributes().getOperationId(); | ||
Quinn-With-Two-Ns marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| startedCallback.apply( | ||
| Optional.of(operationToken.isEmpty() ? operationId : operationToken), null); | ||
| } | ||
|
|
||
| private void notifyCompleted() { | ||
| notifyStarted(); | ||
| NexusOperationCompletedEventAttributes attributes = | ||
| currentEvent.getNexusOperationCompletedEventAttributes(); | ||
| if (!async) { | ||
| startedCallback.apply(Optional.empty(), null); | ||
| } | ||
|
Comment on lines
+201
to
+203
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would just keep all of this logic in |
||
| completionCallback.apply(Optional.of(attributes.getResult()), null); | ||
| } | ||
|
|
||
| private void notifyFailed() { | ||
| notifyStarted(); | ||
| NexusOperationFailedEventAttributes attributes = | ||
| currentEvent.getNexusOperationFailedEventAttributes(); | ||
| if (!async) { | ||
| startedCallback.apply(Optional.empty(), attributes.getFailure()); | ||
| } | ||
| completionCallback.apply(Optional.empty(), attributes.getFailure()); | ||
| } | ||
|
|
||
| private void notifyCanceled() { | ||
| notifyStarted(); | ||
| NexusOperationCanceledEventAttributes attributes = | ||
| currentEvent.getNexusOperationCanceledEventAttributes(); | ||
| if (!async) { | ||
| startedCallback.apply(Optional.empty(), attributes.getFailure()); | ||
| } | ||
| completionCallback.apply(Optional.empty(), attributes.getFailure()); | ||
| } | ||
|
|
||
| private void notifyTimedOut() { | ||
| notifyStarted(); | ||
| NexusOperationTimedOutEventAttributes attributes = | ||
| currentEvent.getNexusOperationTimedOutEventAttributes(); | ||
| if (!async) { | ||
| startedCallback.apply(Optional.empty(), attributes.getFailure()); | ||
| } | ||
| completionCallback.apply(Optional.empty(), attributes.getFailure()); | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hrmm, it was my understanding that we were going to hive this from users, though I know that is difficult in Java. If instead we're ok with showing to users but marking experimental, I think we should do the same in Go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't avoid exposing it given the structure of the Java SDK, just because the Java SDK has this unfortunate limitation doesn't mean other SDKs should copy it
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the question becomes should other SDKs follow Go's lead or Java's lead here. Sounds like we are saying Java will be the outlier instead of updating Go to expose as well?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes other SDKs should not copy it