Skip to content

Commit

Permalink
publish
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhiqiang committed Dec 19, 2020
1 parent 8753df7 commit ccaba57
Show file tree
Hide file tree
Showing 19 changed files with 172 additions and 103 deletions.
2 changes: 1 addition & 1 deletion Aix.ScheduleTask.sln
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aix.ScheduleTask", "src\Aix
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "examples", "examples", "{09D4E313-D377-4737-8A71-280632A51E30}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Aix.ScheduleTask.Example", "examples\Aix.ScheduleTask.Example\Aix.ScheduleTask.Example.csproj", "{06E36890-A5FB-4B8A-94FD-ACC92C4F2070}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aix.ScheduleTask.Example", "examples\Aix.ScheduleTask.Example\Aix.ScheduleTask.Example.csproj", "{06E36890-A5FB-4B8A-94FD-ACC92C4F2070}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
2 changes: 1 addition & 1 deletion examples/Aix.ScheduleTask.Example/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ internal static void ConfigureServices(HostBuilderContext context, IServiceColle
var options = new AixScheduleTaskOptions
{
Master = dbOption.Master,
DBType =2
DBType =1
};
services.AddScheduleTask(options);

Expand Down
4 changes: 2 additions & 2 deletions examples/Aix.ScheduleTask.Example/appsettings.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"connectionStrings": {
"master2": "Server=.;Database=netcoredemo;User ID=sa;Password=Sa123456;Max Pool Size=1000;Packet Size=32768;MultipleActiveResultSets=true;",
"master": "server=192.168.102.108;port=3306;database=demo;uid=admin;pwd=CgTbMMiaWntK2#;charset=utf8mb4;Connection Timeout=18000;SslMode=none;"
"master": "Server=.;Database=netcoredemo;User ID=sa;Password=Sa123456;Max Pool Size=1000;Packet Size=32768;MultipleActiveResultSets=true;",
"master1": "server=192.168.102.108;port=3306;database=demo;uid=admin;pwd=CgTbMMiaWntK2#;charset=utf8mb4;Connection Timeout=18000;SslMode=none;"
}
}
14 changes: 14 additions & 0 deletions scripts/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
set -ex

cd $(dirname $0)/../

artifactsFolder="./artifacts"

if [ -d $artifactsFolder ]; then
rm -R $artifactsFolder
fi

mkdir -p $artifactsFolder

dotnet restore ./Aix.ScheduleTask.sln
dotnet build ./Aix.ScheduleTask.sln -c Release
File renamed without changes.
File renamed without changes.
18 changes: 18 additions & 0 deletions scripts/nuget.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
set -ex

cd $(dirname $0)/../

artifactsFolder="./artifacts"

if [ -d $artifactsFolder ]; then
rm -R $artifactsFolder
fi

mkdir -p $artifactsFolder


dotnet build ./src/Aix.ScheduleTask/Aix.ScheduleTask.csproj -c Release

dotnet pack ./src/Aix.ScheduleTask/Aix.ScheduleTask.csproj -c Release -o $artifactsFolder

dotnet nuget push ./$artifactsFolder/Aix.ScheduleTask.*.nupkg -k $PRIVATE_NUGET_KEY -s http://192.168.102.34:8081/repository/nuget-hosted
17 changes: 17 additions & 0 deletions scripts/package.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
set -ex

cd $(dirname $0)/../

artifactsFolder="./artifacts"

if [ -d $artifactsFolder ]; then
rm -R $artifactsFolder
fi

mkdir -p $artifactsFolder

dotnet restore ./Aix.ScheduleTask.sln
dotnet build ./Aix.ScheduleTask.sln -c Release


dotnet pack ./src/Aix.ScheduleTask/Aix.ScheduleTask.csproj -c Release -o $artifactsFolder
2 changes: 1 addition & 1 deletion src/Aix.ScheduleTask/Aix.ScheduleTask.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Aix.ORM" Version="1.0.6" />
<PackageReference Include="Aix.ORM" Version="1.0.7" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="5.0.0" />
<PackageReference Include="ncrontab" Version="3.3.1" />
Expand Down
5 changes: 5 additions & 0 deletions src/Aix.ScheduleTask/AixScheduleTaskOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,10 @@ public class AixScheduleTaskOptions
///执行间隔时间(秒) 默认30秒 范围[5,30]
/// </summary>
public int CrontabIntervalSecond { get; set; } = 30;

/// <summary>
/// 集群类型 0=多集群(默认值) 1=单实例
/// </summary>
public int ClusterType { get; set; } = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

namespace Aix.ScheduleTask.Repository
{
public interface IAixDistributionLockRepository:IRepository
public interface IAixDistributionLockRepository: ICommonRepository
{
Task<AixDistributionLock> Get(string lockName);

Task<string> UseLock(string lockName, int commandTimeout = 300);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Aix.ScheduleTask.Repository
{
public interface IAixScheduleTaskRepository:IRepository
public interface IAixScheduleTaskRepository: ICommonRepository
{
Task<PagedList<AixScheduleTaskInfo>> PageQuery(PageView pageView);

Expand Down
18 changes: 18 additions & 0 deletions src/Aix.ScheduleTask/Repository/ICommonRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Aix.ORM;
using Aix.ORM.DbTransactionManager;
using Aix.ORM.Repository;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Aix.ScheduleTask.Repository
{
/// <summary>
/// 公共接口
/// </summary>
public interface ICommonRepository : IRepository
{
// Task<string> UseLock(string lockName, int commandTimeout = 300);
}
}
28 changes: 0 additions & 28 deletions src/Aix.ScheduleTask/Repository/IRepository.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,17 @@ public Task<AixDistributionLock> Get(string lockName)
string sql =$"SELECT {AllColumns} FROM aix_distribution_lock WHERE lock_name = @lockName ";
return base.GetAsync<AixDistributionLock>(sql,new { lockName });
}

/// <summary>
/// 开启分布式锁,跟着当前事务结束而结束 mysql
/// </summary>
/// <param name="lockName">请确保数据库中已存在该lockName</param>
/// <param name="commandTimeout">超时时间 单位 秒</param>
/// <returns></returns>
public Task<string> UseLock(string lockName, int commandTimeout = 300)
{
string sql = "select lock_name from aix_distribution_lock where lock_name=@lockName for update";
return base.ExecuteScalarAsync<string>(sql, new { lockName }, commandTimeout);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,5 @@ protected override AbstractSqlExecuteTrace GetSqlExecuteTrace(string sql, object
return new SqlExecuteTrace(sql, paras, _provider);
}

/// <summary>
/// 开启分布式锁,跟着当前事务结束而结束 mysql
/// </summary>
/// <param name="lockName">请确保数据库中已存在该lockName</param>
/// <param name="commandTimeout">超时时间 单位 秒</param>
/// <returns></returns>
public Task<string> UseLock(string lockName, int commandTimeout = 300)
{
string sql = "select lock_name from aix_distribution_lock where lock_name=@lockName for update";
return base.ExecuteScalarAsync<string>(sql, new { lockName }, commandTimeout);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,18 @@ public Task<AixDistributionLock> Get(string lockName)
string sql = $"SELECT {AllColumns} FROM aix_distribution_lock WHERE lock_name = @lockName ";
return base.GetAsync<AixDistributionLock>(sql, new { lockName });
}

/// <summary>
/// 开启分布式锁,跟着当前事务结束而结束 sqlserver
/// </summary>
/// <param name="lockName">请确保数据库中已存在该lockName</param>
/// <param name="commandTimeout">超时时间 单位 秒</param>
/// <returns></returns>
public Task<string> UseLock(string lockName, int commandTimeout = 300)
{
string sql = "select lock_name from aix_distribution_lock with (rowlock,UpdLock) where lock_name=@lockName ";
return base.ExecuteScalarAsync<string>(sql, new { lockName }, commandTimeout);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,7 @@ protected override AbstractSqlExecuteTrace GetSqlExecuteTrace(string sql, object
return new SqlExecuteTrace(sql, paras, _provider);
}

/// <summary>
/// 开启分布式锁,跟着当前事务结束而结束 sqlserver
/// </summary>
/// <param name="lockName">请确保数据库中已存在该lockName</param>
/// <param name="commandTimeout">超时时间 单位 秒</param>
/// <returns></returns>
public Task<string> UseLock(string lockName, int commandTimeout = 300)
{
string sql = "select lock_name from aix_distribution_lock with (rowlock,UpdLock) where lock_name=@lockName ";
return base.ExecuteScalarAsync<string>(sql, new { lockName }, commandTimeout);
}



}
}
109 changes: 65 additions & 44 deletions src/Aix.ScheduleTask/ScheduleTaskExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,70 +92,91 @@ private async Task InnerStart()
{
try
{
await Execute();
await DistributionLockWrap();
}
catch (Exception ex)
{
_logger.LogError(ex, "定时任务执行出错");
await Task.Delay(TimeSpan.FromSeconds(5));
if (ex.Message.ToLower().IndexOf("timeout") < 0)
{
_logger.LogError(ex, "定时任务执行出错");
await Task.Delay(TimeSpan.FromSeconds(5));
}
}

}
}

private async Task Execute()
/// <summary>
/// 根据是否集群环境是否增加分布式锁
/// </summary>
/// <returns></returns>
private async Task DistributionLockWrap()
{
List<TimeSpan> nextExecuteDelays = null;
if (_options.ClusterType == 1)
{
nextExecuteDelays = await Execute();
}
else
{
using (var scope = _aixScheduleTaskRepository.BeginTransScope())
{
await _aixDistributionLockRepository.UseLock(ScheduleTaskLock, 300);
nextExecuteDelays = await Execute();

scope.Commit();
}
}

var depay = nextExecuteDelays.Any() ? nextExecuteDelays.Min() : TimeSpan.FromSeconds(CrontabIntervalSecond);
if (depay > TimeSpan.FromSeconds(CrontabIntervalSecond)) depay = TimeSpan.FromSeconds(CrontabIntervalSecond);
await Task.Delay(depay);
}

private async Task<List<TimeSpan>> Execute()
{
List<TimeSpan> nextExecuteDelays = new List<TimeSpan>(); //记录每个任务的下次执行时间,取最小的等待

using (var scope = _aixScheduleTaskRepository.BeginTransScope())
var now = DateTimeUtils.GetTimeStamp();
var taskList = await _aixScheduleTaskRepository.QueryAllEnabled(CrontabIntervalSecond * 1000 + now);
//处理
foreach (var task in taskList)
{
await _aixScheduleTaskRepository.UseLock(ScheduleTaskLock, 300);
var now = DateTimeUtils.GetTimeStamp();
var taskList = await _aixScheduleTaskRepository.QueryAllEnabled(CrontabIntervalSecond * 1000 + now);
//处理
foreach (var task in taskList)
if (!_isStart) break;
try
{
if (!_isStart) break;
try
var Schedule = ParseCron(task.Cron);
if (task.LastExecuteTime == 0) task.LastExecuteTime = now;
var nextExecuteTimeSpan = GetNextDueTime(Schedule, TimeStampToDateTime(task.LastExecuteTime), TimeStampToDateTime(now));
if (nextExecuteTimeSpan.TotalMilliseconds <= 0) //时间到了,开始执行任务
{
var Schedule = ParseCron(task.Cron);
if (task.LastExecuteTime == 0) task.LastExecuteTime = now;
var nextExecuteTimeSpan = GetNextDueTime(Schedule, TimeStampToDateTime(task.LastExecuteTime), TimeStampToDateTime(now));
if (nextExecuteTimeSpan.TotalMilliseconds <= 0) //时间到了,开始执行任务
{
await HandleMessage(task); //建议插入任务队列

now = DateTimeUtils.GetTimeStamp();
task.LastExecuteTime = now;
//计算下一次执行时间
nextExecuteTimeSpan = GetNextDueTime(Schedule, TimeStampToDateTime(task.LastExecuteTime), TimeStampToDateTime(now));
task.NextExecuteTime = now + (long)nextExecuteTimeSpan.TotalMilliseconds;
task.ModifyTime = DateTime.Now;
await _aixScheduleTaskRepository.UpdateAsync(task);
}

if (task.NextExecuteTime == 0) //只有第一次且未执行时更新下即可
{
task.NextExecuteTime = now + (long)nextExecuteTimeSpan.TotalMilliseconds;
task.ModifyTime = DateTime.Now;
await _aixScheduleTaskRepository.UpdateAsync(task);
}

nextExecuteDelays.Add(nextExecuteTimeSpan);
await HandleMessage(task); //建议插入任务队列

now = DateTimeUtils.GetTimeStamp();
task.LastExecuteTime = now;
//计算下一次执行时间
nextExecuteTimeSpan = GetNextDueTime(Schedule, TimeStampToDateTime(task.LastExecuteTime), TimeStampToDateTime(now));
task.NextExecuteTime = now + (long)nextExecuteTimeSpan.TotalMilliseconds;
task.ModifyTime = DateTime.Now;
await _aixScheduleTaskRepository.UpdateAsync(task);
}
catch (Exception ex)

if (task.NextExecuteTime == 0) //只有第一次且未执行时更新下即可
{
_logger.LogError(ex, $"定时任务解析出错 {task.Id},{task.TaskName},{task.Cron}");
task.NextExecuteTime = now + (long)nextExecuteTimeSpan.TotalMilliseconds;
task.ModifyTime = DateTime.Now;
await _aixScheduleTaskRepository.UpdateAsync(task);
}
}

scope.Commit();
nextExecuteDelays.Add(nextExecuteTimeSpan);
}
catch (Exception ex)
{
_logger.LogError(ex, $"定时任务解析出错 {task.Id},{task.TaskName},{task.Cron}");
}
}

var depay = nextExecuteDelays.Any() ? nextExecuteDelays.Min() : TimeSpan.FromSeconds(CrontabIntervalSecond);
if (depay > TimeSpan.FromSeconds(CrontabIntervalSecond)) depay = TimeSpan.FromSeconds(CrontabIntervalSecond);
//_logger.LogInformation($"***********delay:{depay.TotalMilliseconds}**********************");
await Task.Delay(depay);
return nextExecuteDelays;
}

private Task HandleMessage(AixScheduleTaskInfo taskInfo)
Expand Down

0 comments on commit ccaba57

Please sign in to comment.