Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/Pools/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,36 @@ class Connection
*/
protected ?Pool $pool = null;

protected float $checkedOutAt = 0;

/**
* @param TResource $resource
*/
public function __construct(protected mixed $resource)
{
}

/**
* Mark the connection as checked out (record timestamp)
*
* @return $this<TResource>
*/
public function markCheckedOut(): static
{
$this->checkedOutAt = microtime(true);
return $this;
}

/**
* Get the timestamp when this connection was checked out
*
* @return float
*/
public function getCheckedOutAt(): float
{
return $this->checkedOutAt;
}

/**
* @return string
*/
Expand Down
65 changes: 65 additions & 0 deletions src/Pools/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ class Pool
*/
protected int $synchronizedTimeout = 3; // seconds

/**
* Maximum time (seconds) a connection can be checked out before being
* considered leaked. 0 means disabled (no stale detection).
*
* @var int
*/
protected int $maxUseTime = 0;

protected PoolAdapter $pool;

/**
Expand Down Expand Up @@ -189,6 +197,26 @@ public function getSynchronizationTimeout(): int
return $this->synchronizedTimeout;
}

/**
* Set the maximum time a connection can be checked out before being
* considered leaked. When the pool is exhausted, connections exceeding
* this time will have their slots reclaimed, allowing new connections
* to be created.
*
* @param int $seconds Maximum use time in seconds. 0 to disable.
* @return $this
*/
public function setMaxUseTime(int $seconds): static
{
$this->maxUseTime = $seconds;
return $this;
}

public function getMaxUseTime(): int
{
return $this->maxUseTime;
}

/**
* @param Telemetry $telemetry
* @return $this<TResource>
Expand Down Expand Up @@ -272,6 +300,7 @@ public function pop(): Connection
if ($shouldCreateConnections) {
try {
$connection = $this->createConnection();
$connection->markCheckedOut();
$this->pool->synchronized(function () use ($connection) {
$this->active[$connection->getID()] = $connection;
});
Expand All @@ -289,6 +318,16 @@ public function pop(): Connection
$connection = $this->pool->pop($this->getSynchronizationTimeout());

if ($connection === false || $connection === null) {
// Before giving up, try to recover slots from leaked connections
if ($this->maxUseTime > 0) {
$recovered = $this->recoverStaleConnections();
if ($recovered > 0) {
// Slots freed, don't count this as a retry attempt
$attempts--;
continue;
}
}

if ($attempts >= $this->getRetryAttempts()) {
$activeCount = count($this->active);
$idleCount = $this->pool->count();
Expand All @@ -300,6 +339,7 @@ public function pop(): Connection
sleep($this->getRetrySleep());
} else {
if ($connection instanceof Connection) {
$connection->markCheckedOut();
$this->pool->synchronized(function () use ($connection) {
$this->active[$connection->getID()] = $connection;
});
Expand Down Expand Up @@ -466,6 +506,31 @@ public function isFull(): bool
return count($this->active) === 0;
}

/**
* Detect and recover slots from connections that have been checked out
* longer than maxUseTime (likely leaked). Returns the number of
* recovered slots.
*
* @return int Number of recovered connection slots
*/
private function recoverStaleConnections(): int
{
$now = microtime(true);
$recovered = 0;

return $this->pool->synchronized(function () use ($now, &$recovered): int {
foreach ($this->active as $id => $connection) {
$checkedOutAt = $connection->getCheckedOutAt();
if ($checkedOutAt > 0 && ($now - $checkedOutAt) > $this->maxUseTime) {
unset($this->active[$id]);
$this->connectionsCreated--;
$recovered++;
}
}
return $recovered;
});
}

private function recordPoolTelemetry(): void
{
$activeConnections = count($this->active);
Expand Down
69 changes: 69 additions & 0 deletions tests/Pools/Scopes/PoolTestScope.php
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,75 @@ public function testUseReclainsConnectionOnCallbackException(): void
});
}

public function testPoolRecoversStaleConnections(): void
{
$this->execute(function (): void {
$pool = new Pool($this->getAdapter(), 'test-stale', 2, fn () => 'resource');
$pool->setRetryAttempts(1);
$pool->setRetrySleep(0);
$pool->setMaxUseTime(1); // 1 second max use time

// Pop both connections to exhaust the pool
$conn1 = $pool->pop();
$conn2 = $pool->pop();

$this->assertSame(0, $pool->count());

// Simulate time passing by backdating the checkout timestamp
// so connections appear stale
$reflection = new \ReflectionProperty($conn1, 'checkedOutAt');
$reflection->setValue($conn1, microtime(true) - 2); // 2 seconds ago
$reflection = new \ReflectionProperty($conn2, 'checkedOutAt');
$reflection->setValue($conn2, microtime(true) - 2); // 2 seconds ago

// Without stale recovery, this would throw "Pool is empty"
// With stale recovery, it should detect the leaked connections,
// free their slots, and create a new connection
$conn3 = $pool->pop();
$this->assertSame('resource', $conn3->getResource());
});
}

public function testPoolDoesNotRecoverNonStaleConnections(): void
{
$this->execute(function (): void {
$pool = new Pool($this->getAdapter(), 'test-no-stale', 2, fn () => 'resource');
$pool->setRetryAttempts(1);
$pool->setRetrySleep(0);
$pool->setMaxUseTime(60); // 60 seconds - connections won't be stale

// Pop both connections
$conn1 = $pool->pop();
$conn2 = $pool->pop();

$this->assertSame(0, $pool->count());

// Connections are not stale (just checked out), so pool should still throw
$this->expectException(Exception::class);
$this->expectExceptionMessage("Pool 'test-no-stale' is empty");
$pool->pop();
});
}

public function testPoolStaleRecoveryDisabledByDefault(): void
{
$this->execute(function (): void {
$pool = new Pool($this->getAdapter(), 'test-disabled', 1, fn () => 'resource');
$pool->setRetryAttempts(1);
$pool->setRetrySleep(0);
// maxUseTime defaults to 0 (disabled)

$conn1 = $pool->pop();

// Even if we backdate the checkout time, stale recovery shouldn't trigger
$reflection = new \ReflectionProperty($conn1, 'checkedOutAt');
$reflection->setValue($conn1, microtime(true) - 100);

$this->expectException(Exception::class);
$pool->pop();
});
}

public function testPoolTelemetry(): void
{
$this->execute(function (): void {
Expand Down
Loading