diff --git a/DistributedLock.Postgres/DistributedLock.Postgres.csproj b/DistributedLock.Postgres/DistributedLock.Postgres.csproj index 951424d4..444143e3 100644 --- a/DistributedLock.Postgres/DistributedLock.Postgres.csproj +++ b/DistributedLock.Postgres/DistributedLock.Postgres.csproj @@ -10,7 +10,7 @@ - 1.0.3 + 1.0.4 1.0.0.0 Michael Adelson Provides a distributed lock implementation based on Postgresql diff --git a/DistributedLock.Postgres/PostgresAdvisoryLock.cs b/DistributedLock.Postgres/PostgresAdvisoryLock.cs index 2e6a2b05..6d83dcf2 100644 --- a/DistributedLock.Postgres/PostgresAdvisoryLock.cs +++ b/DistributedLock.Postgres/PostgresAdvisoryLock.cs @@ -16,13 +16,10 @@ namespace Medallion.Threading.Postgres /// internal class PostgresAdvisoryLock : IDbSynchronizationStrategy { - // matches SqlApplicationLock - private const int AlreadyHeldReturnCode = 103; + private static readonly object Cookie = new(); - private static readonly object Cookie = new object(); - - public static readonly PostgresAdvisoryLock ExclusiveLock = new PostgresAdvisoryLock(isShared: false), - SharedLock = new PostgresAdvisoryLock(isShared: true); + public static readonly PostgresAdvisoryLock ExclusiveLock = new(isShared: false), + SharedLock = new(isShared: true); private readonly bool _isShared; @@ -40,7 +37,16 @@ private PostgresAdvisoryLock(bool isShared) { const string SavePointName = "medallion_threading_postgres_advisory_lock_acquire"; - var key = new PostgresAdvisoryLockKey(resourceName); + PostgresAdvisoryLockKey key = new(resourceName); + + if (connection.IsExernallyOwned + && await this.IsHoldingLockAsync(connection, key, cancellationToken).ConfigureAwait(false)) + { + if (timeout.IsZero) { return null; } + if (timeout.IsInfinite) { throw new DeadlockException("Attempted to acquire a lock that is already held on the same connection"); } + await SyncViaAsync.Delay(timeout, cancellationToken).ConfigureAwait(false); + return null; + } var hasTransaction = await HasTransactionAsync(connection).ConfigureAwait(false); if (hasTransaction) @@ -58,10 +64,10 @@ private PostgresAdvisoryLock(bool isShared) using var acquireCommand = this.CreateAcquireCommand(connection, key, timeout); - int acquireCommandResult; + object acquireCommandResult; try { - acquireCommandResult = (int)await acquireCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false); + acquireCommandResult = await acquireCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false); } catch (Exception ex) { @@ -73,7 +79,14 @@ private PostgresAdvisoryLock(bool isShared) { // lock_timeout error code from https://www.postgresql.org/docs/10/errcodes-appendix.html case "55P03": - return null; + // Even though we hit a lock timeout, an underlying race condition in Postgres means that we might actually + // have acquired the lock right before timing out. To account for this, we simply re-check whether we are + // holding the lock to determine the final return value. See https://github.com/madelson/DistributedLock/issues/147 + // and https://www.postgresql.org/message-id/63573.1668271677%40sss.pgh.pa.us for more details. + // NOTE: we use CancellationToken.None for this check because if we ARE holding the lock it would be invalid to abort. + return await this.IsHoldingLockAsync(connection, key, CancellationToken.None).ConfigureAwait(false) + ? Cookie + : null; // deadlock_detected error code from https://www.postgresql.org/docs/10/errcodes-appendix.html case "40P01": throw new DeadlockException($"The request for the distributed lock failed with exit code '{postgresException.SqlState}' (deadlock_detected)", ex); @@ -91,18 +104,13 @@ private PostgresAdvisoryLock(bool isShared) await RollBackTransactionTimeoutVariablesIfNeededAsync().ConfigureAwait(false); - switch (acquireCommandResult) + return acquireCommandResult switch { - case 0: return null; - case 1: return Cookie; - case AlreadyHeldReturnCode: - if (timeout.IsZero) { return null; } - if (timeout.IsInfinite) { throw new DeadlockException("Attempted to acquire a lock that is already held on the same connection"); } - await SyncViaAsync.Delay(timeout, cancellationToken).ConfigureAwait(false); - return null; - default: - throw new InvalidOperationException($"Unexpected return code {acquireCommandResult}"); - } + DBNull _ => Cookie, // indicates we called pg_advisory_lock and not pg_try_advisory_lock + false => null, + true => Cookie, + _ => throw new InvalidOperationException($"Unexpected value '{acquireCommandResult}' from acquire command") + }; async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync() { @@ -116,6 +124,22 @@ async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync() } } + private async Task IsHoldingLockAsync(DatabaseConnection connection, PostgresAdvisoryLockKey key, CancellationToken cancellationToken) + { + using var command = connection.CreateCommand(); + command.SetCommandText($@" + SELECT COUNT(*) + FROM pg_catalog.pg_locks l + JOIN pg_catalog.pg_database d + ON d.oid = l.database + WHERE l.locktype = 'advisory' + AND {AddPGLocksFilterParametersAndGetFilterExpression(command, key)} + AND l.pid = pg_catalog.pg_backend_pid() + AND d.datname = pg_catalog.current_database()" + ); + return (long)await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false) != 0; + } + private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, PostgresAdvisoryLockKey key, TimeoutValue timeout) { var command = connection.CreateCommand(); @@ -128,55 +152,19 @@ private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, Post // we'll be using the pg_try_advisory_lock function which doesn't block in that case. commandText.AppendLine($"SET LOCAL lock_timeout = {(timeout.IsZero || timeout.IsInfinite ? 0 : timeout.InMilliseconds)};"); - if (connection.IsExernallyOwned) - { - commandText.Append($@" - SELECT - CASE WHEN EXISTS( - SELECT * - FROM pg_catalog.pg_locks l - JOIN pg_catalog.pg_database d - ON d.oid = l.database - WHERE l.locktype = 'advisory' - AND {AddPGLocksFilterParametersAndGetFilterExpression(command, key)} - AND l.pid = pg_catalog.pg_backend_pid() - AND d.datname = pg_catalog.current_database() - ) - THEN {AlreadyHeldReturnCode} - ELSE - " - ); - AppendAcquireFunctionCall(); - commandText.AppendLine().Append("END"); - } - else - { - commandText.Append("SELECT "); - AppendAcquireFunctionCall(); - } - commandText.Append(" AS result"); + commandText.Append("SELECT "); + var isTry = timeout.IsZero; + commandText.Append("pg_catalog.pg"); + if (isTry) { commandText.Append("_try"); } + commandText.Append("_advisory_lock"); + if (this._isShared) { commandText.Append("_shared"); } + commandText.Append('(').Append(AddKeyParametersAndGetKeyArguments(command, key)).Append(')') + .Append(" AS result"); command.SetCommandText(commandText.ToString()); command.SetTimeout(timeout); return command; - - void AppendAcquireFunctionCall() - { - // creates an expression like - // pg_try_advisory_lock(@key1, @key2)::int - // OR (SELECT 1 FROM (SELECT pg_advisory_lock(@key)) f) - var isTry = timeout.IsZero; - if (!isTry) { commandText.Append("(SELECT 1 FROM (SELECT "); } - commandText.Append("pg_catalog.pg"); - if (isTry) { commandText.Append("_try"); } - commandText.Append("_advisory"); - commandText.Append("_lock"); - if (this._isShared) { commandText.Append("_shared"); } - commandText.Append('(').Append(AddKeyParametersAndGetKeyArguments(command, key)).Append(')'); - if (isTry) { commandText.Append("::int"); } - else { commandText.Append(") f)"); } - } } private static async ValueTask HasTransactionAsync(DatabaseConnection connection) @@ -249,6 +237,7 @@ private static string AddPGLocksFilterParametersAndGetFilterExpression(DatabaseC } else { + AddKeyParametersAndGetKeyArguments(command, key); classIdParameter = "key1"; objIdParameter = "key2"; objSubId = "2"; diff --git a/DistributedLock/DistributedLock.csproj b/DistributedLock/DistributedLock.csproj index 536c39fd..74129dcc 100644 --- a/DistributedLock/DistributedLock.csproj +++ b/DistributedLock/DistributedLock.csproj @@ -10,7 +10,7 @@ - 2.3.1 + 2.3.2 2.0.0.0 Michael Adelson Provides easy-to-use mutexes, reader-writer locks, and semaphores that can synchronize across processes and machines. This is an umbrella package that brings in the entire family of DistributedLock.* packages (e. g. DistributedLock.SqlServer) as references. Those packages can also be installed individually. diff --git a/README.md b/README.md index 8c83ac00..9aca9b10 100644 --- a/README.md +++ b/README.md @@ -137,6 +137,8 @@ public class SomeService Contributions are welcome! If you are interested in contributing towards a new or existing issue, please let me know via comments on the issue so that I can help you get started and avoid wasted effort on your part. ## Release notes +- 2.3.2 + - Work around underlying Postgres race condition when waiting on advisory locks with a short non-zero timeout ([#147](https://github.com/madelson/DistributedLock/issues/147), DistributedLock.Postgres 1.0.4). Thanks @Tzachi009 for reporting and isolating the issue! - 2.3.1 - Fixed concurrency issue with `HandleLostToken` for relational database locks ([#133](https://github.com/madelson/DistributedLock/issues/133), DistributedLock.Core 1.0.5, DistributedLock.MySql 1.0.1, DistributedLock.Oracle 1.0.1, DistributedLock.Postgres 1.0.3, DistributedLock.SqlServer 1.0.2). Thanks [@OskarKlintrot](https://github.com/OskarKlintrot) for testing! - Fixed misleading error message why trying to disable auto-extension in Redis ([#130](https://github.com/madelson/DistributedLock/issues/130), DistributedLock.Redis 1.0.2)