Skip to content

Commit

Permalink
Merge pull request #88 from madelson/release-2.0.2
Browse files Browse the repository at this point in the history
Release 2.0.2
  • Loading branch information
madelson authored Apr 24, 2021
2 parents 7c7fa82 + a60febb commit 6ee3eb6
Show file tree
Hide file tree
Showing 16 changed files with 142 additions and 62 deletions.
2 changes: 1 addition & 1 deletion DistributedLock.Core/DistributedLock.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<PropertyGroup>
<Version>1.0.0</Version>
<Version>1.0.1</Version>
<AssemblyVersion>1.0.0.0</AssemblyVersion>
<Authors>Michael Adelson</Authors>
<Description>Core interfaces and utilities that support the DistributedLock.* family of packages</Description>
Expand Down
25 changes: 20 additions & 5 deletions DistributedLock.Core/Internal/Data/ConnectionMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,12 @@ private async ValueTask StopOrDisposeAsync(bool isDispose)
this.CloseOrCancelMonitoringHandleRegistrationsNoLock(isCancel: false);

task = this._monitoringWorkerTask;
this._monitorStateChangedTokenSource?.Cancel();

// Note: synchronous cancel here should be safe because we've already set
// the state to disposed above which the monitoring loop will check if it
// takes over the Cancel() thread.
this._monitorStateChangedTokenSource?.Cancel();

// unsubscribe from state change tracking
if (this._stateChangedHandler != null
&& this._weakConnection.TryGetTarget(out var connection))
Expand Down Expand Up @@ -304,7 +308,7 @@ private bool StartMonitorWorkerIfNeededNoLock()

// skip if there's nothing to do
if (this._keepaliveCadence.IsInfinite && !this.HasRegisteredMonitoringHandlesNoLock) { return false; }

this._monitorStateChangedTokenSource = new CancellationTokenSource();
// Set up the task as a continuation on the previous task to avoid concurrency in the case where the previous
// one is spinning down. If we change states in rapid succession we could end up with multiple tasks queued up
Expand All @@ -318,9 +322,20 @@ private bool StartMonitorWorkerIfNeededNoLock()

private void FireStateChangedNoLock()
{
this._monitorStateChangedTokenSource!.Cancel();
this._monitorStateChangedTokenSource.Dispose();
var monitorStateChangedTokenSource = this._monitorStateChangedTokenSource!;
this._monitorStateChangedTokenSource = new CancellationTokenSource();
// Canceling asynchronously is important because the Cancel() thread can end up
// running continuations inside the monitoring loop (e. g. see
// https://github.com/madelson/DistributedLock/issues/85). Now that we set the new
// token source before canceling the old one we should avoid that particular issue, but
// it is still safer and easier to reason about not to have that happen. This also ensures
// that FireStateChangedNoLock() always returns quickly, even if the monitoring loop
// were to do some synchronous work on the continuation thread.
Task.Run(() =>
{
try { monitorStateChangedTokenSource.Cancel(); }
finally { monitorStateChangedTokenSource.Dispose(); }
});
}

private async Task MonitorWorkerLoop()
Expand Down Expand Up @@ -357,7 +372,7 @@ private async Task<bool> DoMonitoringAsync(CancellationToken cancellationToken)
using var _ = await this._connectionLock.AcquireAsync(CancellationToken.None).ConfigureAwait(false);

// 1-min increments is kind of an arbitrary choice. We want to avoid this being too short since each time
// we "come up to breathe" that's a waste of resource. We also want to avoid this being too long since
// we "come up to breathe" that's a waste of resources. We also want to avoid this being too long since
// in case people have some kind of monitoring set up for hanging queries
await connection.SleepAsync(
sleepTime: TimeSpan.FromMinutes(1),
Expand Down
16 changes: 8 additions & 8 deletions DistributedLock.Core/Internal/Data/DatabaseConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ private async ValueTask DisposeOrCloseAsync(bool isDispose)
finally
{
#if NETSTANDARD2_1
if (!SyncViaAsync.IsSynchronous && this.InnerConnection is DbConnection dbConnection)
{
await (isDispose ? dbConnection.DisposeAsync() : dbConnection.CloseAsync().AsValueTask()).ConfigureAwait(false);
}
else
{
SyncDisposeConnection();
}
if (!SyncViaAsync.IsSynchronous && this.InnerConnection is DbConnection dbConnection)
{
await (isDispose ? dbConnection.DisposeAsync() : dbConnection.CloseAsync().AsValueTask()).ConfigureAwait(false);
}
else
{
SyncDisposeConnection();
}
#elif NETSTANDARD2_0 || NET461
SyncDisposeConnection();
#else
Expand Down
39 changes: 32 additions & 7 deletions DistributedLock.Core/Internal/Data/MultiplexedConnectionLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ internal sealed class MultiplexedConnectionLock : IAsyncDisposable
private readonly AsyncLock _mutex = AsyncLock.Create();
private readonly Dictionary<string, TimeoutValue> _heldLocksToKeepaliveCadences = new Dictionary<string, TimeoutValue>();
private readonly DatabaseConnection _connection;
/// <summary>
/// Tracks whether we've successfully opened the connection. We track this explicity instead of just looking at
/// <see cref="DatabaseConnection.CanExecuteQueries"/> because we want to make sure we close() explicitly for every
/// open() and also we want to make sure we do not try to re-open a broken connection.
/// </summary>
private bool _connectionOpened;

public MultiplexedConnectionLock(DatabaseConnection connection)
{
this._connection = connection;
}

private bool IsConnectionBrokenNoLock => this._connectionOpened && !this._connection.CanExecuteQueries;

public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
string name,
TimeoutValue timeout,
Expand All @@ -33,8 +41,8 @@ public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
bool opportunistic)
where TLockCookie : class
{
using var mutextHandle = await this._mutex.TryAcquireAsync(opportunistic ? TimeSpan.Zero : Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
if (mutextHandle == null)
using var mutexHandle = await this._mutex.TryAcquireAsync(opportunistic ? TimeSpan.Zero : Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
if (mutexHandle == null)
{
// mutex wasn't free, so just give up
Invariant.Require(opportunistic);
Expand All @@ -43,6 +51,10 @@ public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
return new Result(MultiplexedConnectionLockRetry.Retry, canSafelyDispose: false);
}

// This is technically redundant with the similar catch block below, but avoids needing to have
// to attempt a query on a connection that we know is broken.
if (opportunistic && this.IsConnectionBrokenNoLock) { return this.GetAlreadyBrokenResultNoLock(); }

try
{
if (this._heldLocksToKeepaliveCadences.ContainsKey(name))
Expand All @@ -53,9 +65,10 @@ public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
return this.GetFailureResultNoLock(isAlreadyHeld: true, opportunistic, timeout);
}

if (!this._connection.CanExecuteQueries)
if (!this._connectionOpened)
{
await this._connection.OpenAsync(cancellationToken).ConfigureAwait(false);
this._connectionOpened = true;
}

var lockCookie = await strategy.TryAcquireAsync(this._connection, name, opportunistic ? TimeSpan.Zero : timeout, cancellationToken).ConfigureAwait(false);
Expand All @@ -71,6 +84,11 @@ public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
// shortened the timeout
return this.GetFailureResultNoLock(isAlreadyHeld: false, opportunistic, timeout);
}
// never punish for the connection being broken already (see https://github.com/madelson/DistributedLock/issues/83)
catch when (opportunistic && this.IsConnectionBrokenNoLock)
{
return this.GetAlreadyBrokenResultNoLock();
}
finally
{
await this.CloseConnectionIfNeededNoLockAsync().ConfigureAwait(false);
Expand All @@ -90,6 +108,11 @@ public async ValueTask<bool> GetIsInUseAsync()
return mutexHandle == null || this._heldLocksToKeepaliveCadences.Count != 0;
}

private Result GetAlreadyBrokenResultNoLock() =>
// Retry on any already-broken connection to avoid "leaking" the killing or death of connections. We want there to be no observable
// results (other than perf) of multiplexing vs. not.
new Result(MultiplexedConnectionLockRetry.Retry, canSafelyDispose: this._heldLocksToKeepaliveCadences.Count == 0);

private Result GetFailureResultNoLock(bool isAlreadyHeld, bool opportunistic, TimeoutValue timeout)
{
// only opportunistic acquisitions trigger retries
Expand Down Expand Up @@ -151,11 +174,13 @@ private async ValueTask ReleaseAsync<TLockCookie>(IDbSynchronizationStrategy<TLo
}
}

private ValueTask CloseConnectionIfNeededNoLockAsync()
private async ValueTask CloseConnectionIfNeededNoLockAsync()
{
return this._heldLocksToKeepaliveCadences.Count == 0 && this._connection.CanExecuteQueries
? this._connection.CloseAsync()
: default;
if (this._connectionOpened && this._heldLocksToKeepaliveCadences.Count == 0)
{
await this._connection.CloseAsync().ConfigureAwait(false);
this._connectionOpened = false;
}
}

private void SetKeepaliveCadenceNoLock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public MultiplexedConnectionLockPool(Func<string, DatabaseConnection> connection
{
// opportunistic phase: see if we can use a connection that is already holding a lock
// to acquire the current lock
var existingLock = await this.GetOrCreateLockAsync(connectionString).ConfigureAwait(false);
var existingLock = await this.GetExistingLockOrDefaultAsync(connectionString).ConfigureAwait(false);
if (existingLock != null)
{
var canSafelyDisposeExistingLock = false;
Expand Down Expand Up @@ -89,6 +89,7 @@ public MultiplexedConnectionLockPool(Func<string, DatabaseConnection> connection
try
{
result = await TryAcquireAsync(@lock, opportunistic: false).ConfigureAwait(false);
Invariant.Require(result!.Value.Retry == MultiplexedConnectionLockRetry.NoRetry, "Acquire on fresh lock should not recommend a retry");
}
finally
{
Expand All @@ -101,7 +102,7 @@ public MultiplexedConnectionLockPool(Func<string, DatabaseConnection> connection
@lock.TryAcquireAsync(name, timeout, strategy, keepaliveCadence, cancellationToken, opportunistic);
}

private async ValueTask<MultiplexedConnectionLock?> GetOrCreateLockAsync(string connectionString)
private async ValueTask<MultiplexedConnectionLock?> GetExistingLockOrDefaultAsync(string connectionString)
{
using var _ = await this._lock.AcquireAsync(CancellationToken.None).ConfigureAwait(false);

Expand Down
4 changes: 2 additions & 2 deletions DistributedLock.Postgres/DistributedLock.Postgres.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<PropertyGroup>
<Version>1.0.0</Version>
<Version>1.0.1</Version>
<AssemblyVersion>1.0.0.0</AssemblyVersion>
<Authors>Michael Adelson</Authors>
<Description>Provides a distributed lock implementation based on Postgresql</Description>
Expand Down Expand Up @@ -41,7 +41,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Npgsql" Version="4.1.7" />
<PackageReference Include="Npgsql" Version="5.0.4" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion DistributedLock.SqlServer/DistributedLock.SqlServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<PropertyGroup>
<Version>1.0.0</Version>
<Version>1.0.1</Version>
<AssemblyVersion>1.0.0.0</AssemblyVersion>
<Authors>Michael Adelson</Authors>
<Description>Provides a distributed lock implementation based on SQL Server</Description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Medallion.Threading.Tests.Data
{
Expand Down Expand Up @@ -95,5 +96,29 @@ public void TestKeepaliveDoesNotCreateRaceCondition()
}
});
}

// replicates issue from https://github.com/madelson/DistributedLock/issues/85
[Test]
public async Task TestAccessingHandleLostTokenWhileKeepaliveActiveDoesNotBlock()
{
this._lockProvider.Strategy.KeepaliveCadence = TimeSpan.FromMinutes(5);

var @lock = this._lockProvider.CreateLock(string.Empty);
var handle = await @lock.TryAcquireAsync();
if (handle != null)
{
var accessHandleLostTokenTask = Task.Run(() =>
{
if (handle.HandleLostToken.CanBeCanceled)
{
handle.HandleLostToken.Register(() => { });
}
});
Assert.IsTrue(await accessHandleLostTokenTask.WaitAsync(TimeSpan.FromSeconds(5)));

// do this only on success; on failure we're likely deadlocked and dispose will hang
await handle.DisposeAsync();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
Expand Down Expand Up @@ -122,5 +123,31 @@ async Task Test()

string MakeLockName(int i) => $"{nameof(TestHighConcurrencyWithSmallPool)}_{i}";
}

[Test]
public async Task TestBrokenConnectionDoesNotCorruptPool()
{
// This makes sure that for the Semaphore5 lock initial 4 tickets are taken with the default
// application name and therefore won't be killed
this._lockProvider.CreateLock("1");
this._lockProvider.CreateLock("2");
var applicationName = this._lockProvider.Strategy.SetUniqueApplicationName();

var lock1 = this._lockProvider.CreateLock("1");
await using var handle1 = await lock1.AcquireAsync();

// kill the session
await this._lockProvider.Strategy.Db.KillSessionsAsync(applicationName);

var lock2 = this._lockProvider.CreateLock("2");
Assert.DoesNotThrowAsync(async () => await (await lock2.AcquireAsync()).DisposeAsync());

await using var handle2 = await lock2.AcquireAsync();
Assert.DoesNotThrow(() => lock2.TryAcquire()?.Dispose());

Assert.Catch(() => handle1.Dispose());

Assert.DoesNotThrowAsync(async () => await (await lock1.AcquireAsync()).DisposeAsync());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ public sealed class TestingDbConnectionOptions
public DbTransaction? Transaction { get; set; }

public T Create<T>(
Func<string, (bool useMultiplexing, bool useTransaction, TimeSpan keepaliveCadence), T> fromConnectionString,
Func<string, (bool useMultiplexing, bool useTransaction, TimeSpan? keepaliveCadence), T> fromConnectionString,
Func<DbConnection, T> fromConnection,
Func<DbTransaction, T> fromTransaction)
{
if (this.ConnectionString != null)
{
return fromConnectionString(this.ConnectionString, (this.ConnectionStringUseMultiplexing, this.ConnectionStringUseTransaction, this.ConnectionStringKeepaliveCadence ?? Timeout.InfiniteTimeSpan));
return fromConnectionString(this.ConnectionString, (this.ConnectionStringUseMultiplexing, this.ConnectionStringUseTransaction, this.ConnectionStringKeepaliveCadence));
}

if (this.Connection != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public int CountActiveSessions(string applicationName)
using var command = connection.CreateCommand();
command.CommandText = "SELECT COUNT(*)::int FROM pg_stat_activity WHERE application_name = @applicationName";
command.Parameters.AddWithValue("applicationName", applicationName);
return (int)command.ExecuteScalar();
return (int)command.ExecuteScalar()!;
}

public IsolationLevel GetIsolationLevel(DbConnection connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ public override IDistributedLock CreateLockWithExactName(string name) =>

public override string GetSafeName(string name) => new PostgresAdvisoryLockKey(name, allowHashing: true).ToString();

internal static Action<PostgresConnectionOptionsBuilder> ToPostgresOptions((bool useMultiplexing, bool useTransaction, TimeSpan keepaliveCadence) options) =>
o => o.UseMultiplexing(options.useMultiplexing).KeepaliveCadence(options.keepaliveCadence);
internal static Action<PostgresConnectionOptionsBuilder> ToPostgresOptions((bool useMultiplexing, bool useTransaction, TimeSpan? keepaliveCadence) options) => o =>
{
o.UseMultiplexing(options.useMultiplexing);
if (options.keepaliveCadence is { } keepaliveCadence) { o.KeepaliveCadence(keepaliveCadence); }
};
}

public sealed class TestingPostgresDistributedReaderWriterLockProvider<TStrategy> : TestingReaderWriterLockProvider<TStrategy>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ public override IDistributedLock CreateLockWithExactName(string name) =>

public override string GetSafeName(string name) => SqlDistributedLock.GetSafeName(name);

internal static Action<SqlConnectionOptionsBuilder> ToSqlOptions((bool useMultiplexing, bool useTransaction, TimeSpan keepaliveCadence) options) =>
o => o.UseMultiplexing(options.useMultiplexing).UseTransaction(options.useTransaction).KeepaliveCadence(options.keepaliveCadence);
internal static Action<SqlConnectionOptionsBuilder> ToSqlOptions((bool useMultiplexing, bool useTransaction, TimeSpan? keepaliveCadence) options) => o =>
{
o.UseMultiplexing(options.useMultiplexing).UseTransaction(options.useTransaction);
if (options.keepaliveCadence is { } keepaliveCadence) { o.KeepaliveCadence(keepaliveCadence); }
};
}

public sealed class TestingSqlDistributedReaderWriterLockProvider<TStrategy, TDb> : TestingUpgradeableReaderWriterLockProvider<TStrategy>
Expand Down
Loading

0 comments on commit 6ee3eb6

Please sign in to comment.