Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.internal.client.external.GenericWorkflowClient;
import io.temporal.internal.common.HeaderUtils;
import io.temporal.internal.nexus.CurrentNexusOperationContext;
import io.temporal.payload.context.WorkflowSerializationContext;
import io.temporal.serviceclient.StatusUtils;
import io.temporal.worker.WorkflowTaskDispatchHandle;
Expand Down Expand Up @@ -95,6 +96,9 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
e);
}
}
if (CurrentNexusOperationContext.isNexusContext()) {
CurrentNexusOperationContext.get().setStartWorkflowResponseLink(response.getLink());
}
return new WorkflowStartOutput(execution);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
public final class CurrentNexusOperationContext {
private static final ThreadLocal<InternalNexusOperationContext> CURRENT = new ThreadLocal<>();

public static boolean isNexusContext() {
return CURRENT.get() != null;
}

public static InternalNexusOperationContext get() {
InternalNexusOperationContext result = CURRENT.get();
if (result == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.temporal.internal.nexus;

import com.uber.m3.tally.Scope;
import io.temporal.api.common.v1.Link;
import io.temporal.client.WorkflowClient;
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
import io.temporal.nexus.NexusOperationContext;
Expand All @@ -11,6 +12,7 @@ public class InternalNexusOperationContext {
private final Scope metricScope;
private final WorkflowClient client;
NexusOperationOutboundCallsInterceptor outboundCalls;
Link startWorkflowResponseLink;

public InternalNexusOperationContext(
String namespace, String taskQueue, Scope metricScope, WorkflowClient client) {
Expand Down Expand Up @@ -47,6 +49,14 @@ public NexusOperationContext getUserFacingContext() {
return new NexusOperationContextImpl();
}

public void setStartWorkflowResponseLink(Link link) {
this.startWorkflowResponseLink = link;
}

public Link getStartWorkflowResponseLink() {
return startWorkflowResponseLink;
}

private class NexusOperationContextImpl implements NexusOperationContext {
@Override
public Scope getMetricsScope() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,23 @@ public OperationStartResult<R> start(

WorkflowExecution workflowExec = handle.getInvoker().invoke(nexusRequest);

// Create the link information about the new workflow and return to the caller.
// If the start workflow response returned a link use it, otherwise
// create the link information about the new workflow and return to the caller.
Link.WorkflowEvent workflowEventLink =
Link.WorkflowEvent.newBuilder()
.setNamespace(nexusCtx.getNamespace())
.setWorkflowId(workflowExec.getWorkflowId())
.setRunId(workflowExec.getRunId())
.setEventRef(
Link.WorkflowEvent.EventReference.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
.build();
nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent()
? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent()
: null;
if (workflowEventLink == null) {
workflowEventLink =
Link.WorkflowEvent.newBuilder()
.setNamespace(nexusCtx.getNamespace())
.setWorkflowId(workflowExec.getWorkflowId())
.setRunId(workflowExec.getRunId())
.setEventRef(
Link.WorkflowEvent.EventReference.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
.build();
}
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
// Generate the operation token for the new workflow.
String operationToken;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.api.common.v1.Link;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.nexus.Nexus;
import io.temporal.nexus.WorkflowRunOperation;
import io.temporal.testing.internal.SDKTestWorkflowRule;
Expand All @@ -14,6 +17,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.*;

public class WorkflowHandleUseExistingOnConflictTest {
Expand All @@ -31,6 +35,26 @@ public void testOnConflictUseExisting() {
String workflowId = UUID.randomUUID().toString();
String result = workflowStub.execute(workflowId);
Assert.assertEquals("Hello from operation workflow " + workflowId, result);

AtomicInteger eventRefLinkCount = new AtomicInteger();
AtomicInteger requestIdLinkCount = new AtomicInteger();
testWorkflowRule
.getHistoryEvents(
WorkflowStub.fromTyped(workflowStub).getExecution().getWorkflowId(),
EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED)
.forEach(
event -> {
List<Link> links = event.getLinksList();
Assert.assertEquals(1, links.size());
Link link = links.get(0);
if (link.getWorkflowEvent().hasEventRef()) {
eventRefLinkCount.getAndIncrement();
} else if (link.getWorkflowEvent().hasRequestIdRef()) {
requestIdLinkCount.getAndIncrement();
}
});
Assert.assertEquals(1, eventRefLinkCount.get());
Assert.assertEquals(4, requestIdLinkCount.get());
}

public static class TestNexus implements TestWorkflows.TestWorkflow1 {
Expand Down
Loading