Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DistributedLock.Postgres/DistributedLock.Postgres.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<PropertyGroup>
<Version>1.0.3</Version>
<Version>1.0.4</Version>
<AssemblyVersion>1.0.0.0</AssemblyVersion>
<Authors>Michael Adelson</Authors>
<Description>Provides a distributed lock implementation based on Postgresql</Description>
Expand Down
119 changes: 54 additions & 65 deletions DistributedLock.Postgres/PostgresAdvisoryLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@ namespace Medallion.Threading.Postgres
/// </summary>
internal class PostgresAdvisoryLock : IDbSynchronizationStrategy<object>
{
// matches SqlApplicationLock
private const int AlreadyHeldReturnCode = 103;
private static readonly object Cookie = new();

private static readonly object Cookie = new object();

public static readonly PostgresAdvisoryLock ExclusiveLock = new PostgresAdvisoryLock(isShared: false),
SharedLock = new PostgresAdvisoryLock(isShared: true);
public static readonly PostgresAdvisoryLock ExclusiveLock = new(isShared: false),
SharedLock = new(isShared: true);

private readonly bool _isShared;

Expand All @@ -40,7 +37,16 @@ private PostgresAdvisoryLock(bool isShared)
{
const string SavePointName = "medallion_threading_postgres_advisory_lock_acquire";

var key = new PostgresAdvisoryLockKey(resourceName);
PostgresAdvisoryLockKey key = new(resourceName);

if (connection.IsExernallyOwned
&& await this.IsHoldingLockAsync(connection, key, cancellationToken).ConfigureAwait(false))
{
if (timeout.IsZero) { return null; }
if (timeout.IsInfinite) { throw new DeadlockException("Attempted to acquire a lock that is already held on the same connection"); }
await SyncViaAsync.Delay(timeout, cancellationToken).ConfigureAwait(false);
return null;
}

var hasTransaction = await HasTransactionAsync(connection).ConfigureAwait(false);
if (hasTransaction)
Expand All @@ -58,10 +64,10 @@ private PostgresAdvisoryLock(bool isShared)

using var acquireCommand = this.CreateAcquireCommand(connection, key, timeout);

int acquireCommandResult;
object acquireCommandResult;
try
{
acquireCommandResult = (int)await acquireCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
acquireCommandResult = await acquireCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand All @@ -73,7 +79,14 @@ private PostgresAdvisoryLock(bool isShared)
{
// lock_timeout error code from https://www.postgresql.org/docs/10/errcodes-appendix.html
case "55P03":
return null;
// Even though we hit a lock timeout, an underlying race condition in Postgres means that we might actually
// have acquired the lock right before timing out. To account for this, we simply re-check whether we are
// holding the lock to determine the final return value. See https://github.com/madelson/DistributedLock/issues/147
// and https://www.postgresql.org/message-id/63573.1668271677%40sss.pgh.pa.us for more details.
// NOTE: we use CancellationToken.None for this check because if we ARE holding the lock it would be invalid to abort.
return await this.IsHoldingLockAsync(connection, key, CancellationToken.None).ConfigureAwait(false)
? Cookie
: null;
// deadlock_detected error code from https://www.postgresql.org/docs/10/errcodes-appendix.html
case "40P01":
throw new DeadlockException($"The request for the distributed lock failed with exit code '{postgresException.SqlState}' (deadlock_detected)", ex);
Expand All @@ -91,18 +104,13 @@ private PostgresAdvisoryLock(bool isShared)

await RollBackTransactionTimeoutVariablesIfNeededAsync().ConfigureAwait(false);

switch (acquireCommandResult)
return acquireCommandResult switch
{
case 0: return null;
case 1: return Cookie;
case AlreadyHeldReturnCode:
if (timeout.IsZero) { return null; }
if (timeout.IsInfinite) { throw new DeadlockException("Attempted to acquire a lock that is already held on the same connection"); }
await SyncViaAsync.Delay(timeout, cancellationToken).ConfigureAwait(false);
return null;
default:
throw new InvalidOperationException($"Unexpected return code {acquireCommandResult}");
}
DBNull _ => Cookie, // indicates we called pg_advisory_lock and not pg_try_advisory_lock
false => null,
true => Cookie,
_ => throw new InvalidOperationException($"Unexpected value '{acquireCommandResult}' from acquire command")
};

async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync()
{
Expand All @@ -116,6 +124,22 @@ async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync()
}
}

private async Task<bool> IsHoldingLockAsync(DatabaseConnection connection, PostgresAdvisoryLockKey key, CancellationToken cancellationToken)
{
using var command = connection.CreateCommand();
command.SetCommandText($@"
SELECT COUNT(*)
FROM pg_catalog.pg_locks l
JOIN pg_catalog.pg_database d
ON d.oid = l.database
WHERE l.locktype = 'advisory'
AND {AddPGLocksFilterParametersAndGetFilterExpression(command, key)}
AND l.pid = pg_catalog.pg_backend_pid()
AND d.datname = pg_catalog.current_database()"
);
return (long)await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false) != 0;
}

private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, PostgresAdvisoryLockKey key, TimeoutValue timeout)
{
var command = connection.CreateCommand();
Expand All @@ -128,55 +152,19 @@ private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, Post
// we'll be using the pg_try_advisory_lock function which doesn't block in that case.
commandText.AppendLine($"SET LOCAL lock_timeout = {(timeout.IsZero || timeout.IsInfinite ? 0 : timeout.InMilliseconds)};");

if (connection.IsExernallyOwned)
{
commandText.Append($@"
SELECT
CASE WHEN EXISTS(
SELECT *
FROM pg_catalog.pg_locks l
JOIN pg_catalog.pg_database d
ON d.oid = l.database
WHERE l.locktype = 'advisory'
AND {AddPGLocksFilterParametersAndGetFilterExpression(command, key)}
AND l.pid = pg_catalog.pg_backend_pid()
AND d.datname = pg_catalog.current_database()
)
THEN {AlreadyHeldReturnCode}
ELSE
"
);
AppendAcquireFunctionCall();
commandText.AppendLine().Append("END");
}
else
{
commandText.Append("SELECT ");
AppendAcquireFunctionCall();
}
commandText.Append(" AS result");
commandText.Append("SELECT ");
var isTry = timeout.IsZero;
commandText.Append("pg_catalog.pg");
if (isTry) { commandText.Append("_try"); }
commandText.Append("_advisory_lock");
if (this._isShared) { commandText.Append("_shared"); }
commandText.Append('(').Append(AddKeyParametersAndGetKeyArguments(command, key)).Append(')')
.Append(" AS result");

command.SetCommandText(commandText.ToString());
command.SetTimeout(timeout);

return command;

void AppendAcquireFunctionCall()
{
// creates an expression like
// pg_try_advisory_lock(@key1, @key2)::int
// OR (SELECT 1 FROM (SELECT pg_advisory_lock(@key)) f)
var isTry = timeout.IsZero;
if (!isTry) { commandText.Append("(SELECT 1 FROM (SELECT "); }
commandText.Append("pg_catalog.pg");
if (isTry) { commandText.Append("_try"); }
commandText.Append("_advisory");
commandText.Append("_lock");
if (this._isShared) { commandText.Append("_shared"); }
commandText.Append('(').Append(AddKeyParametersAndGetKeyArguments(command, key)).Append(')');
if (isTry) { commandText.Append("::int"); }
else { commandText.Append(") f)"); }
}
}

private static async ValueTask<bool> HasTransactionAsync(DatabaseConnection connection)
Expand Down Expand Up @@ -249,6 +237,7 @@ private static string AddPGLocksFilterParametersAndGetFilterExpression(DatabaseC
}
else
{
AddKeyParametersAndGetKeyArguments(command, key);
classIdParameter = "key1";
objIdParameter = "key2";
objSubId = "2";
Expand Down
2 changes: 1 addition & 1 deletion DistributedLock/DistributedLock.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<PropertyGroup>
<Version>2.3.1</Version>
<Version>2.3.2</Version>
<AssemblyVersion>2.0.0.0</AssemblyVersion>
<Authors>Michael Adelson</Authors>
<Description>Provides easy-to-use mutexes, reader-writer locks, and semaphores that can synchronize across processes and machines. This is an umbrella package that brings in the entire family of DistributedLock.* packages (e. g. DistributedLock.SqlServer) as references. Those packages can also be installed individually.
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ public class SomeService
Contributions are welcome! If you are interested in contributing towards a new or existing issue, please let me know via comments on the issue so that I can help you get started and avoid wasted effort on your part.

## Release notes
- 2.3.2
- Work around underlying Postgres race condition when waiting on advisory locks with a short non-zero timeout ([#147](https://github.com/madelson/DistributedLock/issues/147), DistributedLock.Postgres 1.0.4). Thanks @Tzachi009 for reporting and isolating the issue!
- 2.3.1
- Fixed concurrency issue with `HandleLostToken` for relational database locks ([#133](https://github.com/madelson/DistributedLock/issues/133), DistributedLock.Core 1.0.5, DistributedLock.MySql 1.0.1, DistributedLock.Oracle 1.0.1, DistributedLock.Postgres 1.0.3, DistributedLock.SqlServer 1.0.2). Thanks [@OskarKlintrot](https://github.com/OskarKlintrot) for testing!
- Fixed misleading error message why trying to disable auto-extension in Redis ([#130](https://github.com/madelson/DistributedLock/issues/130), DistributedLock.Redis 1.0.2)
Expand Down