Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ Functions.Proc scheduleLocalActivityTask(
ExecuteLocalActivityParameters parameters, LocalActivityCallback callback);

/**
* Start child workflow.
* Start a child workflow.
*
* @param parameters encapsulates all the information required to schedule a child workflow for
* execution
Expand All @@ -179,8 +179,20 @@ Functions.Proc1<Exception> startChildWorkflow(
Functions.Proc2<WorkflowExecution, Exception> startCallback,
Functions.Proc2<Optional<Payloads>, Exception> completionCallback);

/**
* Start a Nexus operation.
*
* @param attributes nexus operation attributes
* @param metadata user metadata to be associated with the operation.
* @param startedCallback callback that is called when the operation is start if async, or
* completes if it is sync.
* @param completionCallback callback that is called upon child workflow completion or failure
* @return cancellation handle. Invoke {@link io.temporal.workflow.Functions.Proc1#apply(Object)}
* to cancel activity task.
*/
Functions.Proc1<Exception> startNexusOperation(
ScheduleNexusOperationCommandAttributes attributes,
@Nullable UserMetadata metadata,
Functions.Proc2<Optional<String>, Failure> startedCallback,
Functions.Proc2<Optional<Payload>, Failure> completionCallback);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,12 @@ public Functions.Proc1<Exception> startChildWorkflow(
@Override
public Functions.Proc1<Exception> startNexusOperation(
ScheduleNexusOperationCommandAttributes attributes,
@Nullable UserMetadata metadata,
Functions.Proc2<Optional<String>, Failure> startedCallback,
Functions.Proc2<Optional<Payload>, Failure> completionCallback) {
Functions.Proc cancellationHandler =
workflowStateMachines.startNexusOperation(attributes, startedCallback, completionCallback);
workflowStateMachines.startNexusOperation(
attributes, metadata, startedCallback, completionCallback);
return (exception) -> cancellationHandler.apply();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.failure.v1.NexusOperationFailureInfo;
import io.temporal.api.history.v1.*;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.workflow.Functions;
import java.util.Optional;
import javax.annotation.Nullable;

/**
* NexusOperationStateMachine manages a nexus operation.
Expand All @@ -46,6 +48,7 @@ final class NexusOperationStateMachine
private static final String NEXUS_OPERATION_CANCELED_MESSAGE = "Nexus operation canceled";

private ScheduleNexusOperationCommandAttributes scheduleAttributes;
private UserMetadata metadata;
private final Functions.Proc2<Optional<String>, Failure> startedCallback;
private boolean async = false;

Expand Down Expand Up @@ -240,22 +243,25 @@ private void notifyTimedOut() {
*/
public static NexusOperationStateMachine newInstance(
ScheduleNexusOperationCommandAttributes attributes,
@Nullable UserMetadata metadata,
Functions.Proc2<Optional<String>, Failure> startedCallback,
Functions.Proc2<Optional<Payload>, Failure> completionCallback,
Functions.Proc1<CancellableCommand> commandSink,
Functions.Proc1<StateMachine> stateMachineSink) {
return new NexusOperationStateMachine(
attributes, startedCallback, completionCallback, commandSink, stateMachineSink);
attributes, metadata, startedCallback, completionCallback, commandSink, stateMachineSink);
}

private NexusOperationStateMachine(
ScheduleNexusOperationCommandAttributes attributes,
@Nullable UserMetadata metadata,
Functions.Proc2<Optional<String>, Failure> startedCallback,
Functions.Proc2<Optional<Payload>, Failure> completionCallback,
Functions.Proc1<CancellableCommand> commandSink,
Functions.Proc1<StateMachine> stateMachineSink) {
super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);
this.scheduleAttributes = attributes;
this.metadata = metadata;
this.operation = attributes.getOperation();
this.service = attributes.getService();
this.endpoint = attributes.getEndpoint();
Expand All @@ -265,11 +271,16 @@ private NexusOperationStateMachine(
}

public void createScheduleNexusTaskCommand() {
addCommand(
Command.Builder command =
Command.newBuilder()
.setCommandType(CommandType.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION)
.setScheduleNexusOperationCommandAttributes(scheduleAttributes)
.build());
scheduleAttributes = null; // avoiding retaining large input for the duration of the operation
.setScheduleNexusOperationCommandAttributes(scheduleAttributes);
if (metadata != null) {
command.setUserMetadata(metadata);
}
addCommand(command.build());
// avoiding retaining large input for the duration of the operation
scheduleAttributes = null;
metadata = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -951,12 +951,18 @@ public Functions.Proc startChildWorkflow(

public Functions.Proc startNexusOperation(
ScheduleNexusOperationCommandAttributes attributes,
@Nullable UserMetadata metadata,
Functions.Proc2<Optional<String>, Failure> startedCallback,
Functions.Proc2<Optional<Payload>, Failure> completionCallback) {
checkEventLoopExecuting();
NexusOperationStateMachine operation =
NexusOperationStateMachine.newInstance(
attributes, startedCallback, completionCallback, commandSink, stateMachineSink);
attributes,
metadata,
startedCallback,
completionCallback,
commandSink,
stateMachineSink);
return () -> {
if (operation.isCancellable()) {
operation.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,9 +823,15 @@ public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
attributes.setScheduleToCloseTimeout(
ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToCloseTimeout()));

@Nullable
UserMetadata userMetadata =
makeUserMetaData(
input.getOptions().getSummary(), null, dataConverterWithCurrentWorkflowContext);

Functions.Proc1<Exception> cancellationCallback =
replayContext.startNexusOperation(
attributes.build(),
userMetadata,
(operationExec, failure) -> {
if (failure != null) {
runner.executeInWorkflowThread(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public static NexusOperationOptions getDefaultInstance() {

public static final class Builder {
private Duration scheduleToCloseTimeout;
private String summary;

/**
* Sets the schedule to close timeout for the Nexus operation.
Expand All @@ -65,17 +66,30 @@ public NexusOperationOptions.Builder setScheduleToCloseTimeout(
return this;
}

/**
* Single-line fixed summary for this Nexus operation that will appear in UI/CLI. This can be in
* single-line Temporal Markdown format.
*
* <p>Default is none/empty.
*/
@Experimental
public NexusOperationOptions.Builder setSummary(String summary) {
this.summary = summary;
return this;
}

private Builder() {}

private Builder(NexusOperationOptions options) {
if (options == null) {
return;
}
this.scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
this.summary = options.getSummary();
}

public NexusOperationOptions build() {
return new NexusOperationOptions(scheduleToCloseTimeout);
return new NexusOperationOptions(scheduleToCloseTimeout, summary);
}

public NexusOperationOptions.Builder mergeNexusOperationOptions(
Expand All @@ -87,39 +101,54 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions(
(override.scheduleToCloseTimeout == null)
? this.scheduleToCloseTimeout
: override.scheduleToCloseTimeout;
this.summary = (override.summary == null) ? this.summary : override.summary;
return this;
}
}

private NexusOperationOptions(Duration scheduleToCloseTimeout) {
private NexusOperationOptions(Duration scheduleToCloseTimeout, String summary) {
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
this.summary = summary;
}

public NexusOperationOptions.Builder toBuilder() {
return new NexusOperationOptions.Builder(this);
}

private Duration scheduleToCloseTimeout;
private final Duration scheduleToCloseTimeout;
private final String summary;

public Duration getScheduleToCloseTimeout() {
return scheduleToCloseTimeout;
}

@Experimental
public String getSummary() {
return summary;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NexusOperationOptions that = (NexusOperationOptions) o;
return Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout);
return Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout)
&& Objects.equals(summary, that.summary);
}

@Override
public int hashCode() {
return Objects.hash(scheduleToCloseTimeout);
return Objects.hash(scheduleToCloseTimeout, summary);
}

@Override
public String toString() {
return "NexusOperationOptions{" + "scheduleToCloseTimeout=" + scheduleToCloseTimeout + '}';
return "NexusOperationOptions{"
+ "scheduleToCloseTimeout="
+ scheduleToCloseTimeout
+ ", summary='"
+ summary
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
builder
.<Optional<String>, Failure>add2(
(v, c) ->
stateMachines.startNexusOperation(scheduleAttributes, c, delayedCallback::run))
stateMachines.startNexusOperation(
scheduleAttributes, null, c, delayedCallback::run))
.add((v) -> stateMachines.requestCancelNexusOperation(cancelAttributes))
.<Optional<Payload>, Failure>add2((pair, c) -> delayedCallback.set(c))
.add((pair) -> stateMachines.failWorkflow(pair.getT2()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
newScheduleNexusOperationCommandAttributesBuilder();
builder
.<Optional<Payload>, Failure>add2(
(v, c) -> stateMachines.startNexusOperation(attributes.build(), (o, f) -> {}, c))
(v, c) ->
stateMachines.startNexusOperation(attributes.build(), null, (o, f) -> {}, c))
.add(
(pair) ->
stateMachines.completeWorkflow(
Expand Down Expand Up @@ -167,7 +168,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
newScheduleNexusOperationCommandAttributesBuilder();
builder
.<Optional<Payload>, Failure>add2(
(v, c) -> stateMachines.startNexusOperation(attributes.build(), (o, f) -> {}, c))
(v, c) ->
stateMachines.startNexusOperation(attributes.build(), null, (o, f) -> {}, c))
.add((pair) -> stateMachines.failWorkflow(pair.getT2()));
}
}
Expand Down Expand Up @@ -238,7 +240,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
newScheduleNexusOperationCommandAttributesBuilder();
builder
.<Optional<Payload>, Failure>add2(
(v, c) -> stateMachines.startNexusOperation(attributes.build(), (o, f) -> {}, c))
(v, c) ->
stateMachines.startNexusOperation(attributes.build(), null, (o, f) -> {}, c))
.add((pair) -> stateMachines.failWorkflow(pair.getT2()));
}
}
Expand Down Expand Up @@ -309,7 +312,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
newScheduleNexusOperationCommandAttributesBuilder();
builder
.<Optional<Payload>, Failure>add2(
(v, c) -> stateMachines.startNexusOperation(attributes.build(), (o, f) -> {}, c))
(v, c) ->
stateMachines.startNexusOperation(attributes.build(), null, (o, f) -> {}, c))
.add((pair) -> stateMachines.failWorkflow(pair.getT2()));
}
}
Expand Down Expand Up @@ -383,7 +387,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
.<Optional<Payload>, Failure>add2(
(v, c) ->
cancellationHandler =
stateMachines.startNexusOperation(attributes.build(), (o, f) -> {}, c))
stateMachines.startNexusOperation(
attributes.build(), null, (o, f) -> {}, c))
.add((pair) -> stateMachines.failWorkflow(pair.getT2()));
// Immediate cancellation
builder.add((v) -> cancellationHandler.apply());
Expand Down Expand Up @@ -420,7 +425,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
builder
.<Optional<String>, Failure>add2(
(v, c) ->
stateMachines.startNexusOperation(attributes.build(), c, delayedCallback::run))
stateMachines.startNexusOperation(
attributes.build(), null, c, delayedCallback::run))
.<Optional<Payload>, Failure>add2(
(pair, c) -> {
Assert.assertEquals(OPERATION_ID, pair.getT1().get());
Expand Down Expand Up @@ -514,7 +520,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
builder
.<Optional<String>, Failure>add2(
(v, c) ->
stateMachines.startNexusOperation(attributes.build(), c, delayedCallback::run))
stateMachines.startNexusOperation(
attributes.build(), null, c, delayedCallback::run))
.<Optional<Payload>, Failure>add2(
(pair, c) -> {
Assert.assertEquals(OPERATION_ID, pair.getT1().get());
Expand Down Expand Up @@ -604,7 +611,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
builder
.<Optional<String>, Failure>add2(
(v, c) ->
stateMachines.startNexusOperation(attributes.build(), c, delayedCallback::run))
stateMachines.startNexusOperation(
attributes.build(), null, c, delayedCallback::run))
.<Optional<Payload>, Failure>add2(
(pair, c) -> {
Assert.assertEquals(OPERATION_ID, pair.getT1().get());
Expand Down Expand Up @@ -694,7 +702,8 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
builder
.<Optional<String>, Failure>add2(
(v, c) ->
stateMachines.startNexusOperation(attributes.build(), c, delayedCallback::run))
stateMachines.startNexusOperation(
attributes.build(), null, c, delayedCallback::run))
.<Optional<Payload>, Failure>add2(
(pair, c) -> {
Assert.assertEquals(OPERATION_ID, pair.getT1().get());
Expand Down
Loading
Loading