Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,21 @@ public ActivityTask poll() {
}
PollActivityTaskQueueResponse response;
SlotPermit permit;
SlotSupplierFuture future;
boolean isSuccessful = false;

try {
permit =
future =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
return null;
}
permit = Poller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
if (permit == null) return null;

try {
response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.worker.tuning.LocalActivitySlotInfo;
import io.temporal.worker.tuning.SlotPermit;
import io.temporal.worker.tuning.SlotReleaseReason;
import io.temporal.worker.tuning.SlotSupplierFuture;
import io.temporal.workflow.Functions;
import java.util.concurrent.*;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -83,22 +84,30 @@ private void processQueue() {
QueuedLARequest request = null;
try {
request = requestQueue.take();

SlotSupplierFuture future = slotSupplier.reserveSlot(request.data);
try {
slotPermit = slotSupplier.reserveSlot(request.data);
slotPermit = future.get();
} catch (InterruptedException e) {
SlotPermit maybePermitAnyway = future.abortReservation();
if (maybePermitAnyway != null) {
slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), maybePermitAnyway);
}
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
} catch (ExecutionException e) {
log.error(
"Error reserving local activity slot, dropped activity id {}",
request.task.getActivityId(),
e);
continue;
}

request.task.getExecutionContext().setPermit(slotPermit);
afterReservedCallback.apply(request.task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Throwable e) {
// Fail the workflow task if something went wrong executing the local activity (at the
// executor level, otherwise, the LA handler itself should be handling errors)
Expand All @@ -112,6 +121,11 @@ private void processQueue() {
LocalActivityResult.processingFailed(
executionContext.getActivityId(), request.task.getAttemptTask().getAttempt(), e));
}
if (e.getCause() instanceof InterruptedException) {
// It's possible the interrupt happens inside the callback, so check that as well.
Thread.currentThread().interrupt();
return;
}
}
}
}
Expand Down Expand Up @@ -162,11 +176,9 @@ public boolean isTerminated() {
@Override
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
running = false;
if (requestQueue.isEmpty()) {
// Just interrupt the thread, so that if we're waiting on blocking take the thread will
// be interrupted and exit. Otherwise the loop will exit once the queue is empty.
queueThreadService.shutdownNow();
}
// Always interrupt. This won't cause any *tasks* to be interrupted, since the queue thread is
// only responsible for handing them out.
queueThreadService.shutdownNow();

return interruptTasks
? shutdownManager.shutdownExecutorNowUntimed(
Expand All @@ -182,6 +194,7 @@ public void awaitTermination(long timeout, TimeUnit unit) {
// timeout duration if no task was ever submitted.
return;
}

ShutdownManager.awaitTermination(queueThreadService, unit.toMillis(timeout));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,21 @@ public NexusTask poll() {
}
PollNexusTaskQueueResponse response;
SlotPermit permit;
SlotSupplierFuture future;
boolean isSuccessful = false;

try {
permit =
future =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
log.warn("Error while trying to reserve a slot for a nexus task", e.getCause());
return null;
}
permit = Poller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
if (permit == null) return null;

try {
response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import io.temporal.internal.common.GrpcUtils;
import io.temporal.internal.task.VirtualThreadDelegate;
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.SlotPermit;
import io.temporal.worker.tuning.SlotReleaseReason;
import io.temporal.worker.tuning.SlotSupplierFuture;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.*;
Expand Down Expand Up @@ -222,6 +225,25 @@ public WorkerLifecycleState getLifecycleState() {
return WorkerLifecycleState.ACTIVE;
}

static SlotPermit getSlotPermitAndHandleInterrupts(
SlotSupplierFuture future, TrackingSlotSupplier<?> slotSupplier) {
SlotPermit permit;
try {
permit = future.get();
} catch (InterruptedException e) {
SlotPermit maybePermitAnyway = future.abortReservation();
if (maybePermitAnyway != null) {
slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), maybePermitAnyway);
}
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
log.warn("Error while trying to reserve a slot", e.getCause());
return null;
}
return permit;
}

@Override
public String toString() {
// TODO using pollThreadNamePrefix here is ugly. We should consider introducing some concept of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -48,14 +48,20 @@ public TrackingSlotSupplier(SlotSupplier<SI> inner, Scope metricsScope) {
publishSlotsMetric();
}

public SlotPermit reserveSlot(SlotReservationData dat) throws InterruptedException {
SlotPermit p = inner.reserveSlot(createCtx(dat));
issuedSlots.incrementAndGet();
return p;
public SlotSupplierFuture reserveSlot(SlotReservationData data) {
final SlotSupplierFuture future;
try {
future = inner.reserveSlot(createCtx(data));
} catch (Exception e) {
throw new RuntimeException(e);
}

future.thenRun(issuedSlots::incrementAndGet);
return future;
}

public Optional<SlotPermit> tryReserveSlot(SlotReservationData dat) {
Optional<SlotPermit> p = inner.tryReserveSlot(createCtx(dat));
public Optional<SlotPermit> tryReserveSlot(SlotReservationData data) {
Optional<SlotPermit> p = inner.tryReserveSlot(createCtx(data));
if (p.isPresent()) {
issuedSlots.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.*;
import io.temporal.worker.tuning.SlotPermit;
import io.temporal.worker.tuning.SlotReleaseReason;
import io.temporal.worker.tuning.SlotSupplierFuture;
import io.temporal.worker.tuning.WorkflowSlotInfo;
import java.util.Objects;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -121,23 +124,24 @@ public WorkflowPollTask(
@Override
@SuppressWarnings("deprecation")
public WorkflowTask poll() {
boolean isSuccessful = false;
SlotPermit permit;
SlotSupplierFuture future;
boolean isSuccessful = false;
try {
permit =
future =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
log.warn("Error while trying to reserve a slot for workflow task", e.getCause());
log.warn("Error while trying to reserve a slot for a workflow", e.getCause());
return null;
}

permit = Poller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
if (permit == null) return null;

TaskQueueKind taskQueueKind = stickyQueueBalancer.makePoll();
boolean isSticky = TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(taskQueueKind);
PollWorkflowTaskQueueRequest request = isSticky ? stickyPollRequest : pollRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
package io.temporal.worker.tuning;

import com.google.common.base.Preconditions;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

/**
* This implementation of {@link SlotSupplier} provides a fixed number of slots backed by a
Expand All @@ -32,18 +35,89 @@
*/
public class FixedSizeSlotSupplier<SI extends SlotInfo> implements SlotSupplier<SI> {
private final int numSlots;
private final Semaphore executorSlotsSemaphore;
private final AsyncSemaphore executorSlotsSemaphore;

/**
* A simple version of an async semaphore. Unfortunately there's not any readily available
* properly licensed library I could find for this which is a bit shocking, but this
* implementation should be suitable for our needs
*/
static class AsyncSemaphore {
private final ReentrantLock lock = new ReentrantLock();
private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
private int permits;

AsyncSemaphore(int initialPermits) {
this.permits = initialPermits;
}

/**
* Acquire a permit asynchronously. If a permit is available, returns a completed future,
* otherwise returns a future that will be completed when a permit is released.
*/
public CompletableFuture<Void> acquire() {
lock.lock();
try {
if (permits > 0) {
permits--;
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture<Void> waiter = new CompletableFuture<>();
waiters.add(waiter);
return waiter;
}
} finally {
lock.unlock();
}
}

public boolean tryAcquire() {
lock.lock();
try {
if (permits > 0) {
permits--;
return true;
}
return false;
} finally {
lock.unlock();
}
}

/**
* Release a permit. If there are waiting futures, completes the next one instead of
* incrementing the permit count.
*/
public void release() {
lock.lock();
try {
CompletableFuture<Void> waiter = waiters.poll();
if (waiter != null) {
if (!waiter.complete(null) && waiter.isCancelled()) {
// If this waiter was cancelled, we need to release another permit, since this waiter
// is now useless
release();
}
} else {
permits++;
}
} finally {
lock.unlock();
}
}
}

public FixedSizeSlotSupplier(int numSlots) {
Preconditions.checkArgument(numSlots > 0, "FixedSizeSlotSupplier must have at least one slot");
this.numSlots = numSlots;
executorSlotsSemaphore = new Semaphore(numSlots);
executorSlotsSemaphore = new AsyncSemaphore(numSlots);
}

@Override
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
executorSlotsSemaphore.acquire();
return new SlotPermit();
public SlotSupplierFuture reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
CompletableFuture<Void> slotFuture = executorSlotsSemaphore.acquire();
return SlotSupplierFuture.fromCompletableFuture(
slotFuture.thenApply(ignored -> new SlotPermit()), () -> slotFuture.cancel(true));
}

@Override
Expand Down
Loading
Loading