Skip to content

Commit

Permalink
Make postgres locks robust to underlying postgres race condition with…
Browse files Browse the repository at this point in the history
… lock_timeout.

Fix #147
  • Loading branch information
madelson committed Nov 12, 2022
1 parent 865aa38 commit 746fbf5
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 67 deletions.
2 changes: 1 addition & 1 deletion DistributedLock.Postgres/DistributedLock.Postgres.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<PropertyGroup>
<Version>1.0.3</Version>
<Version>1.0.4</Version>
<AssemblyVersion>1.0.0.0</AssemblyVersion>
<Authors>Michael Adelson</Authors>
<Description>Provides a distributed lock implementation based on Postgresql</Description>
Expand Down
119 changes: 54 additions & 65 deletions DistributedLock.Postgres/PostgresAdvisoryLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@ namespace Medallion.Threading.Postgres
/// </summary>
internal class PostgresAdvisoryLock : IDbSynchronizationStrategy<object>
{
// matches SqlApplicationLock
private const int AlreadyHeldReturnCode = 103;
private static readonly object Cookie = new();

private static readonly object Cookie = new object();

public static readonly PostgresAdvisoryLock ExclusiveLock = new PostgresAdvisoryLock(isShared: false),
SharedLock = new PostgresAdvisoryLock(isShared: true);
public static readonly PostgresAdvisoryLock ExclusiveLock = new(isShared: false),
SharedLock = new(isShared: true);

private readonly bool _isShared;

Expand All @@ -40,7 +37,16 @@ private PostgresAdvisoryLock(bool isShared)
{
const string SavePointName = "medallion_threading_postgres_advisory_lock_acquire";

var key = new PostgresAdvisoryLockKey(resourceName);
PostgresAdvisoryLockKey key = new(resourceName);

if (connection.IsExernallyOwned
&& await this.IsHoldingLockAsync(connection, key, cancellationToken).ConfigureAwait(false))
{
if (timeout.IsZero) { return null; }
if (timeout.IsInfinite) { throw new DeadlockException("Attempted to acquire a lock that is already held on the same connection"); }
await SyncViaAsync.Delay(timeout, cancellationToken).ConfigureAwait(false);
return null;
}

var hasTransaction = await HasTransactionAsync(connection).ConfigureAwait(false);
if (hasTransaction)
Expand All @@ -58,10 +64,10 @@ private PostgresAdvisoryLock(bool isShared)

using var acquireCommand = this.CreateAcquireCommand(connection, key, timeout);

int acquireCommandResult;
object acquireCommandResult;
try
{
acquireCommandResult = (int)await acquireCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
acquireCommandResult = await acquireCommand.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand All @@ -73,7 +79,14 @@ private PostgresAdvisoryLock(bool isShared)
{
// lock_timeout error code from https://www.postgresql.org/docs/10/errcodes-appendix.html
case "55P03":
return null;
// Even though we hit a lock timeout, an underlying race condition in Postgres means that we might actually
// have acquired the lock right before timing out. To account for this, we simply re-check whether we are
// holding the lock to determine the final return value. See https://github.com/madelson/DistributedLock/issues/147
// and https://www.postgresql.org/message-id/63573.1668271677%40sss.pgh.pa.us for more details.
// NOTE: we use CancellationToken.None for this check because if we ARE holding the lock it would be invalid to abort.
return await this.IsHoldingLockAsync(connection, key, CancellationToken.None).ConfigureAwait(false)
? Cookie
: null;
// deadlock_detected error code from https://www.postgresql.org/docs/10/errcodes-appendix.html
case "40P01":
throw new DeadlockException($"The request for the distributed lock failed with exit code '{postgresException.SqlState}' (deadlock_detected)", ex);
Expand All @@ -91,18 +104,13 @@ private PostgresAdvisoryLock(bool isShared)

await RollBackTransactionTimeoutVariablesIfNeededAsync().ConfigureAwait(false);

switch (acquireCommandResult)
return acquireCommandResult switch
{
case 0: return null;
case 1: return Cookie;
case AlreadyHeldReturnCode:
if (timeout.IsZero) { return null; }
if (timeout.IsInfinite) { throw new DeadlockException("Attempted to acquire a lock that is already held on the same connection"); }
await SyncViaAsync.Delay(timeout, cancellationToken).ConfigureAwait(false);
return null;
default:
throw new InvalidOperationException($"Unexpected return code {acquireCommandResult}");
}
DBNull _ => Cookie, // indicates we called pg_advisory_lock and not pg_try_advisory_lock
false => null,
true => Cookie,
_ => throw new InvalidOperationException($"Unexpected value '{acquireCommandResult}' from acquire command")
};

async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync()
{
Expand All @@ -116,6 +124,22 @@ async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync()
}
}

private async Task<bool> IsHoldingLockAsync(DatabaseConnection connection, PostgresAdvisoryLockKey key, CancellationToken cancellationToken)
{
using var command = connection.CreateCommand();
command.SetCommandText($@"
SELECT COUNT(*)
FROM pg_catalog.pg_locks l
JOIN pg_catalog.pg_database d
ON d.oid = l.database
WHERE l.locktype = 'advisory'
AND {AddPGLocksFilterParametersAndGetFilterExpression(command, key)}
AND l.pid = pg_catalog.pg_backend_pid()
AND d.datname = pg_catalog.current_database()"
);
return (long)await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false) != 0;
}

private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, PostgresAdvisoryLockKey key, TimeoutValue timeout)
{
var command = connection.CreateCommand();
Expand All @@ -128,55 +152,19 @@ private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, Post
// we'll be using the pg_try_advisory_lock function which doesn't block in that case.
commandText.AppendLine($"SET LOCAL lock_timeout = {(timeout.IsZero || timeout.IsInfinite ? 0 : timeout.InMilliseconds)};");

if (connection.IsExernallyOwned)
{
commandText.Append($@"
SELECT
CASE WHEN EXISTS(
SELECT *
FROM pg_catalog.pg_locks l
JOIN pg_catalog.pg_database d
ON d.oid = l.database
WHERE l.locktype = 'advisory'
AND {AddPGLocksFilterParametersAndGetFilterExpression(command, key)}
AND l.pid = pg_catalog.pg_backend_pid()
AND d.datname = pg_catalog.current_database()
)
THEN {AlreadyHeldReturnCode}
ELSE
"
);
AppendAcquireFunctionCall();
commandText.AppendLine().Append("END");
}
else
{
commandText.Append("SELECT ");
AppendAcquireFunctionCall();
}
commandText.Append(" AS result");
commandText.Append("SELECT ");
var isTry = timeout.IsZero;
commandText.Append("pg_catalog.pg");
if (isTry) { commandText.Append("_try"); }
commandText.Append("_advisory_lock");
if (this._isShared) { commandText.Append("_shared"); }
commandText.Append('(').Append(AddKeyParametersAndGetKeyArguments(command, key)).Append(')')
.Append(" AS result");

command.SetCommandText(commandText.ToString());
command.SetTimeout(timeout);

return command;

void AppendAcquireFunctionCall()
{
// creates an expression like
// pg_try_advisory_lock(@key1, @key2)::int
// OR (SELECT 1 FROM (SELECT pg_advisory_lock(@key)) f)
var isTry = timeout.IsZero;
if (!isTry) { commandText.Append("(SELECT 1 FROM (SELECT "); }
commandText.Append("pg_catalog.pg");
if (isTry) { commandText.Append("_try"); }
commandText.Append("_advisory");
commandText.Append("_lock");
if (this._isShared) { commandText.Append("_shared"); }
commandText.Append('(').Append(AddKeyParametersAndGetKeyArguments(command, key)).Append(')');
if (isTry) { commandText.Append("::int"); }
else { commandText.Append(") f)"); }
}
}

private static async ValueTask<bool> HasTransactionAsync(DatabaseConnection connection)
Expand Down Expand Up @@ -249,6 +237,7 @@ private static string AddPGLocksFilterParametersAndGetFilterExpression(DatabaseC
}
else
{
AddKeyParametersAndGetKeyArguments(command, key);
classIdParameter = "key1";
objIdParameter = "key2";
objSubId = "2";
Expand Down
2 changes: 1 addition & 1 deletion DistributedLock/DistributedLock.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<PropertyGroup>
<Version>2.3.1</Version>
<Version>2.3.2</Version>
<AssemblyVersion>2.0.0.0</AssemblyVersion>
<Authors>Michael Adelson</Authors>
<Description>Provides easy-to-use mutexes, reader-writer locks, and semaphores that can synchronize across processes and machines. This is an umbrella package that brings in the entire family of DistributedLock.* packages (e. g. DistributedLock.SqlServer) as references. Those packages can also be installed individually.
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ public class SomeService
Contributions are welcome! If you are interested in contributing towards a new or existing issue, please let me know via comments on the issue so that I can help you get started and avoid wasted effort on your part.

## Release notes
- 2.3.2
- Work around underlying Postgres race condition when waiting on advisory locks with a short non-zero timeout ([#147](https://github.com/madelson/DistributedLock/issues/147), DistributedLock.Postgres 1.0.4). Thanks @Tzachi009 for reporting and isolating the issue!
- 2.3.1
- Fixed concurrency issue with `HandleLostToken` for relational database locks ([#133](https://github.com/madelson/DistributedLock/issues/133), DistributedLock.Core 1.0.5, DistributedLock.MySql 1.0.1, DistributedLock.Oracle 1.0.1, DistributedLock.Postgres 1.0.3, DistributedLock.SqlServer 1.0.2). Thanks [@OskarKlintrot](https://github.com/OskarKlintrot) for testing!
- Fixed misleading error message why trying to disable auto-extension in Redis ([#130](https://github.com/madelson/DistributedLock/issues/130), DistributedLock.Redis 1.0.2)
Expand Down

0 comments on commit 746fbf5

Please sign in to comment.