Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

import com.uber.m3.tally.Scope;
import io.grpc.StatusRuntimeException;
import io.nexusrpc.Header;
import io.nexusrpc.OperationException;
import io.nexusrpc.handler.*;
Expand All @@ -12,6 +13,7 @@
import io.temporal.api.nexus.v1.*;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowException;
import io.temporal.client.WorkflowNotFoundException;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.interceptors.WorkerInterceptor;
import io.temporal.failure.ApplicationFailure;
Expand Down Expand Up @@ -203,6 +205,9 @@ private CancelOperationResponse handleCancelledOperation(
private void convertKnownFailures(Throwable e) {
Throwable failure = CheckedExceptionWrapper.unwrap(e);
if (failure instanceof WorkflowException) {
if (failure instanceof WorkflowNotFoundException) {
throw new HandlerException(HandlerException.ErrorType.NOT_FOUND, failure);
}
throw new HandlerException(HandlerException.ErrorType.BAD_REQUEST, failure);
}
if (failure instanceof ApplicationFailure) {
Expand All @@ -213,6 +218,10 @@ private void convertKnownFailures(Throwable e) {
HandlerException.RetryBehavior.NON_RETRYABLE);
}
}
if (failure instanceof StatusRuntimeException) {
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) failure;
throw convertStatusRuntimeExceptionToHandlerException(statusRuntimeException);
}
if (failure instanceof Error) {
throw (Error) failure;
}
Expand All @@ -221,6 +230,45 @@ private void convertKnownFailures(Throwable e) {
: new RuntimeException(failure);
}

private HandlerException convertStatusRuntimeExceptionToHandlerException(
StatusRuntimeException sre) {
switch (sre.getStatus().getCode()) {
case INVALID_ARGUMENT:
return new HandlerException(HandlerException.ErrorType.BAD_REQUEST, sre);
case ALREADY_EXISTS:
case FAILED_PRECONDITION:
case OUT_OF_RANGE:
return new HandlerException(
HandlerException.ErrorType.INTERNAL, sre, HandlerException.RetryBehavior.NON_RETRYABLE);
case ABORTED:
case UNAVAILABLE:
return new HandlerException(HandlerException.ErrorType.UNAVAILABLE, sre);
case CANCELLED:
case DATA_LOSS:
case INTERNAL:
case UNKNOWN:
case UNAUTHENTICATED:
case PERMISSION_DENIED:
// Note that codes.Unauthenticated, codes.PermissionDenied have Nexus error types but we
// convert to internal
// because this is not a client auth error and happens when the handler fails to auth with
// Temporal and should
// be considered retryable.
return new HandlerException(HandlerException.ErrorType.INTERNAL, sre);
case NOT_FOUND:
return new HandlerException(HandlerException.ErrorType.NOT_FOUND, sre);
case RESOURCE_EXHAUSTED:
return new HandlerException(HandlerException.ErrorType.RESOURCE_EXHAUSTED, sre);
case UNIMPLEMENTED:
return new HandlerException(HandlerException.ErrorType.NOT_IMPLEMENTED, sre);
case DEADLINE_EXCEEDED:
return new HandlerException(HandlerException.ErrorType.UPSTREAM_TIMEOUT, sre);
default:
// If the status code is not recognized, we treat it as an internal error
return new HandlerException(HandlerException.ErrorType.INTERNAL, sre);
}
}

private OperationStartResult<HandlerResultContent> startOperation(
OperationContext context, OperationStartDetails details, HandlerInputContent input)
throws OperationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowNotFoundException;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.NexusOperationFailure;
import io.temporal.testing.internal.SDKTestWorkflowRule;
Expand Down Expand Up @@ -58,6 +60,24 @@ public void nexusOperationApplicationFailureFailureConversion() {
Assert.assertEquals(HandlerException.ErrorType.INTERNAL, handlerFailure.getErrorType());
}

@Test
public void nexusOperationWorkflowNotFoundFailureConversion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
WorkflowFailedException exception =
Assert.assertThrows(
WorkflowFailedException.class, () -> workflowStub.execute("WorkflowNotFound"));
Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure);
NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause();
Assert.assertTrue(nexusFailure.getCause() instanceof HandlerException);
HandlerException handlerFailure = (HandlerException) nexusFailure.getCause();
Assert.assertEquals(HandlerException.ErrorType.NOT_FOUND, handlerFailure.getErrorType());
Assert.assertTrue(handlerFailure.getCause() instanceof ApplicationFailure);
ApplicationFailure applicationFailure = (ApplicationFailure) handlerFailure.getCause();
Assert.assertEquals(
"io.temporal.client.WorkflowNotFoundException", applicationFailure.getType());
}

public static class TestNexus implements TestWorkflow1 {
@Override
public String execute(String testcase) {
Expand Down Expand Up @@ -96,6 +116,9 @@ public OperationHandler<String, String> operation() {
} else if (name.equals("ApplicationFailureNonRetryable")) {
throw ApplicationFailure.newNonRetryableFailure(
"failed to call operation", "TestFailure");
} else if (name.equals("WorkflowNotFound")) {
throw new WorkflowNotFoundException(
WorkflowExecution.getDefaultInstance(), "TestWorkflowType", null);
}
Assert.fail();
return "fail";
Expand Down
Loading