Skip to content

Commit

Permalink
feat: 添加AsyncLock的同步版本SyncLock (#572)
Browse files Browse the repository at this point in the history
  • Loading branch information
joesdu authored Oct 8, 2024
2 parents df41ecf + baaf488 commit 99d520b
Show file tree
Hide file tree
Showing 5 changed files with 345 additions and 13 deletions.
38 changes: 29 additions & 9 deletions src/EasilyNET.Core/Threading/AsyncLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace EasilyNET.Core.Threading;
public sealed class AsyncLock
{
private readonly Task<Release> _release;

private readonly AsyncSemaphore _semaphore = new();

/// <summary>
Expand All @@ -20,6 +19,7 @@ public AsyncLock()
}

/// <summary>
/// 获取内部信号量的占用状态
/// </summary>
/// <returns></returns>
public int GetSemaphoreTaken() => _semaphore.GetTaken();
Expand All @@ -31,7 +31,7 @@ public AsyncLock()
public int GetQueueCount() => _semaphore.GetQueueCount();

/// <summary>
/// 锁定返回一个 <see cref="Release" /> 对象
/// 锁定,返回一个 <see cref="Release" /> 对象
/// </summary>
/// <returns></returns>
public Task<Release> LockAsync()
Expand All @@ -41,23 +41,43 @@ public Task<Release> LockAsync()
}

/// <summary>
/// 锁定任务的执行。
/// 锁定任务的执行,无返回值
/// </summary>
/// <param name="taskFunc"></param>
/// <returns></returns>
public async Task LockAsync(Func<Task> taskFunc)
{
using (await LockAsync())
{
await taskFunc();
}
}

/// <summary>
/// 锁定任务的执行,可返回执行函数的结果
/// </summary>
/// <param name="task"></param>
/// <param name="taskFunc"></param>
/// <returns></returns>
public async Task LockAsync(Task task)
// ReSharper disable once UnusedMethodReturnValue.Global
public async Task<T> LockAsync<T>(Func<Task<T>> taskFunc)
{
using var r = LockAsync();
await task;
using (await LockAsync())
{
var t = await taskFunc();
return t;
}
}

/// <remarks>
/// 释放者
/// Release
/// </remarks>
/// <param name="asyncLock"></param>
public readonly struct Release(AsyncLock asyncLock) : IDisposable
{
/// <inheritdoc />
public void Dispose() => asyncLock._semaphore.Release();
public void Dispose()
{
asyncLock._semaphore.Release();
}
}
}
74 changes: 74 additions & 0 deletions src/EasilyNET.Core/Threading/SyncLock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// ReSharper disable UnusedMember.Global

namespace EasilyNET.Core.Threading;

/// <summary>
/// 同步锁,用于仅让一个线程访问共享资源。
/// </summary>
public sealed class SyncLock
{
private readonly SyncSemaphore _semaphore = new();
private int _ownerThreadId = -1;

/// <summary>
/// 获取内部信号量的占用状态
/// </summary>
/// <returns></returns>
public int GetSemaphoreTaken() => _semaphore.GetTaken();

/// <summary>
/// 获取内部信号量的队列计数
/// </summary>
/// <returns></returns>
public int GetQueueCount() => _semaphore.GetQueueCount();

/// <summary>
/// 锁定,返回一个 <see cref="Release" /> 对象
/// </summary>
/// <returns></returns>
public Release Lock()
{
var currentThreadId = Environment.CurrentManagedThreadId;
if (_ownerThreadId == currentThreadId)
{
throw new InvalidOperationException("Reentrant lock detected");
}
_semaphore.Wait();
_ownerThreadId = currentThreadId;
return new(this);
}

/// <summary>
/// 锁定任务的执行,无返回值
/// </summary>
/// <param name="action"></param>
public void Lock(Action action)
{
using var r = Lock();
action();
}

/// <summary>
/// 锁定任务的执行,可返回执行函数的结果
/// </summary>
/// <param name="action"></param>
public T Lock<T>(Func<T> action)
{
using var r = Lock();
return action();
}

/// <remarks>
/// Release
/// </remarks>
/// <param name="syncLock"></param>
public readonly struct Release(SyncLock syncLock) : IDisposable
{
/// <inheritdoc />
public void Dispose()
{
syncLock._ownerThreadId = -1;
syncLock._semaphore.Release();
}
}
}
57 changes: 57 additions & 0 deletions src/EasilyNET.Core/Threading/SyncSemaphore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System.Collections.Concurrent;

namespace EasilyNET.Core.Threading;

/// <summary>
/// 同步信号量。
/// </summary>
internal sealed class SyncSemaphore
{
private readonly ConcurrentQueue<ManualResetEventSlim> _waiters = [];
private int _isTaken;

/// <summary>
/// 获取是否被占用
/// </summary>
/// <returns></returns>
public int GetTaken() => _isTaken;

/// <summary>
/// 获取等待的任务数量
/// </summary>
/// <returns></returns>
public int GetQueueCount() => _waiters.Count;

/// <summary>
/// 同步等待
/// </summary>
public void Wait()
{
// 如果 _isTaken 的值是 0,则将其设置为 1,并返回。
if (Interlocked.CompareExchange(ref _isTaken, 1, 0) == 0)
{
return;
}
// 如果 _isTaken 的值不是 0,创建一个新的 ManualResetEventSlim,并将其设置为未终止状态。
var mre = new ManualResetEventSlim(false);
// 将 ManualResetEventSlim 实例添加到等待队列中。
_waiters.Enqueue(mre);
// 等待 ManualResetEventSlim 被终止。
mre.Wait();
}

/// <summary>
/// 释放
/// </summary>
public void Release()
{
if (_waiters.TryDequeue(out var toRelease))
{
toRelease.Set();
}
else
{
Interlocked.Exchange(ref _isTaken, 0);
}
}
}
84 changes: 80 additions & 4 deletions test/EasilyNET.Test.Unit/Threading/AsyncLockTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@

namespace EasilyNET.Test.Unit.Threading;

/// <summary>
/// 测试异步锁
/// </summary>
[TestClass]
public class AsyncLockTest(TestContext testContext)
public class AsyncLockTests(TestContext testContext)
{
// ReSharper disable once CollectionNeverQueried.Local
private static readonly Dictionary<string, string> _dictionary = [];
Expand Down Expand Up @@ -115,4 +112,83 @@ public async Task WaitingTasksAreReleasedWhenSemaphoreIsReleased()
res2.Dispose(); //释放
asyncLock.GetSemaphoreTaken().Should().Be(0);
}

/// <summary>
/// 测试基本锁定和释放功能,确保 _isTaken 状态正确。
/// </summary>
[TestMethod]
public async Task LockAsync_ShouldLockAndRelease()
{
var asyncLock = new AsyncLock();
Assert.AreEqual(0, asyncLock.GetSemaphoreTaken());
using (await asyncLock.LockAsync())
{
Assert.AreEqual(1, asyncLock.GetSemaphoreTaken());
}
Assert.AreEqual(0, asyncLock.GetSemaphoreTaken());
}

/// <summary>
/// 测试 LockAsync(Action action) 方法,确保传入的操作被执行。
/// </summary>
[TestMethod]
public async Task LockAsync_ShouldExecuteAction()
{
var asyncLock = new AsyncLock();
var executed = false;
await asyncLock.LockAsync(() => Task.Run(() => executed = true));
Assert.IsTrue(executed);
}

/// <summary>
/// 测试在锁定时,后续任务会排队等待,确保任务按顺序执行。
/// </summary>
[TestMethod]
public async Task LockAsync_ShouldQueueWhenLocked()
{
var asyncLock = new AsyncLock();
var task1Completed = false;
var task2Completed = false;
var task1 = Task.Run(async () =>
{
using (await asyncLock.LockAsync())
{
await Task.Delay(100);
task1Completed = true;
}
});
var task2 = Task.Run(async () =>
{
using (await asyncLock.LockAsync())
{
task2Completed = true;
}
});
await Task.WhenAll(task1, task2);
Assert.IsTrue(task1Completed);
Assert.IsTrue(task2Completed);
}

/// <summary>
/// 测试多线程环境下的锁定行为,确保计数器正确增加。
/// </summary>
[TestMethod]
public async Task LockAsync_ShouldHandleMultipleThreads()
{
var asyncLock = new AsyncLock();
var counter = 0;
var tasks = new Task[10];
for (var i = 0; i < tasks.Length; i++)
{
tasks[i] = Task.Run(async () =>
{
for (var j = 0; j < 100; j++)
{
await asyncLock.LockAsync(async () => await Task.Run(() => counter++));
}
});
}
await Task.WhenAll(tasks);
Assert.AreEqual(1000, counter);
}
}
Loading

0 comments on commit 99d520b

Please sign in to comment.