Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,36 @@ jobs:

- name: Start containerized server and dependencies
run: |
docker compose \
-f ./docker/github/docker-compose.yaml \
up -d temporal

wget https://github.com/temporalio/cli/releases/download/v1.3.1-priority.0/temporal_cli_1.3.1-priority.0_linux_amd64.tar.gz
tar -xzf temporal_cli_1.3.1-priority.0_linux_amd64.tar.gz
chmod +x temporal
./temporal server start-dev \
--headless \
--port 7233 \
--http-port 7243 \
--namespace UnitTest \
--search-attribute CustomKeywordField=Keyword \
--search-attribute CustomStringField=Text \
--search-attribute CustomTextField=Text \
--search-attribute CustomIntField=Int \
--search-attribute CustomDatetimeField=Datetime \
--search-attribute CustomDoubleField=Double \
--search-attribute CustomBoolField=Bool \
--dynamic-config-value system.forceSearchAttributesCacheRefreshOnRead=true \
--dynamic-config-value system.enableActivityEagerExecution=true \
--dynamic-config-value system.enableEagerWorkflowStart=true \
--dynamic-config-value system.enableExecuteMultiOperation=true \
--dynamic-config-value frontend.enableUpdateWorkflowExecutionAsyncAccepted=true \
--dynamic-config-value history.MaxBufferedQueryCount=100000 \
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
--dynamic-config-value worker.buildIdScavengerEnabled=true \
--dynamic-config-value frontend.workerVersioningRuleAPIs=true \
--dynamic-config-value worker.removableBuildIdDurationSinceDefault=true \
--dynamic-config-value matching.useNewMatcher=true \
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true &
sleep 10s

- name: Run unit tests
env:
USER: unittest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package io.temporal.activity;

import io.temporal.api.common.v1.Payloads;
import io.temporal.common.Experimental;
import io.temporal.common.Priority;
import java.time.Duration;
import java.util.Optional;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -139,4 +141,14 @@ public interface ActivityInfo {

/** Used to determine if the Activity Execution is a local Activity. */
boolean isLocal();

/**
* Return the priority of the activity task.
*
* @apiNote If unset or on an older server version, this method will return {@link
* Priority#getDefaultInstance()}.
*/
@Experimental
@Nonnull
Priority getPriority();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What situations is this nullable? Old servers only or always unless explicitly set? (just curious, though if confusing we should consider documenting the cases where it may be)

Also a bit confused on this class how a user can know whether a return is nullable or not. But given the confusion, best to be safe and make it clear in docs. I would consider Optional<Priority> if it can be nullable even on newer servers, but meh. Note this same concern does apply as much to user-set values like the activity options.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it can be nullable we should bias towards tagging with @Nullable vs optional, it is the preferred approach in Java

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be nullable insofar as you might be using an old server. So, yeah I'll attach the annotation. I believe new servers will always attach the (default) value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be worth documenting that and verifying the test server does match the real server here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So server leaves this as null if unset which is the equivalent of default. In Go, if the server left it as null, we return the default object so I did the same here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to not do that as it will lie. The default values for priorty are not 0, and therefore we might lie and seem like they've been set to zero for a server that actually just doesn't understand priority

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zero means default, so zero is not a lie

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@

import com.google.common.base.Objects;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.common.Experimental;
import io.temporal.common.MethodRetry;
import io.temporal.common.RetryOptions;
import io.temporal.common.VersioningIntent;
import io.temporal.common.*;
import io.temporal.common.context.ContextPropagator;
import io.temporal.failure.CanceledFailure;
import java.time.Duration;
Expand Down Expand Up @@ -66,6 +63,7 @@ public static final class Builder {
private boolean disableEagerExecution;
private VersioningIntent versioningIntent;
private String summary;
private Priority priority;

private Builder() {}

Expand All @@ -84,6 +82,7 @@ private Builder(ActivityOptions options) {
this.disableEagerExecution = options.disableEagerExecution;
this.versioningIntent = options.versioningIntent;
this.summary = options.summary;
this.priority = options.priority;
}

/**
Expand Down Expand Up @@ -195,13 +194,13 @@ public Builder setRetryOptions(RetryOptions retryOptions) {
* could make sense. <br>
* This is also why there is no equivalent method on {@link LocalActivityOptions}.
*
* @see <a href="https://github.com/temporalio/sdk-java/issues/490">Rejected feature reqest for
* LocalActivityOption#contextPropagators</a>
* @param contextPropagators specifies the list of context propagators to use during propagation
* from a workflow to the activity with these {@link ActivityOptions}. This list overrides
* the list specified on {@link
* io.temporal.client.WorkflowClientOptions#getContextPropagators()}, {@code null} means no
* overriding
* @see <a href="https://github.com/temporalio/sdk-java/issues/490">Rejected feature reqest for
* LocalActivityOption#contextPropagators</a>
*/
public Builder setContextPropagators(List<ContextPropagator> contextPropagators) {
this.contextPropagators = contextPropagators;
Expand Down Expand Up @@ -258,6 +257,18 @@ public Builder setSummary(String summary) {
return this;
}

/**
* Optional priority settings that control relative ordering of task processing when tasks are
* backed up in a queue.
*
* <p>Defaults to inheriting priority from the workflow that scheduled the activity.
*/
@Experimental
public Builder setPriority(Priority priority) {
this.priority = priority;
return this;
}

public Builder mergeActivityOptions(ActivityOptions override) {
if (override == null) {
return this;
Expand Down Expand Up @@ -290,6 +301,7 @@ public Builder mergeActivityOptions(ActivityOptions override) {
this.versioningIntent = override.versioningIntent;
}
this.summary = (override.summary == null) ? this.summary : override.summary;
this.priority = (override.priority == null) ? this.priority : override.priority;
return this;
}

Expand All @@ -313,7 +325,8 @@ public ActivityOptions build() {
cancellationType,
disableEagerExecution,
versioningIntent,
summary);
summary,
priority);
}

public ActivityOptions validateAndBuildWithDefaults() {
Expand All @@ -330,7 +343,8 @@ public ActivityOptions validateAndBuildWithDefaults() {
versioningIntent == null
? VersioningIntent.VERSIONING_INTENT_UNSPECIFIED
: versioningIntent,
summary);
summary,
priority);
}
}

Expand All @@ -345,6 +359,7 @@ public ActivityOptions validateAndBuildWithDefaults() {
private final boolean disableEagerExecution;
private final VersioningIntent versioningIntent;
private final String summary;
private final Priority priority;

private ActivityOptions(
Duration heartbeatTimeout,
Expand All @@ -357,7 +372,8 @@ private ActivityOptions(
ActivityCancellationType cancellationType,
boolean disableEagerExecution,
VersioningIntent versioningIntent,
String summary) {
String summary,
Priority priority) {
this.heartbeatTimeout = heartbeatTimeout;
this.scheduleToStartTimeout = scheduleToStartTimeout;
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
Expand All @@ -369,6 +385,7 @@ private ActivityOptions(
this.disableEagerExecution = disableEagerExecution;
this.versioningIntent = versioningIntent;
this.summary = summary;
this.priority = priority;
}

/**
Expand Down Expand Up @@ -443,6 +460,11 @@ public String getSummary() {
return summary;
}

@Experimental
public Priority getPriority() {
return priority;
}

public Builder toBuilder() {
return new Builder(this);
}
Expand All @@ -462,7 +484,8 @@ public boolean equals(Object o) {
&& Objects.equal(contextPropagators, that.contextPropagators)
&& disableEagerExecution == that.disableEagerExecution
&& versioningIntent == that.versioningIntent
&& Objects.equal(summary, that.summary);
&& Objects.equal(summary, that.summary)
&& Objects.equal(priority, that.priority);
}

@Override
Expand All @@ -478,7 +501,8 @@ public int hashCode() {
cancellationType,
disableEagerExecution,
versioningIntent,
summary);
summary,
priority);
}

@Override
Expand Down Expand Up @@ -507,6 +531,8 @@ public String toString() {
+ versioningIntent
+ ", summary="
+ summary
+ ", priority="
+ priority
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public static WorkflowOptions merge(
.setCompletionCallbacks(o.getCompletionCallbacks())
.setLinks(o.getLinks())
.setOnConflictOptions(o.getOnConflictOptions())
.setPriority(o.getPriority())
.validateBuildWithDefaults();
}

Expand Down Expand Up @@ -132,6 +133,8 @@ public static final class Builder {

private OnConflictOptions onConflictOptions;

private Priority priority;

private Builder() {}

private Builder(WorkflowOptions options) {
Expand Down Expand Up @@ -159,6 +162,7 @@ private Builder(WorkflowOptions options) {
this.completionCallbacks = options.completionCallbacks;
this.links = options.links;
this.onConflictOptions = options.onConflictOptions;
this.priority = options.priority;
}

/**
Expand Down Expand Up @@ -380,8 +384,8 @@ public Builder setContextPropagators(@Nullable List<ContextPropagator> contextPr
* <li>has available workflow task executor slots
* </ul>
*
* and such a {@link WorkflowClient} is used to start a workflow, then the first workflow task
* could be dispatched on this local worker with the response to the start call if Server
* <p>and such a {@link WorkflowClient} is used to start a workflow, then the first workflow
* task could be dispatched on this local worker with the response to the start call if Server
* supports it. This option can be used to disable this mechanism.
*
* <p>Default is true
Expand Down Expand Up @@ -478,6 +482,16 @@ public Builder setOnConflictOptions(OnConflictOptions onConflictOptions) {
return this;
}

/**
* Optional priority settings that control relative ordering of task processing when tasks are
* backed up in a queue.
*/
@Experimental
public Builder setPriority(Priority priority) {
this.priority = priority;
return this;
}

public WorkflowOptions build() {
return new WorkflowOptions(
workflowId,
Expand All @@ -500,7 +514,8 @@ public WorkflowOptions build() {
requestId,
completionCallbacks,
links,
onConflictOptions);
onConflictOptions,
priority);
}

/**
Expand Down Expand Up @@ -528,7 +543,8 @@ public WorkflowOptions validateBuildWithDefaults() {
requestId,
completionCallbacks,
links,
onConflictOptions);
onConflictOptions,
priority);
}
}

Expand Down Expand Up @@ -572,6 +588,7 @@ public WorkflowOptions validateBuildWithDefaults() {

private final List<Link> links;
private final OnConflictOptions onConflictOptions;
private final Priority priority;

private WorkflowOptions(
String workflowId,
Expand All @@ -594,7 +611,8 @@ private WorkflowOptions(
String requestId,
List<Callback> completionCallbacks,
List<Link> links,
OnConflictOptions onConflictOptions) {
OnConflictOptions onConflictOptions,
Priority priority) {
this.workflowId = workflowId;
this.workflowIdReusePolicy = workflowIdReusePolicy;
this.workflowRunTimeout = workflowRunTimeout;
Expand All @@ -616,6 +634,7 @@ private WorkflowOptions(
this.completionCallbacks = completionCallbacks;
this.links = links;
this.onConflictOptions = onConflictOptions;
this.priority = priority;
}

public String getWorkflowId() {
Expand Down Expand Up @@ -717,6 +736,11 @@ public String getStaticDetails() {
return onConflictOptions;
}

@Experimental
public Priority getPriority() {
return priority;
}

public Builder toBuilder() {
return new Builder(this);
}
Expand Down Expand Up @@ -746,7 +770,8 @@ public boolean equals(Object o) {
&& Objects.equal(requestId, that.requestId)
&& Objects.equal(completionCallbacks, that.completionCallbacks)
&& Objects.equal(links, that.links)
&& Objects.equal(onConflictOptions, that.onConflictOptions);
&& Objects.equal(onConflictOptions, that.onConflictOptions)
&& Objects.equal(priority, that.priority);
}

@Override
Expand All @@ -772,7 +797,8 @@ public int hashCode() {
requestId,
completionCallbacks,
links,
onConflictOptions);
onConflictOptions,
priority);
}

@Override
Expand Down Expand Up @@ -823,6 +849,8 @@ public String toString() {
+ links
+ ", onConflictOptions="
+ onConflictOptions
+ ", priority="
+ priority
+ '}';
}
}
Loading
Loading