Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import kotlin.reflect.javaType
import kotlin.reflect.typeOf

/**
* Extracts Heartbeat details from the last failed attempt.
* Extracts heartbeat details from the last heartbeat of the current activity attempt or from the
* last failed attempt if no heartbeats were sent yet.
*
* @param T type of the Heartbeat details
* @see ActivityExecutionContext.getHeartbeatDetails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,56 @@ public interface ActivityExecutionContext {
<V> void heartbeat(V details) throws ActivityCompletionException;

/**
* Extracts Heartbeat details from the last failed attempt. This is used in combination with retry
* options. An Activity Execution could be scheduled with optional {@link
* io.temporal.common.RetryOptions} via {@link io.temporal.activity.ActivityOptions}. If an
* Activity Execution failed then the server would attempt to dispatch another Activity Task to
* retry the execution according to the retry options. If there were Heartbeat details reported by
* the last Activity Execution that failed, they would be delivered along with the Activity Task
* for the next retry attempt and can be extracted by the Activity implementation.
* Extracts Heartbeat details from the last heartbeat of this Activity Execution attempt. If there
* were no heartbeats in this attempt, details from the last failed attempt are returned instead.
* This is used in combination with retry options. An Activity Execution could be scheduled with
* optional {@link io.temporal.common.RetryOptions} via {@link
* io.temporal.activity.ActivityOptions}. If an Activity Execution failed then the server would
* attempt to dispatch another Activity Task to retry the execution according to the retry
* options. If there were Heartbeat details reported by the last Activity Execution that failed,
* they would be delivered along with the Activity Task for the next retry attempt and can be
* extracted by the Activity implementation.
*
* @param detailsClass Class of the Heartbeat details
*/
<V> Optional<V> getHeartbeatDetails(Class<V> detailsClass);

/**
* Extracts Heartbeat details from the last failed attempt. This is used in combination with retry
* options. An Activity Execution could be scheduled with optional {@link
* io.temporal.common.RetryOptions} via {@link io.temporal.activity.ActivityOptions}. If an
* Activity Execution failed then the server would attempt to dispatch another Activity Task to
* retry the execution according to the retry options. If there were Heartbeat details reported by
* the last Activity Execution that failed, the details would be delivered along with the Activity
* Task for the next retry attempt. The Activity implementation can extract the details via {@link
* #getHeartbeatDetails(Class)}() and resume progress.
* Extracts Heartbeat details from the last heartbeat of this Activity Execution attempt. If there
* were no heartbeats in this attempt, details from the last failed attempt are returned instead.
* It is useful in combination with retry options. An Activity Execution could be scheduled with
* optional {@link io.temporal.common.RetryOptions} via {@link
* io.temporal.activity.ActivityOptions}. If an Activity Execution failed then the server would
* attempt to dispatch another Activity Task to retry the execution according to the retry
* options. If there were Heartbeat details reported by the last Activity Execution that failed,
* the details would be delivered along with the Activity Task for the next retry attempt. The
* Activity implementation can extract the details via {@link #getHeartbeatDetails(Class)}() and
* resume progress.
*
* @param detailsClass Class of the Heartbeat details
* @param detailsGenericType Type of the Heartbeat details
*/
<V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType);

/**
* Returns details from the last failed attempt of this Activity Execution. Unlike {@link
* #getHeartbeatDetails(Class)}, the returned details are not updated on every heartbeat call
* within the current attempt.
*
* @param detailsClass Class of the Heartbeat details
*/
<V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass);

/**
* Returns details from the last failed attempt of this Activity Execution. Unlike {@link
* #getHeartbeatDetails(Class, Type)}, the returned details are not updated on every heartbeat
* call within the current attempt.
*
* @param detailsClass Class of the Heartbeat details
* @param detailsGenericType Type of the Heartbeat details
*/
<V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType);

/**
* Gets a correlation token that can be used to complete the Activity Execution asynchronously
* through {@link io.temporal.client.ActivityCompletionClient#complete(byte[], Object)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ public <V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGe
return next.getHeartbeatDetails(detailsClass, detailsGenericType);
}

@Override
public <V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass) {
return next.getLastHeartbeatDetails(detailsClass);
}

@Override
public <V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType) {
return next.getLastHeartbeatDetails(detailsClass, detailsGenericType);
}

@Override
public byte[] getTaskToken() {
return next.getTaskToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,17 @@ public <V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGe
return heartbeatContext.getHeartbeatDetails(detailsClass, detailsGenericType);
}

@Override
public <V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass) {
return getLastHeartbeatDetails(detailsClass, detailsClass);
}

@Override
@SuppressWarnings("unchecked")
public <V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType) {
return heartbeatContext.getLastHeartbeatDetails(detailsClass, detailsGenericType);
}

@Override
public byte[] getTaskToken() {
return info.getTaskToken();
Expand Down Expand Up @@ -153,7 +164,7 @@ public ActivityInfo getInfo() {

@Override
public Object getLastHeartbeatValue() {
return heartbeatContext.getLastHeartbeatDetails();
return heartbeatContext.getLatestHeartbeatDetails();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ interface HeartbeatContext {
*/
<V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType);

Object getLastHeartbeatDetails();
/**
* @see io.temporal.activity.ActivityExecutionContext#getLastHeartbeatDetails(Class)
*/
<V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType);

Object getLatestHeartbeatDetails();

/** Cancel any pending heartbeat and discard cached heartbeat details. */
void cancelOutstandingHeartbeat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,24 @@ public <V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGe
}
}

/**
* @see ActivityExecutionContext#getLastHeartbeatDetails(Class, Type)
*/
@Override
@SuppressWarnings("unchecked")
public <V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType) {
lock.lock();
try {
return Optional.ofNullable(
dataConverterWithActivityContext.fromPayloads(
0, prevAttemptHeartbeatDetails, detailsClass, detailsGenericType));
} finally {
lock.unlock();
}
}

@Override
public Object getLastHeartbeatDetails() {
public Object getLatestHeartbeatDetails() {
lock.lock();
try {
if (receivedAHeartbeat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ public <V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGe
return Optional.empty();
}

@Override
public <V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass) {
return Optional.empty();
}

@Override
public <V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType) {
return Optional.empty();
}

@Override
public byte[] getTaskToken() {
throw new UnsupportedOperationException("getTaskToken is not supported for local activities");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -43,6 +44,10 @@ public static class HeartBeatingActivityImpl implements TestActivities.NoArgsAct
public void execute() {
// If the heartbeat details are "3", then we know that the last heartbeat was sent.
if (Activity.getExecutionContext().getHeartbeatDetails(String.class).orElse("").equals("3")) {
Activity.getExecutionContext().heartbeat("1");
// Verify that last heartbeat details don't change after a heartbeat
Assert.assertEquals(
"3", Activity.getExecutionContext().getLastHeartbeatDetails(String.class).orElse(""));
return;
}
// Send 3 heartbeats and then fail, expecting the last heartbeat to be sent
Expand Down
Loading