Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/client/OnConflictOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.client;

import com.google.common.base.Preconditions;
import io.temporal.common.Experimental;
import java.util.Objects;

/**
* OnConflictOptions specifies the actions to be taken when using the {@link
* io.temporal.api.enums.v1.WorkflowIdConflictPolicy#WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING}
*/
@Experimental
public class OnConflictOptions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrmm, it was my understanding that we were going to hive this from users, though I know that is difficult in Java. If instead we're ok with showing to users but marking experimental, I think we should do the same in Go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't avoid exposing it given the structure of the Java SDK, just because the Java SDK has this unfortunate limitation doesn't mean other SDKs should copy it

Copy link
Contributor

@cretz cretz Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the question becomes should other SDKs follow Go's lead or Java's lead here. Sounds like we are saying Java will be the outlier instead of updating Go to expose as well?

Copy link
Contributor Author

@Quinn-With-Two-Ns Quinn-With-Two-Ns Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes other SDKs should not copy it

public static OnConflictOptions.Builder newBuilder() {
return new OnConflictOptions.Builder();
}

public static OnConflictOptions.Builder newBuilder(OnConflictOptions options) {
return new OnConflictOptions.Builder(options);
}

public static OnConflictOptions getDefaultInstance() {
return DEFAULT_INSTANCE;
}

private static final OnConflictOptions DEFAULT_INSTANCE;

static {
DEFAULT_INSTANCE = OnConflictOptions.newBuilder().build();
}

private final boolean attachRequestId;
private final boolean attachCompletionCallbacks;
private final boolean attachLinks;

private OnConflictOptions(
boolean attachRequestId, boolean attachCompletionCallbacks, boolean attachLinks) {
this.attachRequestId = attachRequestId;
this.attachCompletionCallbacks = attachCompletionCallbacks;
this.attachLinks = attachLinks;
}

public boolean isAttachRequestId() {
return attachRequestId;
}

public boolean isAttachCompletionCallbacks() {
return attachCompletionCallbacks;
}

public boolean isAttachLinks() {
return attachLinks;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OnConflictOptions that = (OnConflictOptions) o;
return attachRequestId == that.attachRequestId
&& attachCompletionCallbacks == that.attachCompletionCallbacks
&& attachLinks == that.attachLinks;
}

@Override
public int hashCode() {
return Objects.hash(attachRequestId, attachCompletionCallbacks, attachLinks);
}

@Override
public String toString() {
return "OnConflictOptions{"
+ "attachRequestId="
+ attachRequestId
+ ", attachCompletionCallbacks="
+ attachCompletionCallbacks
+ ", attachLinks="
+ attachLinks
+ '}';
}

public static final class Builder {
private boolean attachRequestId;
private boolean attachCompletionCallbacks;
private boolean attachLinks;

public Builder(OnConflictOptions options) {
this.attachRequestId = options.attachRequestId;
this.attachCompletionCallbacks = options.attachCompletionCallbacks;
this.attachLinks = options.attachLinks;
}

public Builder() {}

/** Attaches the request ID to the running workflow. */
public Builder setAttachRequestId(boolean attachRequestId) {
this.attachRequestId = attachRequestId;
return this;
}

/**
* Attaches the completion callbacks to the running workflow. If true, AttachRequestId must be
* true.
*/
public Builder setAttachCompletionCallbacks(boolean attachCompletionCallbacks) {
this.attachCompletionCallbacks = attachCompletionCallbacks;
return this;
}

/** Attaches the links to the WorkflowExecutionOptionsUpdatedEvent history event. */
public Builder setAttachLinks(boolean attachLinks) {
this.attachLinks = attachLinks;
return this;
}

public OnConflictOptions build() {
if (attachCompletionCallbacks) {
Preconditions.checkState(
attachRequestId, "AttachRequestId must be true if AttachCompletionCallbacks is true");
}
return new OnConflictOptions(attachRequestId, attachCompletionCallbacks, attachLinks);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server doesn't allow attaching callbacks without also attaching a request ID (as of yesterday). I'm wondering if we should verify that SDK side and document it on the builder setters. We definitely should in the api (@rodrigozhou).

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public static WorkflowOptions merge(
.setRequestId(o.getRequestId())
.setCompletionCallbacks(o.getCompletionCallbacks())
.setLinks(o.getLinks())
.setOnConflictOptions(o.getOnConflictOptions())
.validateBuildWithDefaults();
}

Expand Down Expand Up @@ -129,6 +130,8 @@ public static final class Builder {

private List<Link> links;

private OnConflictOptions onConflictOptions;

private Builder() {}

private Builder(WorkflowOptions options) {
Expand All @@ -155,6 +158,7 @@ private Builder(WorkflowOptions options) {
this.requestId = options.requestId;
this.completionCallbacks = options.completionCallbacks;
this.links = options.links;
this.onConflictOptions = options.onConflictOptions;
}

/**
Expand Down Expand Up @@ -460,6 +464,20 @@ public Builder setLinks(List<Link> links) {
return this;
}

/**
* Set workflow ID conflict options used in conjunction with conflict policy
* WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING. If onConflictOptions is set and a workflow is
* already running, the options specifies the actions to be taken on the running workflow. If
* not set or use together with any other WorkflowIDConflictPolicy, this parameter is ignored.
*
* <p>WARNING: Not intended for User Code.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish there was a way to hide this. Maybe with a private subclass and cast?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't be private because it needs to be accessible by the Internal package. Passing them through a thread local is an option, but puts a requirement on the calling thread. I think we should revisit before we GA

*/
@Experimental
public Builder setOnConflictOptions(OnConflictOptions onConflictOptions) {
this.onConflictOptions = onConflictOptions;
return this;
}

public WorkflowOptions build() {
return new WorkflowOptions(
workflowId,
Expand All @@ -481,7 +499,8 @@ public WorkflowOptions build() {
staticDetails,
requestId,
completionCallbacks,
links);
links,
onConflictOptions);
}

/**
Expand All @@ -508,7 +527,8 @@ public WorkflowOptions validateBuildWithDefaults() {
staticDetails,
requestId,
completionCallbacks,
links);
links,
onConflictOptions);
}
}

Expand Down Expand Up @@ -551,6 +571,7 @@ public WorkflowOptions validateBuildWithDefaults() {
private final List<Callback> completionCallbacks;

private final List<Link> links;
private final OnConflictOptions onConflictOptions;

private WorkflowOptions(
String workflowId,
Expand All @@ -572,7 +593,8 @@ private WorkflowOptions(
String staticDetails,
String requestId,
List<Callback> completionCallbacks,
List<Link> links) {
List<Link> links,
OnConflictOptions onConflictOptions) {
this.workflowId = workflowId;
this.workflowIdReusePolicy = workflowIdReusePolicy;
this.workflowRunTimeout = workflowRunTimeout;
Expand All @@ -593,6 +615,7 @@ private WorkflowOptions(
this.requestId = requestId;
this.completionCallbacks = completionCallbacks;
this.links = links;
this.onConflictOptions = onConflictOptions;
}

public String getWorkflowId() {
Expand Down Expand Up @@ -689,6 +712,11 @@ public String getStaticDetails() {
return staticDetails;
}

@Experimental
public @Nullable OnConflictOptions getOnConflictOptions() {
return onConflictOptions;
}

public Builder toBuilder() {
return new Builder(this);
}
Expand Down Expand Up @@ -717,7 +745,8 @@ public boolean equals(Object o) {
&& Objects.equal(staticDetails, that.staticDetails)
&& Objects.equal(requestId, that.requestId)
&& Objects.equal(completionCallbacks, that.completionCallbacks)
&& Objects.equal(links, that.links);
&& Objects.equal(links, that.links)
&& Objects.equal(onConflictOptions, that.onConflictOptions);
}

@Override
Expand All @@ -742,7 +771,8 @@ public int hashCode() {
staticDetails,
requestId,
completionCallbacks,
links);
links,
onConflictOptions);
}

@Override
Expand Down Expand Up @@ -791,6 +821,8 @@ public String toString() {
+ completionCallbacks
+ ", links="
+ links
+ ", onConflictOptions="
+ onConflictOptions
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.temporal.api.enums.v1.HistoryEventFilterType;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.workflow.v1.OnConflictOptions;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
import io.temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionRequest;
Expand Down Expand Up @@ -100,6 +101,16 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest(
options.getLinks().forEach(request::addLinks);
}

if (options.getOnConflictOptions() != null) {
OnConflictOptions.Builder onConflictOptions =
OnConflictOptions.newBuilder()
.setAttachRequestId(options.getOnConflictOptions().isAttachRequestId())
.setAttachLinks(options.getOnConflictOptions().isAttachLinks())
.setAttachCompletionCallbacks(
options.getOnConflictOptions().isAttachCompletionCallbacks());
request.setOnConflictOptions(onConflictOptions);
}

String taskQueue = options.getTaskQueue();
if (taskQueue != null && !taskQueue.isEmpty()) {
request.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.temporal.api.common.v1.Callback;
import io.temporal.api.enums.v1.TaskQueueKind;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.client.OnConflictOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.metadata.POJOActivityMethodMetadata;
Expand Down Expand Up @@ -144,6 +145,12 @@ public static WorkflowStub createNexusBoundStub(
.filter(link -> link != null)
.collect(Collectors.toList()));
}
nexusWorkflowOptions.setOnConflictOptions(
OnConflictOptions.newBuilder()
.setAttachRequestId(true)
.setAttachLinks(true)
.setAttachCompletionCallbacks(true)
.build());
return stub.newInstance(nexusWorkflowOptions.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,46 +186,48 @@ private void cancelNexusOperationCommand() {
}

private void notifyStarted() {
if (!async) {
if (currentEvent.getEventType() != EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED) {
startedCallback.apply(Optional.empty(), null);
} else {
async = true;
String operationToken =
currentEvent.getNexusOperationStartedEventAttributes().getOperationToken();
String operationId =
currentEvent.getNexusOperationStartedEventAttributes().getOperationId();
startedCallback.apply(
Optional.of(operationToken.isEmpty() ? operationId : operationToken), null);
}
}
async = true;
String operationToken =
currentEvent.getNexusOperationStartedEventAttributes().getOperationToken();
// TODO(#2423) Remove support for operationId
String operationId = currentEvent.getNexusOperationStartedEventAttributes().getOperationId();
startedCallback.apply(
Optional.of(operationToken.isEmpty() ? operationId : operationToken), null);
}

private void notifyCompleted() {
notifyStarted();
NexusOperationCompletedEventAttributes attributes =
currentEvent.getNexusOperationCompletedEventAttributes();
if (!async) {
startedCallback.apply(Optional.empty(), null);
}
Comment on lines +201 to +203
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just keep all of this logic in notifyStarted() but this is fine.

completionCallback.apply(Optional.of(attributes.getResult()), null);
}

private void notifyFailed() {
notifyStarted();
NexusOperationFailedEventAttributes attributes =
currentEvent.getNexusOperationFailedEventAttributes();
if (!async) {
startedCallback.apply(Optional.empty(), attributes.getFailure());
}
completionCallback.apply(Optional.empty(), attributes.getFailure());
}

private void notifyCanceled() {
notifyStarted();
NexusOperationCanceledEventAttributes attributes =
currentEvent.getNexusOperationCanceledEventAttributes();
if (!async) {
startedCallback.apply(Optional.empty(), attributes.getFailure());
}
completionCallback.apply(Optional.empty(), attributes.getFailure());
}

private void notifyTimedOut() {
notifyStarted();
NexusOperationTimedOutEventAttributes attributes =
currentEvent.getNexusOperationTimedOutEventAttributes();
if (!async) {
startedCallback.apply(Optional.empty(), attributes.getFailure());
}
completionCallback.apply(Optional.empty(), attributes.getFailure());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
public interface WorkflowHandleFactory<T, R> {
/**
* Invoked every operation start call and expected to return a workflow handle to a workflow stub
* through the provided {@link WorkflowClient}.
* created with the {@link WorkflowClient} provided by {@link
* NexusOperationContext#getWorkflowClient()}.
*/
@Nullable
WorkflowHandle<R> apply(OperationContext context, OperationStartDetails details, T input);
Expand Down
Loading
Loading