Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ public boolean start() {
for (PollTaskAsync<T> asyncTaskPoller : asyncTaskPollers) {
log.info("Starting async poller: {}", asyncTaskPoller.getLabel());
AdjustableSemaphore pollerSemaphore =
new AdjustableSemaphore(pollerBehavior.getInitialMaxConcurrentTaskPollers());
new AdjustableSemaphore(pollerBehavior.getInitialConcurrentTaskPollers());
PollScaleReportHandle<T> pollScaleReportHandle =
new PollScaleReportHandle<>(
pollerBehavior.getMinConcurrentTaskPollers(),
pollerBehavior.getMaxConcurrentTaskPollers(),
pollerBehavior.getInitialMaxConcurrentTaskPollers(),
pollerBehavior.getInitialConcurrentTaskPollers(),
(newTarget) -> {
log.debug(
"Updating maximum number of pollers for {} to: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.temporal.common.Experimental;
import java.util.Objects;
import javax.annotation.Nullable;

/**
* A poller behavior that will automatically scale the number of pollers based on feedback from the
Expand All @@ -16,17 +17,40 @@ public final class PollerBehaviorAutoscaling implements PollerBehavior {
private final int maxConcurrentTaskPollers;
private final int initialConcurrentTaskPollers;

/**
* Creates a new PollerBehaviorAutoscaling with default parameters.
*
* <p>Default parameters are:
*
* <ul>
* <li>minConcurrentTaskPollers = 1
* <li>maxConcurrentTaskPollers = 100
* <li>initialConcurrentTaskPollers = 5
*/
public PollerBehaviorAutoscaling() {
this(null, null, null);
}

/**
* Creates a new PollerBehaviorAutoscaling with the specified parameters.
*
* @param minConcurrentTaskPollers Minimum number of concurrent task pollers.
* @param maxConcurrentTaskPollers Maximum number of concurrent task pollers.
* @param initialConcurrentTaskPollers Initial number of concurrent task pollers.
* @param minConcurrentTaskPollers Minimum number of concurrent task pollers. Default is 1.
* @param maxConcurrentTaskPollers Maximum number of concurrent task pollers. Default is 100.
* @param initialConcurrentTaskPollers Initial number of concurrent task pollers. Default is 5.
*/
public PollerBehaviorAutoscaling(
int minConcurrentTaskPollers,
int maxConcurrentTaskPollers,
int initialConcurrentTaskPollers) {
@Nullable Integer minConcurrentTaskPollers,
@Nullable Integer maxConcurrentTaskPollers,
@Nullable Integer initialConcurrentTaskPollers) {
if (minConcurrentTaskPollers == null) {
minConcurrentTaskPollers = 1;
}
if (maxConcurrentTaskPollers == null) {
maxConcurrentTaskPollers = 100;
}
if (initialConcurrentTaskPollers == null) {
initialConcurrentTaskPollers = 5;
}
if (minConcurrentTaskPollers < 1) {
throw new IllegalArgumentException("minConcurrentTaskPollers must be at least 1");
}
Expand Down Expand Up @@ -67,7 +91,7 @@ public int getMaxConcurrentTaskPollers() {
*
* @return Initial number of concurrent task pollers.
*/
public int getInitialMaxConcurrentTaskPollers() {
public int getInitialConcurrentTaskPollers() {
return initialConcurrentTaskPollers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class CleanNexusWorkerShutdownTest {
public static Collection<PollerBehavior> data() {
return Arrays.asList(
new PollerBehavior[] {
new PollerBehaviorSimpleMaximum(10), new PollerBehaviorAutoscaling(1, 10, 5),
new PollerBehaviorSimpleMaximum(10), new PollerBehaviorAutoscaling(),
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.temporal.common.WorkerDeploymentVersion;
import io.temporal.worker.WorkerDeploymentOptions;
import io.temporal.worker.tuning.PollerBehavior;
import io.temporal.worker.tuning.PollerBehaviorAutoscaling;
import java.util.Collection;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -97,19 +96,62 @@ public WorkerDeploymentConfigurationProperties getDeploymentProperties() {
}

public static class PollerConfigurationProperties {
private final @Nullable PollerBehaviorAutoscaling pollerBehaviorAutoscaling;
public static class PollerBehaviorAutoscalingConfiguration {
private final Boolean enabled;
private final Integer minConcurrentTaskPollers;
private final Integer maxConcurrentTaskPollers;
private final Integer initialConcurrentTaskPollers;

@ConstructorBinding
public PollerBehaviorAutoscalingConfiguration(
@Nullable Boolean enabled,
@Nullable Integer minConcurrentTaskPollers,
@Nullable Integer maxConcurrentTaskPollers,
@Nullable Integer initialConcurrentTaskPollers) {
this.enabled = enabled;
this.minConcurrentTaskPollers = minConcurrentTaskPollers;
this.maxConcurrentTaskPollers = maxConcurrentTaskPollers;
this.initialConcurrentTaskPollers = initialConcurrentTaskPollers;
}

@Nullable
public Boolean isEnabled() {
// If enabled is true or any of the other parameters are set, then autoscaling is enabled.
return Boolean.TRUE.equals(enabled)
|| minConcurrentTaskPollers != null
|| maxConcurrentTaskPollers != null
|| initialConcurrentTaskPollers != null;
}

@Nullable
public Integer getMinConcurrentTaskPollers() {
return minConcurrentTaskPollers;
}

@Nullable
public Integer getMaxConcurrentTaskPollers() {
return maxConcurrentTaskPollers;
}

@Nullable
public Integer getInitialConcurrentTaskPollers() {
return initialConcurrentTaskPollers;
}
}

private final @Nullable PollerBehaviorAutoscalingConfiguration pollerBehaviorAutoscaling;

/**
* @param pollerBehaviorAutoscaling defines poller behavior for autoscaling
*/
@ConstructorBinding
public PollerConfigurationProperties(
@Nullable PollerBehaviorAutoscaling pollerBehaviorAutoscaling) {
@Nullable PollerBehaviorAutoscalingConfiguration pollerBehaviorAutoscaling) {
this.pollerBehaviorAutoscaling = pollerBehaviorAutoscaling;
}

@Nullable
public PollerBehaviorAutoscaling getPollerBehaviorAutoscaling() {
public PollerBehaviorAutoscalingConfiguration getPollerBehaviorAutoscaling() {
return pollerBehaviorAutoscaling;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.temporal.spring.boot.autoconfigure.properties.WorkerProperties;
import io.temporal.worker.WorkerDeploymentOptions;
import io.temporal.worker.WorkerOptions;
import io.temporal.worker.tuning.PollerBehaviorAutoscaling;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -50,25 +51,46 @@ WorkerOptions createWorkerOptions() {
Optional.ofNullable(threadsConfiguration.getMaxConcurrentNexusTaskPollers())
.ifPresent(options::setMaxConcurrentNexusTaskPollers);
if (threadsConfiguration.getWorkflowTaskPollersConfiguration() != null) {
Optional.ofNullable(
WorkerProperties.PollerConfigurationProperties.PollerBehaviorAutoscalingConfiguration
pollerBehaviorAutoscaling =
threadsConfiguration
.getWorkflowTaskPollersConfiguration()
.getPollerBehaviorAutoscaling())
.ifPresent(options::setWorkflowTaskPollersBehavior);
.getPollerBehaviorAutoscaling();
if (pollerBehaviorAutoscaling != null && pollerBehaviorAutoscaling.isEnabled()) {
options.setWorkflowTaskPollersBehavior(
new PollerBehaviorAutoscaling(
pollerBehaviorAutoscaling.getMinConcurrentTaskPollers(),
pollerBehaviorAutoscaling.getMaxConcurrentTaskPollers(),
pollerBehaviorAutoscaling.getInitialConcurrentTaskPollers()));
}
}
if (threadsConfiguration.getActivityTaskPollersConfiguration() != null) {
Optional.ofNullable(
WorkerProperties.PollerConfigurationProperties.PollerBehaviorAutoscalingConfiguration
pollerBehaviorAutoscaling =
threadsConfiguration
.getActivityTaskPollersConfiguration()
.getPollerBehaviorAutoscaling())
.ifPresent(options::setActivityTaskPollersBehavior);
.getPollerBehaviorAutoscaling();
if (pollerBehaviorAutoscaling != null && pollerBehaviorAutoscaling.isEnabled()) {
options.setActivityTaskPollersBehavior(
new PollerBehaviorAutoscaling(
pollerBehaviorAutoscaling.getMinConcurrentTaskPollers(),
pollerBehaviorAutoscaling.getMaxConcurrentTaskPollers(),
pollerBehaviorAutoscaling.getInitialConcurrentTaskPollers()));
}
}
if (threadsConfiguration.getNexusTaskPollersConfiguration() != null) {
Optional.ofNullable(
WorkerProperties.PollerConfigurationProperties.PollerBehaviorAutoscalingConfiguration
pollerBehaviorAutoscaling =
threadsConfiguration
.getNexusTaskPollersConfiguration()
.getPollerBehaviorAutoscaling())
.ifPresent(options::setNexusTaskPollersBehavior);
.getPollerBehaviorAutoscaling();
if (pollerBehaviorAutoscaling != null && pollerBehaviorAutoscaling.isEnabled()) {
options.setNexusTaskPollersBehavior(
new PollerBehaviorAutoscaling(
pollerBehaviorAutoscaling.getMinConcurrentTaskPollers(),
pollerBehaviorAutoscaling.getMaxConcurrentTaskPollers(),
pollerBehaviorAutoscaling.getInitialConcurrentTaskPollers()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,23 @@ public TemporalOptionsCustomizer<WorkerOptions.Builder> workerCustomizer() {
"Values from the Spring Config should be respected");
assertEquals(
5,
autoscaling.getInitialMaxConcurrentTaskPollers(),
autoscaling.getInitialConcurrentTaskPollers(),
"Values from the Spring Config should be respected");
assertNotNull(options.getActivityTaskPollersBehavior());
assertInstanceOf(
PollerBehaviorAutoscaling.class, options.getActivityTaskPollersBehavior());
autoscaling = (PollerBehaviorAutoscaling) options.getActivityTaskPollersBehavior();
assertEquals(
1,
options.getMaxConcurrentActivityTaskPollers(),
autoscaling.getMinConcurrentTaskPollers(),
"Values from the Spring Config should be respected");
assertEquals(
100,
autoscaling.getMaxConcurrentTaskPollers(),
"Values from the Spring Config should be respected");
assertEquals(
5,
autoscaling.getInitialConcurrentTaskPollers(),
"Values from the Spring Config should be respected");
assertEquals(
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,14 @@ spring:
max-concurrent-nexus-task-executors: 1
max-concurrent-activity-executors: 1
max-concurrent-local-activity-executors: 1
max-concurrent-activity-task-pollers: 1
max-concurrent-nexus-task-pollers: 1
workflow-task-pollers-configuration:
poller-behavior-autoscaling:
min-concurrent-task-pollers: 1
max-concurrent-task-pollers: 10
initial-concurrent-task-pollers: 5
activity-task-pollers-configuration:
poller-behavior-autoscaling:
enabled: true
rate-limits:
max-worker-activities-per-second: 1.0
max-task-queue-activities-per-second: 1.0
Expand Down
Loading