Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@

import io.temporal.activity.ActivityInterface;
import io.temporal.onboardings.domain.messages.commands.RegisterCrmEntityRequest;
import io.temporal.onboardings.domain.messages.commands.SyncToStorageRequest;

@ActivityInterface
public interface IntegrationsHandlers {
void registerCrmEntity(RegisterCrmEntityRequest cmd);

void syncToStorage(SyncToStorageRequest cmd);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,19 @@
import io.temporal.failure.ApplicationFailure;
import io.temporal.onboardings.domain.clients.crm.CrmClient;
import io.temporal.onboardings.domain.messages.commands.RegisterCrmEntityRequest;
import io.temporal.onboardings.domain.messages.commands.SyncToStorageRequest;
import io.temporal.onboardings.domain.messages.orchestrations.Errors;
import io.temporal.workflow.Workflow;
import java.net.ConnectException;
import org.slf4j.Logger;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpClientErrorException;

@Component("integrations-handlers")
public class IntegrationsHandlersImpl implements IntegrationsHandlers {
private final CrmClient crmClient;
Logger logger = Workflow.getLogger(IntegrationsHandlersImpl.class);

public IntegrationsHandlersImpl(CrmClient crmClient) {
this.crmClient = crmClient;
Expand All @@ -61,4 +65,9 @@ public void registerCrmEntity(RegisterCrmEntityRequest cmd) {
"Failed to connect with CRM service.", Errors.SERVICE_UNRECOVERABLE.name(), e);
}
}

@Override
public void syncToStorage(SyncToStorageRequest cmd) {
logger.info("Received syncToStorageRequest: {}", cmd);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.temporal.onboardings.domain.messages.commands;

import io.temporal.onboardings.domain.messages.queries.EntityOnboardingState;

public record SyncToStorageRequest(EntityOnboardingState onboardingState) {}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@
import io.temporal.onboardings.domain.messages.commands.RejectEntityRequest;
import io.temporal.onboardings.domain.messages.orchestrations.OnboardEntityRequest;
import io.temporal.onboardings.domain.messages.queries.EntityOnboardingState;
import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.*;

@WorkflowInterface
public interface EntityOnboarding {
Expand All @@ -46,4 +43,7 @@ public interface EntityOnboarding {

@SignalMethod
void reject(RejectEntityRequest cmd);

@UpdateMethod
void forceSync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,14 @@
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.ApplicationFailure;
import io.temporal.onboardings.domain.integrations.IntegrationsHandlers;
import io.temporal.onboardings.domain.messages.commands.ApproveEntityRequest;
import io.temporal.onboardings.domain.messages.commands.RegisterCrmEntityRequest;
import io.temporal.onboardings.domain.messages.commands.RejectEntityRequest;
import io.temporal.onboardings.domain.messages.commands.RequestDeputyOwnerApprovalRequest;
import io.temporal.onboardings.domain.messages.commands.*;
import io.temporal.onboardings.domain.messages.orchestrations.Errors;
import io.temporal.onboardings.domain.messages.orchestrations.OnboardEntityRequest;
import io.temporal.onboardings.domain.messages.queries.EntityOnboardingState;
import io.temporal.onboardings.domain.messages.values.Approval;
import io.temporal.onboardings.domain.messages.values.ApprovalStatus;
import io.temporal.onboardings.domain.notifications.NotificationsHandlers;
import io.temporal.workflow.CancellationScope;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInit;
import java.time.Duration;
Expand All @@ -56,6 +54,7 @@
public class EntityOnboardingImpl implements EntityOnboarding {
Logger logger = Workflow.getLogger(EntityOnboardingImpl.class);
private EntityOnboardingState state;
private boolean shouldSync;
private final IntegrationsHandlers integrationsHandlers =
Workflow.newActivityStub(
IntegrationsHandlers.class,
Expand Down Expand Up @@ -98,6 +97,13 @@ public void execute(OnboardEntityRequest args) {

assertValidArgs(args);

// Sync the Things
// THIS WILL RAISE A NDE!
// Run replay test to see how that looks
CancellationScope syncScope = startSyncToStorage();

syncScope.run();

if (!args.skipApproval()) {
var waitApprovalSecs = args.completionTimeoutSeconds();
if (notifyDeputyOwner) {
Expand Down Expand Up @@ -159,7 +165,32 @@ public void execute(OnboardEntityRequest args) {
}
// be sure to check that all handlers have been completed before exit
Workflow.await(Workflow::isEveryHandlerFinished);
}

// startSyncToStorage
// Kick off the application state sync asynchronously.
// This is akin to dirty checking inside an OR/M IdentityMap implementation
// Just mark the workflow as `shouldSync` and the condition unblocks to perform the work.
private CancellationScope startSyncToStorage() {
CancellationScope syncScope =
Workflow.newCancellationScope(
() -> {
while (true) {
try {

// calling this BEFORE the condition effectively dumps our state to
// storage before waiting to be told to resync
integrationsHandlers.syncToStorage(new SyncToStorageRequest(this.state));
shouldSync = false;
} catch (ActivityFailure e) {
// maybe we can count the number of times we are willing to fail to report
// that our Workflow state is out of sync now
logger.warn("Sync failed", e);
}
Workflow.await(() -> shouldSync);
}
});
return syncScope;
}

@Override
Expand All @@ -185,6 +216,11 @@ public void reject(RejectEntityRequest cmd) {
state.id(), state.currentValue(), new Approval(ApprovalStatus.REJECTED, cmd.comment()));
}

@Override
public void forceSync() {
shouldSync = true;
}

private void assertValidArgs(OnboardEntityRequest args) {
if (args.id() == null
|| args.id().isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public void reject(RejectEntityRequest cmd) {
state.id(), state.currentValue(), new Approval(ApprovalStatus.REJECTED, cmd.comment()));
}

@Override
public void forceSync() {
throw ApplicationFailure.newFailure("not implemented", "NOT_IMPLEMENTED");
}

private void assertValidArgs(OnboardEntityRequest args) {
if (args.id() == null
|| args.id().isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import io.temporal.onboardings.domain.integrations.IntegrationsHandlers;
import io.temporal.onboardings.domain.messages.commands.ApproveEntityRequest;
import io.temporal.onboardings.domain.messages.commands.RejectEntityRequest;
import io.temporal.onboardings.domain.messages.commands.SyncToStorageRequest;
import io.temporal.onboardings.domain.messages.orchestrations.Errors;
import io.temporal.onboardings.domain.messages.orchestrations.OnboardEntityRequest;
import io.temporal.onboardings.domain.messages.queries.EntityOnboardingState;
import io.temporal.onboardings.domain.messages.values.Approval;
import io.temporal.onboardings.domain.messages.values.ApprovalStatus;
import io.temporal.onboardings.domain.notifications.NotificationsHandlers;
import io.temporal.testing.TestWorkflowEnvironment;
Expand Down Expand Up @@ -234,6 +236,27 @@ public void execute_givenInvalidArgs_itShouldFailWorkflow() {
Errors.INVALID_ARGS.name(), ((ApplicationFailure) e.getCause()).getType());
}

// behavior verification
@Test
public void givenValidArgsWithOwnerApprovalNoDeputyOwner_itShouldSyncToStorage() {
String wfId = UUID.randomUUID().toString();
var args = new OnboardEntityRequest(wfId, UUID.randomUUID().toString(), 4, null, false);
EntityOnboarding sut =
workflowClient.newWorkflowStub(
EntityOnboarding.class,
WorkflowOptions.newBuilder().setWorkflowId(args.id()).setTaskQueue(taskQueue).build());
WorkflowClient.start(sut::execute, args);
testWorkflowEnvironment.sleep(Duration.ofSeconds(1));
sut.forceSync();
verify(notificationsHandlers, never()).requestDeputyOwnerApproval(any());

verify(integrationsHandlers, times(2))
.syncToStorage(
new SyncToStorageRequest(
new EntityOnboardingState(
args.id(), args.value(), new Approval(ApprovalStatus.PENDING, null))));
}

@ComponentScan
public static class Configuration {
@MockBean private NotificationsHandlers notificationsHandlersMock;
Expand Down
Loading