From 8753df75ecb47e816eeb7086ea9313ffb6837e39 Mon Sep 17 00:00:00 2001 From: linzhiqiang Date: Thu, 17 Dec 2020 11:17:45 +0800 Subject: [PATCH] first --- .gitignore | 336 ++++++++++++++++++ Aix.ScheduleTask.sln | 39 ++ .../Aix.ScheduleTask.Example.csproj | 30 ++ .../Configs/DBOptions.cs | 13 + .../HostServices/StartHostService.cs | 54 +++ examples/Aix.ScheduleTask.Example/Program.cs | 30 ++ examples/Aix.ScheduleTask.Example/Startup.cs | 35 ++ .../appsettings.Production.json | 3 + .../appsettings.Staging.json | 3 + .../Aix.ScheduleTask.Example/appsettings.json | 6 + scripts/mysql.sql | 27 ++ scripts/sqlserver.sql | 46 +++ src/Aix.ScheduleTask/Aix.ScheduleTask.csproj | 16 + .../AixScheduleTaskOptions.cs | 24 ++ .../Model/DBModel/AixDistributionLock.cs | 33 ++ .../Model/DBModel/AixScheduleTaskInfo.cs | 154 ++++++++ .../Model/ScheduleTaskContext.cs | 12 + .../IAixDistributionLockRepository.cs | 13 + .../Repository/IAixScheduleTaskRepository.cs | 20 ++ .../Repository/IRepository.cs | 28 ++ .../AixDistributionLockMySqlRepository.cs | 25 ++ .../AixScheduleTaskMySqlRepository.cs | 39 ++ .../MySqlImpl/BaseMySqlRepository.cs | 37 ++ .../RepositoryImpl/SqlExecuteTrace.cs | 55 +++ .../AixDistributionLockSqlServerRepository.cs | 24 ++ .../AixScheduleTaskSqlServerRepository.cs | 37 ++ .../SqlServerImpl/BaseSqlServerRepository.cs | 37 ++ src/Aix.ScheduleTask/ScheduleTaskExecutor.cs | 260 ++++++++++++++ .../ServiceCollectionExtensions.cs | 52 +++ src/Aix.ScheduleTask/Utils/DateTimeUtils.cs | 36 ++ src/Aix.ScheduleTask/Utils/JsonUtils.cs | 45 +++ 31 files changed, 1569 insertions(+) create mode 100644 .gitignore create mode 100644 Aix.ScheduleTask.sln create mode 100644 examples/Aix.ScheduleTask.Example/Aix.ScheduleTask.Example.csproj create mode 100644 examples/Aix.ScheduleTask.Example/Configs/DBOptions.cs create mode 100644 examples/Aix.ScheduleTask.Example/HostServices/StartHostService.cs create mode 100644 examples/Aix.ScheduleTask.Example/Program.cs create mode 100644 examples/Aix.ScheduleTask.Example/Startup.cs create mode 100644 examples/Aix.ScheduleTask.Example/appsettings.Production.json create mode 100644 examples/Aix.ScheduleTask.Example/appsettings.Staging.json create mode 100644 examples/Aix.ScheduleTask.Example/appsettings.json create mode 100644 scripts/mysql.sql create mode 100644 scripts/sqlserver.sql create mode 100644 src/Aix.ScheduleTask/Aix.ScheduleTask.csproj create mode 100644 src/Aix.ScheduleTask/AixScheduleTaskOptions.cs create mode 100644 src/Aix.ScheduleTask/Model/DBModel/AixDistributionLock.cs create mode 100644 src/Aix.ScheduleTask/Model/DBModel/AixScheduleTaskInfo.cs create mode 100644 src/Aix.ScheduleTask/Model/ScheduleTaskContext.cs create mode 100644 src/Aix.ScheduleTask/Repository/IAixDistributionLockRepository.cs create mode 100644 src/Aix.ScheduleTask/Repository/IAixScheduleTaskRepository.cs create mode 100644 src/Aix.ScheduleTask/Repository/IRepository.cs create mode 100644 src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/AixDistributionLockMySqlRepository.cs create mode 100644 src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/AixScheduleTaskMySqlRepository.cs create mode 100644 src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/BaseMySqlRepository.cs create mode 100644 src/Aix.ScheduleTask/RepositoryImpl/SqlExecuteTrace.cs create mode 100644 src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/AixDistributionLockSqlServerRepository.cs create mode 100644 src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/AixScheduleTaskSqlServerRepository.cs create mode 100644 src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/BaseSqlServerRepository.cs create mode 100644 src/Aix.ScheduleTask/ScheduleTaskExecutor.cs create mode 100644 src/Aix.ScheduleTask/ServiceCollectionExtensions.cs create mode 100644 src/Aix.ScheduleTask/Utils/DateTimeUtils.cs create mode 100644 src/Aix.ScheduleTask/Utils/JsonUtils.cs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..49a36b9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,336 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. +## +## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore + +# User-specific files +*.suo +*.user +*.userosscache +*.sln.docstates + +# User-specific files (MonoDevelop/Xamarin Studio) +*.userprefs + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +bld/ +[Bb]in/ +[Oo]bj/ +[Ll]og/ + +# Visual Studio 2015/2017 cache/options directory +.vs/ +# Uncomment if you have tasks that create the project's static files in wwwroot +#wwwroot/ + +# Visual Studio 2017 auto generated files +Generated\ Files/ + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +# NUNIT +*.VisualState.xml +TestResult.xml + +# Build Results of an ATL Project +[Dd]ebugPS/ +[Rr]eleasePS/ +dlldata.c + +# Benchmark Results +BenchmarkDotNet.Artifacts/ + +# .NET Core +project.lock.json +project.fragment.lock.json +artifacts/ +**/Properties/launchSettings.json + +# StyleCop +StyleCopReport.xml + +# Files built by Visual Studio +*_i.c +*_p.c +*_i.h +*.ilk +*.meta +*.obj +*.iobj +*.pch +*.pdb +*.ipdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*.log +*.vspscc +*.vssscc +.builds +*.pidb +*.svclog +*.scc + +# Chutzpah Test files +_Chutzpah* + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opendb +*.opensdf +*.sdf +*.cachefile +*.VC.db +*.VC.VC.opendb + +# Visual Studio profiler +*.psess +*.vsp +*.vspx +*.sap + +# Visual Studio Trace Files +*.e2e + +# TFS 2012 Local Workspace +$tf/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper +*.DotSettings.user + +# JustCode is a .NET coding add-in +.JustCode + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# AxoCover is a Code Coverage Tool +.axoCover/* +!.axoCover/settings.json + +# Visual Studio code coverage results +*.coverage +*.coveragexml + +# NCrunch +_NCrunch_* +.*crunch*.local.xml +nCrunchTemp_* + +# MightyMoose +*.mm.* +AutoTest.Net/ + +# Web workbench (sass) +.sass-cache/ + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once directory +publish/ + +# Publish Web Output +*.[Pp]ublish.xml +*.azurePubxml +# Note: Comment the next line if you want to checkin your web deploy settings, +# but database connection strings (with potential passwords) will be unencrypted +*.pubxml +*.publishproj + +# Microsoft Azure Web App publish settings. Comment the next line if you want to +# checkin your Azure Web App publish settings, but sensitive information contained +# in these scripts will be unencrypted +PublishScripts/ + +# NuGet Packages +*.nupkg +# The packages folder can be ignored because of Package Restore +**/[Pp]ackages/* +# except build/, which is used as an MSBuild target. +!**/[Pp]ackages/build/ +# Uncomment if necessary however generally it will be regenerated when needed +#!**/[Pp]ackages/repositories.config +# NuGet v3's project.json files produces more ignorable files +*.nuget.props +*.nuget.targets + +# Microsoft Azure Build Output +csx/ +*.build.csdef + +# Microsoft Azure Emulator +ecf/ +rcf/ + +# Windows Store app package directories and files +AppPackages/ +BundleArtifacts/ +Package.StoreAssociation.xml +_pkginfo.txt +*.appx + +# Visual Studio cache files +# files ending in .cache can be ignored +*.[Cc]ache +# but keep track of directories ending in .cache +!*.[Cc]ache/ + +# Others +ClientBin/ +~$* +*~ +*.dbmdl +*.dbproj.schemaview +*.jfm +*.pfx +*.publishsettings +orleans.codegen.cs + +# Including strong name files can present a security risk +# (https://github.com/github/gitignore/pull/2483#issue-259490424) +#*.snk + +# Since there are multiple workflows, uncomment next line to ignore bower_components +# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) +#bower_components/ + +# RIA/Silverlight projects +Generated_Code/ + +# Backup & report files from converting an old project file +# to a newer Visual Studio version. Backup files are not needed, +# because we have git ;-) +_UpgradeReport_Files/ +Backup*/ +UpgradeLog*.XML +UpgradeLog*.htm +ServiceFabricBackup/ +*.rptproj.bak + +# SQL Server files +*.mdf +*.ldf +*.ndf + +# Business Intelligence projects +*.rdl.data +*.bim.layout +*.bim_*.settings +*.rptproj.rsuser + +# Microsoft Fakes +FakesAssemblies/ + +# GhostDoc plugin setting file +*.GhostDoc.xml + +# Node.js Tools for Visual Studio +.ntvs_analysis.dat +node_modules/ + +# Visual Studio 6 build log +*.plg + +# Visual Studio 6 workspace options file +*.opt + +# Visual Studio 6 auto-generated workspace file (contains which files were open etc.) +*.vbw + +# Visual Studio LightSwitch build output +**/*.HTMLClient/GeneratedArtifacts +**/*.DesktopClient/GeneratedArtifacts +**/*.DesktopClient/ModelManifest.xml +**/*.Server/GeneratedArtifacts +**/*.Server/ModelManifest.xml +_Pvt_Extensions + +# Paket dependency manager +.paket/paket.exe +paket-files/ + +# FAKE - F# Make +.fake/ + +# JetBrains Rider +.idea/ +*.sln.iml + +# CodeRush +.cr/ + +# Python Tools for Visual Studio (PTVS) +__pycache__/ +*.pyc + +# Cake - Uncomment if you are using it +# tools/** +# !tools/packages.config + +# Tabs Studio +*.tss + +# Telerik's JustMock configuration file +*.jmconfig + +# BizTalk build output +*.btp.cs +*.btm.cs +*.odx.cs +*.xsd.cs + +# OpenCover UI analysis results +OpenCover/ + +# Azure Stream Analytics local run output +ASALocalRun/ + +# MSBuild Binary and Structured Log +*.binlog + +# NVidia Nsight GPU debugger configuration file +*.nvuser + +# MFractors (Xamarin productivity tool) working folder +.mfractor/ +/Aix.MessageBus/bin +/Aix.MessageBus.Kafka/bin +/Samples/Sample/bin +/Samples/Sample/obj +/Aix.MessageBus/obj +/Aix.MessageBus.Kafka/obj diff --git a/Aix.ScheduleTask.sln b/Aix.ScheduleTask.sln new file mode 100644 index 0000000..771117f --- /dev/null +++ b/Aix.ScheduleTask.sln @@ -0,0 +1,39 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.30804.86 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{8F0ADB44-41D8-4437-9CE3-41F19E6D81D8}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aix.ScheduleTask", "src\Aix.ScheduleTask\Aix.ScheduleTask.csproj", "{B188D470-DF48-451C-8E64-3C746982230C}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "examples", "examples", "{09D4E313-D377-4737-8A71-280632A51E30}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Aix.ScheduleTask.Example", "examples\Aix.ScheduleTask.Example\Aix.ScheduleTask.Example.csproj", "{06E36890-A5FB-4B8A-94FD-ACC92C4F2070}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {B188D470-DF48-451C-8E64-3C746982230C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B188D470-DF48-451C-8E64-3C746982230C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B188D470-DF48-451C-8E64-3C746982230C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B188D470-DF48-451C-8E64-3C746982230C}.Release|Any CPU.Build.0 = Release|Any CPU + {06E36890-A5FB-4B8A-94FD-ACC92C4F2070}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {06E36890-A5FB-4B8A-94FD-ACC92C4F2070}.Debug|Any CPU.Build.0 = Debug|Any CPU + {06E36890-A5FB-4B8A-94FD-ACC92C4F2070}.Release|Any CPU.ActiveCfg = Release|Any CPU + {06E36890-A5FB-4B8A-94FD-ACC92C4F2070}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {B188D470-DF48-451C-8E64-3C746982230C} = {8F0ADB44-41D8-4437-9CE3-41F19E6D81D8} + {06E36890-A5FB-4B8A-94FD-ACC92C4F2070} = {09D4E313-D377-4737-8A71-280632A51E30} + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {593DAEBE-1047-4266-9819-755065C66052} + EndGlobalSection +EndGlobal diff --git a/examples/Aix.ScheduleTask.Example/Aix.ScheduleTask.Example.csproj b/examples/Aix.ScheduleTask.Example/Aix.ScheduleTask.Example.csproj new file mode 100644 index 0000000..5a5cdf7 --- /dev/null +++ b/examples/Aix.ScheduleTask.Example/Aix.ScheduleTask.Example.csproj @@ -0,0 +1,30 @@ + + + + Exe + net5.0 + + + + + + + + + + + + + + + PreserveNewest + + + PreserveNewest + + + PreserveNewest + + + + diff --git a/examples/Aix.ScheduleTask.Example/Configs/DBOptions.cs b/examples/Aix.ScheduleTask.Example/Configs/DBOptions.cs new file mode 100644 index 0000000..9c94b8b --- /dev/null +++ b/examples/Aix.ScheduleTask.Example/Configs/DBOptions.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.Example.Configs +{ + public class DBOptions + { + public string Master { get; set; } + } +} diff --git a/examples/Aix.ScheduleTask.Example/HostServices/StartHostService.cs b/examples/Aix.ScheduleTask.Example/HostServices/StartHostService.cs new file mode 100644 index 0000000..a7d0ec9 --- /dev/null +++ b/examples/Aix.ScheduleTask.Example/HostServices/StartHostService.cs @@ -0,0 +1,54 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.Example.HostServices +{ + public class StartHostService : IHostedService + { + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + private readonly IHostEnvironment _hostEnvironment; + private readonly IScheduleTaskExecutor _scheduleTaskExecutor; + + public StartHostService(ILogger logger, IServiceProvider serviceProvider, IHostEnvironment hostEnvironment + , IScheduleTaskExecutor scheduleTaskExecutor) + { + _logger = logger; + _serviceProvider = serviceProvider; + _hostEnvironment = hostEnvironment; + + #region 定时服务相关 + + _scheduleTaskExecutor = scheduleTaskExecutor; + //注册定时任务事件 + _scheduleTaskExecutor.OnHandleMessage += _scheduleTaskExecutor_OnHandleMessage; + + #endregion + + } + public Task StartAsync(CancellationToken cancellationToken) + { + _scheduleTaskExecutor.Start(); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + private async Task _scheduleTaskExecutor_OnHandleMessage(Model.ScheduleTaskContext arg) + { + //Console.WriteLine($"执行任务:{arg.ExecutorParam}"); + _logger.LogInformation( $"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff")}执行任务:{arg.ExecutorParam}"); + // await Task.Delay(TimeSpan.FromSeconds(5)); + await Task.CompletedTask; + } + } +} diff --git a/examples/Aix.ScheduleTask.Example/Program.cs b/examples/Aix.ScheduleTask.Example/Program.cs new file mode 100644 index 0000000..9edff90 --- /dev/null +++ b/examples/Aix.ScheduleTask.Example/Program.cs @@ -0,0 +1,30 @@ +using Microsoft.Extensions.Hosting; +using System; + +namespace Aix.ScheduleTask.Example +{ + class Program + { + static void Main(string[] args) + { + CreateHostBuilder(args).Build().Run(); + } + + public static IHostBuilder CreateHostBuilder(string[] args) + { + return Host.CreateDefaultBuilder(args) + .ConfigureHostConfiguration(configurationBuilder => + { + }) + .ConfigureAppConfiguration((hostBulderContext, configurationBuilder) => + { + + }) + .ConfigureLogging((hostBulderContext, loggingBuilder) => + { + + }) + .ConfigureServices(Startup.ConfigureServices); + } + } +} diff --git a/examples/Aix.ScheduleTask.Example/Startup.cs b/examples/Aix.ScheduleTask.Example/Startup.cs new file mode 100644 index 0000000..9f70419 --- /dev/null +++ b/examples/Aix.ScheduleTask.Example/Startup.cs @@ -0,0 +1,35 @@ +using Aix.ScheduleTask.Example.Configs; +using Aix.ScheduleTask.Example.HostServices; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.Example +{ + public class Startup + { + internal static void ConfigureServices(HostBuilderContext context, IServiceCollection services) + { + var dbOption = context.Configuration.GetSection("connectionStrings").Get(); + + #region 定时服务相关 + + var options = new AixScheduleTaskOptions + { + Master = dbOption.Master, + DBType =2 + }; + services.AddScheduleTask(options); + + #endregion + + //入口服务 + services.AddHostedService(); + } + } +} diff --git a/examples/Aix.ScheduleTask.Example/appsettings.Production.json b/examples/Aix.ScheduleTask.Example/appsettings.Production.json new file mode 100644 index 0000000..077404a --- /dev/null +++ b/examples/Aix.ScheduleTask.Example/appsettings.Production.json @@ -0,0 +1,3 @@ +{ + +} \ No newline at end of file diff --git a/examples/Aix.ScheduleTask.Example/appsettings.Staging.json b/examples/Aix.ScheduleTask.Example/appsettings.Staging.json new file mode 100644 index 0000000..c1860fa --- /dev/null +++ b/examples/Aix.ScheduleTask.Example/appsettings.Staging.json @@ -0,0 +1,3 @@ +{ + +} \ No newline at end of file diff --git a/examples/Aix.ScheduleTask.Example/appsettings.json b/examples/Aix.ScheduleTask.Example/appsettings.json new file mode 100644 index 0000000..922651a --- /dev/null +++ b/examples/Aix.ScheduleTask.Example/appsettings.json @@ -0,0 +1,6 @@ +{ + "connectionStrings": { + "master2": "Server=.;Database=netcoredemo;User ID=sa;Password=Sa123456;Max Pool Size=1000;Packet Size=32768;MultipleActiveResultSets=true;", + "master": "server=192.168.102.108;port=3306;database=demo;uid=admin;pwd=CgTbMMiaWntK2#;charset=utf8mb4;Connection Timeout=18000;SslMode=none;" + } +} \ No newline at end of file diff --git a/scripts/mysql.sql b/scripts/mysql.sql new file mode 100644 index 0000000..279d727 --- /dev/null +++ b/scripts/mysql.sql @@ -0,0 +1,27 @@ +create table `aix_distribution_lock` +( + `lock_name` VARCHAR(50) not null comment '' +) comment 'ֲʽ'; +alter table `aix_distribution_lock` + add constraint `PK_aix_disock_lock_nameE2BC` primary key (`lock_name`); + + + insert into aix_distribution_lock(lock_name) values('ScheduleTaskLock'); + + + create table `aix_schedule_task_info` +( + `id` INT auto_increment primary key not null comment '', + `status` TINYINT default 0 not null comment '״̬ 0= 1=', + `task_name` VARCHAR(50) not null comment '', + `task_desc` VARCHAR(200) comment '', + `cron` VARCHAR(50) not null comment 'ʱʽ', + `executor_param` VARCHAR(500) comment 'ִв', + `last_execute_time` BIGINT default 0 not null comment 'ϴִʱ', + `next_execute_time` BIGINT default 0 not null comment '´ִʱ', + `max_retry_count` INT default 0 not null comment 'Դ 0=', + `creator_id` VARCHAR(50) not null comment '˱', + `create_time` DATETIME default now() not null comment '', + `modifier_id` VARCHAR(50) not null comment '޸˱', + `modify_time` DATETIME default now() not null comment '޸' +) comment 'ʱ'; \ No newline at end of file diff --git a/scripts/sqlserver.sql b/scripts/sqlserver.sql new file mode 100644 index 0000000..372a8e5 --- /dev/null +++ b/scripts/sqlserver.sql @@ -0,0 +1,46 @@ +create table aix_distribution_lock +( + lock_name VARCHAR(50) not null /**/ +); +alter table aix_distribution_lock + add constraint PK_aix_disock_lock_nameE2BC primary key (lock_name); +EXEC sp_addextendedproperty 'MS_Description', 'ֲʽ', 'user', dbo, 'table', aix_distribution_lock, NULL, NULL; +EXEC sp_addextendedproperty 'MS_Description', '', 'user', dbo, 'table', aix_distribution_lock, 'column', lock_name; + + +insert into aix_distribution_lock(lock_name) values('ScheduleTaskLock'); + + + +create table aix_schedule_task_info +( + id INT identity(1, 1) not null /**/, + status TINYINT default 0 not null /*״̬ 0= 1=*/, + task_name VARCHAR(50) not null /**/, + task_desc VARCHAR(200) /**/, + cron VARCHAR(50) not null /*ʱʽ*/, + executor_param VARCHAR(500) /*ִв*/, + last_execute_time BIGINT default 0 not null /*ϴִʱ*/, + next_execute_time BIGINT default 0 not null /*´ִʱ*/, + max_retry_count INT default 0 not null /*Դ 0=*/, + creator_id VARCHAR(50) not null /*˱*/, + create_time DATETIME default getdate() not null /**/, + modifier_id VARCHAR(50) not null /*޸˱*/, + modify_time DATETIME default getdate() not null /*޸*/ +); +alter table aix_schedule_task_info + add constraint PK_aix_schnfo_id5BDF primary key (id); +EXEC sp_addextendedproperty 'MS_Description', 'ʱ', 'user', dbo, 'table', aix_schedule_task_info, NULL, NULL; +EXEC sp_addextendedproperty 'MS_Description', '', 'user', dbo, 'table', aix_schedule_task_info, 'column', id; +EXEC sp_addextendedproperty 'MS_Description', '״̬ 0= 1=', 'user', dbo, 'table', aix_schedule_task_info, 'column', status; +EXEC sp_addextendedproperty 'MS_Description', '', 'user', dbo, 'table', aix_schedule_task_info, 'column', task_name; +EXEC sp_addextendedproperty 'MS_Description', '', 'user', dbo, 'table', aix_schedule_task_info, 'column', task_desc; +EXEC sp_addextendedproperty 'MS_Description', 'ʱʽ', 'user', dbo, 'table', aix_schedule_task_info, 'column', cron; +EXEC sp_addextendedproperty 'MS_Description', 'ִв', 'user', dbo, 'table', aix_schedule_task_info, 'column', executor_param; +EXEC sp_addextendedproperty 'MS_Description', 'ϴִʱ', 'user', dbo, 'table', aix_schedule_task_info, 'column', last_execute_time; +EXEC sp_addextendedproperty 'MS_Description', '´ִʱ', 'user', dbo, 'table', aix_schedule_task_info, 'column', next_execute_time; +EXEC sp_addextendedproperty 'MS_Description', 'Դ 0=', 'user', dbo, 'table', aix_schedule_task_info, 'column', max_retry_count; +EXEC sp_addextendedproperty 'MS_Description', '˱', 'user', dbo, 'table', aix_schedule_task_info, 'column', creator_id; +EXEC sp_addextendedproperty 'MS_Description', '', 'user', dbo, 'table', aix_schedule_task_info, 'column', create_time; +EXEC sp_addextendedproperty 'MS_Description', '޸˱', 'user', dbo, 'table', aix_schedule_task_info, 'column', modifier_id; +EXEC sp_addextendedproperty 'MS_Description', '޸', 'user', dbo, 'table', aix_schedule_task_info, 'column', modify_time; \ No newline at end of file diff --git a/src/Aix.ScheduleTask/Aix.ScheduleTask.csproj b/src/Aix.ScheduleTask/Aix.ScheduleTask.csproj new file mode 100644 index 0000000..1245051 --- /dev/null +++ b/src/Aix.ScheduleTask/Aix.ScheduleTask.csproj @@ -0,0 +1,16 @@ + + + + netstandard2.0 + 分布式定时任务:数据库实现支持 sqlserver、mysql + + + + + + + + + + + diff --git a/src/Aix.ScheduleTask/AixScheduleTaskOptions.cs b/src/Aix.ScheduleTask/AixScheduleTaskOptions.cs new file mode 100644 index 0000000..d8a6d4f --- /dev/null +++ b/src/Aix.ScheduleTask/AixScheduleTaskOptions.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Aix.ScheduleTask +{ + public class AixScheduleTaskOptions + { + /// + /// 数据库连接字符串 + /// + public string Master { get; set; } + + /// + /// 数据库类型 1=SqlServer(默认值) 2=Mysql + /// + public int DBType { get; set; } = 1; + + /// + ///执行间隔时间(秒) 默认30秒 范围[5,30] + /// + public int CrontabIntervalSecond { get; set; } = 30; + } +} diff --git a/src/Aix.ScheduleTask/Model/DBModel/AixDistributionLock.cs b/src/Aix.ScheduleTask/Model/DBModel/AixDistributionLock.cs new file mode 100644 index 0000000..e3b42aa --- /dev/null +++ b/src/Aix.ScheduleTask/Model/DBModel/AixDistributionLock.cs @@ -0,0 +1,33 @@ +/* +该文件为自动生成,不要修改。 +生成时间:2020-08-27 15:29:28。 +*/ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Aix.ORM; + +namespace Aix.ScheduleTask.Model +{ + /// + /// 分布式锁 + /// + [Table("aix_distribution_lock")] + public partial class AixDistributionLock : BaseEntity + { + private string _lock_name; + + /// + /// 主键 varchar(50) + /// + [Column("lock_name",IsNullable=false)] + [PrimaryKey] + public string LockName + { + get { return _lock_name; } + set { _lock_name = value; OnPropertyChanged("lock_name"); } + } + } + +} \ No newline at end of file diff --git a/src/Aix.ScheduleTask/Model/DBModel/AixScheduleTaskInfo.cs b/src/Aix.ScheduleTask/Model/DBModel/AixScheduleTaskInfo.cs new file mode 100644 index 0000000..67b8a62 --- /dev/null +++ b/src/Aix.ScheduleTask/Model/DBModel/AixScheduleTaskInfo.cs @@ -0,0 +1,154 @@ +/* +该文件为自动生成,不要修改。 +生成时间:2020-08-27 15:29:28。 +*/ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Aix.ORM; + +namespace Aix.ScheduleTask.Model +{ + /// + /// 定时任务 + /// + [Table("aix_schedule_task_info")] + public partial class AixScheduleTaskInfo : BaseEntity + { + private int _id; + private byte _status; + private string _task_name; + private string _task_desc; + private string _cron; + private string _executor_param; + private long _last_execute_time; + private long _next_execute_time; + private int _max_retry_count; + private string _creator_id; + private DateTime _create_time; + private string _modifier_id; + private DateTime _modify_time; + + /// + /// 主键 int(4) + /// + [Column("id",IsNullable=false)] + [PrimaryKey] + [Identity] + public int Id + { + get { return _id; } + set { _id = value; OnPropertyChanged("id"); } + } + /// + /// 状态 0=禁用 1=启动 tinyint(1) + /// + [Column("status",IsNullable=false,DefaultValue="0")] + public byte Status + { + get { return _status; } + set { _status = value; OnPropertyChanged("status"); } + } + /// + /// 任务名称 varchar(50) + /// + [Column("task_name",IsNullable=false)] + public string TaskName + { + get { return _task_name; } + set { _task_name = value; OnPropertyChanged("task_name"); } + } + /// + /// 任务描述 varchar(200) + /// + [Column("task_desc",IsNullable=true)] + public string TaskDesc + { + get { return _task_desc; } + set { _task_desc = value; OnPropertyChanged("task_desc"); } + } + /// + /// 定时表达式 varchar(50) + /// + [Column("cron",IsNullable=false)] + public string Cron + { + get { return _cron; } + set { _cron = value; OnPropertyChanged("cron"); } + } + /// + /// 执行参数 varchar(500) + /// + [Column("executor_param",IsNullable=true)] + public string ExecutorParam + { + get { return _executor_param; } + set { _executor_param = value; OnPropertyChanged("executor_param"); } + } + /// + /// 上次执行时间 bigint(8) + /// + [Column("last_execute_time",IsNullable=false,DefaultValue="0")] + public long LastExecuteTime + { + get { return _last_execute_time; } + set { _last_execute_time = value; OnPropertyChanged("last_execute_time"); } + } + /// + /// 下次执行时间 bigint(8) + /// + [Column("next_execute_time",IsNullable=false,DefaultValue="0")] + public long NextExecuteTime + { + get { return _next_execute_time; } + set { _next_execute_time = value; OnPropertyChanged("next_execute_time"); } + } + /// + /// 最大重试次数 0=不重试 int(4) + /// + [Column("max_retry_count",IsNullable=false,DefaultValue="0")] + public int MaxRetryCount + { + get { return _max_retry_count; } + set { _max_retry_count = value; OnPropertyChanged("max_retry_count"); } + } + /// + /// 创建人编号 varchar(50) + /// + [Column("creator_id",IsNullable=false)] + public string CreatorId + { + get { return _creator_id; } + set { _creator_id = value; OnPropertyChanged("creator_id"); } + } + /// + /// 创建日期 datetime(8) + /// + [Column("create_time",IsNullable=false,DefaultValue="getdate()")] + public DateTime CreateTime + { + get { return _create_time; } + set { _create_time = value; OnPropertyChanged("create_time"); } + } + /// + /// 修改人编号 varchar(50) + /// + [Column("modifier_id",IsNullable=false)] + public string ModifierId + { + get { return _modifier_id; } + set { _modifier_id = value; OnPropertyChanged("modifier_id"); } + } + /// + /// 修改日期 datetime(8) + /// + [Column("modify_time",IsNullable=false,DefaultValue="getdate()")] + public DateTime ModifyTime + { + get { return _modify_time; } + set { _modify_time = value; OnPropertyChanged("modify_time"); } + } + } + +} \ No newline at end of file diff --git a/src/Aix.ScheduleTask/Model/ScheduleTaskContext.cs b/src/Aix.ScheduleTask/Model/ScheduleTaskContext.cs new file mode 100644 index 0000000..7790f60 --- /dev/null +++ b/src/Aix.ScheduleTask/Model/ScheduleTaskContext.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Aix.ScheduleTask.Model +{ + public class ScheduleTaskContext + { + public int Id { get; set; } + public string ExecutorParam { get; set; } + } +} diff --git a/src/Aix.ScheduleTask/Repository/IAixDistributionLockRepository.cs b/src/Aix.ScheduleTask/Repository/IAixDistributionLockRepository.cs new file mode 100644 index 0000000..1e0e2a5 --- /dev/null +++ b/src/Aix.ScheduleTask/Repository/IAixDistributionLockRepository.cs @@ -0,0 +1,13 @@ +using Aix.ScheduleTask.Model; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.Repository +{ + public interface IAixDistributionLockRepository:IRepository + { + Task Get(string lockName); + } +} diff --git a/src/Aix.ScheduleTask/Repository/IAixScheduleTaskRepository.cs b/src/Aix.ScheduleTask/Repository/IAixScheduleTaskRepository.cs new file mode 100644 index 0000000..61fa437 --- /dev/null +++ b/src/Aix.ScheduleTask/Repository/IAixScheduleTaskRepository.cs @@ -0,0 +1,20 @@ +using Aix.ORM; +using Aix.ORM.Common; +using Aix.ORM.DbTransactionManager; +using Aix.ScheduleTask.Model; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.Repository +{ + public interface IAixScheduleTaskRepository:IRepository + { + Task> PageQuery(PageView pageView); + + Task> QueryAllEnabled(long nextExecuteTime); + } + + +} diff --git a/src/Aix.ScheduleTask/Repository/IRepository.cs b/src/Aix.ScheduleTask/Repository/IRepository.cs new file mode 100644 index 0000000..9faa443 --- /dev/null +++ b/src/Aix.ScheduleTask/Repository/IRepository.cs @@ -0,0 +1,28 @@ +using Aix.ORM; +using Aix.ORM.DbTransactionManager; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.Repository +{ + /// + /// 公共接口 + /// + public interface IRepository + { + IDBTransScope BeginTransScope(TransScopeOption scopeOption = TransScopeOption.Required); + + void OpenNewContext(); + + Task UseLock(string lockName, int commandTimeout = 300); + + Task InsertAsync(BaseEntity entity); + + Task BatchInsertAsync(List list) where T : BaseEntity; + Task UpdateAsync(BaseEntity model); + + Task BatchUpdateAsync(List list) where T : BaseEntity; + } +} diff --git a/src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/AixDistributionLockMySqlRepository.cs b/src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/AixDistributionLockMySqlRepository.cs new file mode 100644 index 0000000..f2b8ad5 --- /dev/null +++ b/src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/AixDistributionLockMySqlRepository.cs @@ -0,0 +1,25 @@ +using Aix.ScheduleTask.Model; +using Aix.ScheduleTask.Repository; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.RepositoryImpl +{ + public class AixDistributionLockMySqlRepository : BaseMySqlRepository, IAixDistributionLockRepository + { + + readonly string AllColumns = " lock_name "; + public AixDistributionLockMySqlRepository(IServiceProvider provider, AixScheduleTaskOptions options) : base(provider, options.Master) + { + + } + + public Task Get(string lockName) + { + string sql =$"SELECT {AllColumns} FROM aix_distribution_lock WHERE lock_name = @lockName "; + return base.GetAsync(sql,new { lockName }); + } + } +} diff --git a/src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/AixScheduleTaskMySqlRepository.cs b/src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/AixScheduleTaskMySqlRepository.cs new file mode 100644 index 0000000..b08161e --- /dev/null +++ b/src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/AixScheduleTaskMySqlRepository.cs @@ -0,0 +1,39 @@ +using Aix.ORM.Common; +using Aix.ScheduleTask.Model; +using Aix.ScheduleTask.Repository; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.RepositoryImpl +{ + public class AixScheduleTaskMySqlRepository : BaseMySqlRepository,IAixScheduleTaskRepository + { + public AixScheduleTaskMySqlRepository(IServiceProvider provider, AixScheduleTaskOptions options) : base(provider, options.Master) + { + + } + + public async Task> PageQuery(PageView pageView) + { + var column = GetAllColumns(); + var table = "aix_schedule_task_info"; + + var sqlCondition = new StringBuilder(); + sqlCondition.Append(" AND status=1 "); + string sqlOrder = " ORDER BY id ASC "; + + return await base.PagedQueryAsync(pageView, column, table, sqlCondition.ToString(), null, sqlOrder); + } + + public Task> QueryAllEnabled(long nextExecuteTime) + { + var column = GetAllColumns(); + var sql = $"SELECT {column} FROM aix_schedule_task_info WHERE status=1 AND next_execute_time<=@nextExecuteTime ORDER BY id "; + return base.QueryAsync(sql, new { nextExecuteTime }); + } + + + } +} diff --git a/src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/BaseMySqlRepository.cs b/src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/BaseMySqlRepository.cs new file mode 100644 index 0000000..cab9081 --- /dev/null +++ b/src/Aix.ScheduleTask/RepositoryImpl/MySqlImpl/BaseMySqlRepository.cs @@ -0,0 +1,37 @@ +using Aix.ORM; +using Aix.ORM.Repository; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.RepositoryImpl +{ + public abstract class BaseMySqlRepository : MySqlRepository + { + + protected IServiceProvider _provider; + public BaseMySqlRepository(IServiceProvider provider, string connectionStrings) : base(connectionStrings) + { + _provider = provider; + } + + protected override AbstractSqlExecuteTrace GetSqlExecuteTrace(string sql, object paras) + { + return new SqlExecuteTrace(sql, paras, _provider); + } + + /// + /// 开启分布式锁,跟着当前事务结束而结束 mysql + /// + /// 请确保数据库中已存在该lockName + /// 超时时间 单位 秒 + /// + public Task UseLock(string lockName, int commandTimeout = 300) + { + string sql = "select lock_name from aix_distribution_lock where lock_name=@lockName for update"; + return base.ExecuteScalarAsync(sql, new { lockName }, commandTimeout); + } + + } +} diff --git a/src/Aix.ScheduleTask/RepositoryImpl/SqlExecuteTrace.cs b/src/Aix.ScheduleTask/RepositoryImpl/SqlExecuteTrace.cs new file mode 100644 index 0000000..424db53 --- /dev/null +++ b/src/Aix.ScheduleTask/RepositoryImpl/SqlExecuteTrace.cs @@ -0,0 +1,55 @@ +using Aix.ORM; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using Aix.ScheduleTask.Utils; + +namespace Aix.ScheduleTask.RepositoryImpl +{ + /// + /// sql执行 跟踪 + /// + public class SqlExecuteTrace : AbstractSqlExecuteTrace + { + protected IServiceProvider _provider; + private ILogger _logger; + private Stopwatch _stopwatch; + + public SqlExecuteTrace(string sql, object paras, IServiceProvider provider) : base(sql, paras) + { + _provider = provider; + _logger = provider.GetService>(); + } + public override void ExecuteStart() + { + _stopwatch = Stopwatch.StartNew(); + } + + public override void ExecuteException(Exception ex) + { + + _logger.LogError(ex, "SQL Error, SQL={0},params = {1}", Sql, JsonUtils.ToJson(Param)); + } + public override void ExecuteEnd() + { + _stopwatch.Stop(); + + var totalTime = _stopwatch.ElapsedMilliseconds; + + if (totalTime > 500) + { + _logger.LogWarning("SQL Warning:total time in {0} ms,SQL={1},params = {2}", totalTime, Sql, JsonUtils.ToJson(Param)); + } + else + { + _logger.LogDebug("SQL Debug:total time in {0} ms,SQL={1},params = {2}", totalTime, Sql, JsonUtils.ToJson(Param)); + } + } + + + + } +} diff --git a/src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/AixDistributionLockSqlServerRepository.cs b/src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/AixDistributionLockSqlServerRepository.cs new file mode 100644 index 0000000..80e53d8 --- /dev/null +++ b/src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/AixDistributionLockSqlServerRepository.cs @@ -0,0 +1,24 @@ +using Aix.ScheduleTask.Model; +using Aix.ScheduleTask.Repository; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.RepositoryImpl +{ + public class AixDistributionLockSqlServerRepository : BaseSqlServerRepository, IAixDistributionLockRepository + { + readonly string AllColumns = " lock_name "; + public AixDistributionLockSqlServerRepository(IServiceProvider provider, AixScheduleTaskOptions options) : base(provider, options.Master) + { + + } + + public Task Get(string lockName) + { + string sql = $"SELECT {AllColumns} FROM aix_distribution_lock WHERE lock_name = @lockName "; + return base.GetAsync(sql, new { lockName }); + } + } +} diff --git a/src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/AixScheduleTaskSqlServerRepository.cs b/src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/AixScheduleTaskSqlServerRepository.cs new file mode 100644 index 0000000..7b7b728 --- /dev/null +++ b/src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/AixScheduleTaskSqlServerRepository.cs @@ -0,0 +1,37 @@ +using Aix.ORM.Common; +using Aix.ScheduleTask.Model; +using Aix.ScheduleTask.Repository; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.RepositoryImpl +{ + public class AixScheduleTaskSqlServerRepository : BaseSqlServerRepository, IAixScheduleTaskRepository + { + public AixScheduleTaskSqlServerRepository(IServiceProvider provider, AixScheduleTaskOptions options) : base(provider, options.Master) + { + + } + + public async Task> PageQuery(PageView pageView) + { + var column = GetAllColumns(); + var table = "aix_schedule_task_info"; + + var sqlCondition = new StringBuilder(); + sqlCondition.Append(" AND status=1 "); + string sqlOrder = " ORDER BY id ASC "; + + return await base.PagedQueryAsync(pageView, column, table, sqlCondition.ToString(), null, sqlOrder); + } + + public Task> QueryAllEnabled(long nextExecuteTime) + { + var column = GetAllColumns(); + var sql = $"SELECT {column} FROM aix_schedule_task_info WHERE status=1 AND next_execute_time<=@nextExecuteTime ORDER BY id "; + return base.QueryAsync(sql, new { nextExecuteTime }); + } + } +} diff --git a/src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/BaseSqlServerRepository.cs b/src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/BaseSqlServerRepository.cs new file mode 100644 index 0000000..9da3398 --- /dev/null +++ b/src/Aix.ScheduleTask/RepositoryImpl/SqlServerImpl/BaseSqlServerRepository.cs @@ -0,0 +1,37 @@ +using Aix.ORM; +using Aix.ORM.Repository; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask.RepositoryImpl +{ + public abstract class BaseSqlServerRepository : MsSqlRepository + { + protected IServiceProvider _provider; + public BaseSqlServerRepository(IServiceProvider provider, string connectionStrings) : base(connectionStrings) + { + _provider = provider; + } + + protected override AbstractSqlExecuteTrace GetSqlExecuteTrace(string sql, object paras) + { + return new SqlExecuteTrace(sql, paras, _provider); + } + + /// + /// 开启分布式锁,跟着当前事务结束而结束 sqlserver + /// + /// 请确保数据库中已存在该lockName + /// 超时时间 单位 秒 + /// + public Task UseLock(string lockName, int commandTimeout = 300) + { + string sql = "select lock_name from aix_distribution_lock with (rowlock,UpdLock) where lock_name=@lockName "; + return base.ExecuteScalarAsync(sql, new { lockName }, commandTimeout); + } + + + } +} diff --git a/src/Aix.ScheduleTask/ScheduleTaskExecutor.cs b/src/Aix.ScheduleTask/ScheduleTaskExecutor.cs new file mode 100644 index 0000000..77c2670 --- /dev/null +++ b/src/Aix.ScheduleTask/ScheduleTaskExecutor.cs @@ -0,0 +1,260 @@ +using Aix.ScheduleTask.Model; +using Aix.ScheduleTask.Repository; +using Aix.ScheduleTask.Utils; +using Microsoft.Extensions.Logging; +using NCrontab; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Aix.ScheduleTask +{ + /* + * * * * * * + sec (0 - 59) + min (0 - 59) + hour (0 - 23) + day of month (1 - 31) + month (1 - 12) + day of week (0 - 6) (Sunday=0) + + 比如每天9点半执行,如果9点半这个时刻你停了一会,9点半后重新启动, 这时会重新执行一次的。 + */ + + public interface IScheduleTaskExecutor : IDisposable + { + Task Start(); + + event Func OnHandleMessage; + } + + /// + /// 定时任务执行器 + /// + public class ScheduleTaskExecutor : IScheduleTaskExecutor + { + ILogger _logger; + private readonly IAixScheduleTaskRepository _aixScheduleTaskRepository; + private readonly IAixDistributionLockRepository _aixDistributionLockRepository; + private readonly AixScheduleTaskOptions _options; + + private ConcurrentDictionary CrontabScheduleCache = new ConcurrentDictionary(); + private volatile bool _isStart = false; + private int CrontabIntervalSecond = 30; //没有数据时等待时间 + public event Func OnHandleMessage; + + readonly string ScheduleTaskLock = "ScheduleTaskLock"; + + public ScheduleTaskExecutor(ILogger logger, + AixScheduleTaskOptions options, + IAixScheduleTaskRepository aixScheduleTaskRepository, + IAixDistributionLockRepository aixDistributionLockRepository + ) + { + _logger = logger; + _options = options; + CrontabIntervalSecond = _options.CrontabIntervalSecond; + _aixScheduleTaskRepository = aixScheduleTaskRepository; + _aixDistributionLockRepository = aixDistributionLockRepository; + } + + public Task Start() + { + if (_isStart) return Task.CompletedTask; + lock (this) + { + if (_isStart) return Task.CompletedTask; + _isStart = true; + } + _logger.LogInformation("开始执行定时任务......"); + Task.Factory.StartNew(async () => + { + try + { + await InnerStart(); + } + catch (Exception ex) + { + _logger.LogError(ex, "定时任务启动出错"); + } + + }, TaskCreationOptions.LongRunning); + + return Task.CompletedTask; + } + + private async Task InnerStart() + { + await Init(); + while (_isStart) + { + try + { + await Execute(); + } + catch (Exception ex) + { + _logger.LogError(ex, "定时任务执行出错"); + await Task.Delay(TimeSpan.FromSeconds(5)); + } + + } + } + + private async Task Execute() + { + List nextExecuteDelays = new List(); //记录每个任务的下次执行时间,取最小的等待 + + using (var scope = _aixScheduleTaskRepository.BeginTransScope()) + { + await _aixScheduleTaskRepository.UseLock(ScheduleTaskLock, 300); + var now = DateTimeUtils.GetTimeStamp(); + var taskList = await _aixScheduleTaskRepository.QueryAllEnabled(CrontabIntervalSecond * 1000 + now); + //处理 + foreach (var task in taskList) + { + if (!_isStart) break; + try + { + var Schedule = ParseCron(task.Cron); + if (task.LastExecuteTime == 0) task.LastExecuteTime = now; + var nextExecuteTimeSpan = GetNextDueTime(Schedule, TimeStampToDateTime(task.LastExecuteTime), TimeStampToDateTime(now)); + if (nextExecuteTimeSpan.TotalMilliseconds <= 0) //时间到了,开始执行任务 + { + await HandleMessage(task); //建议插入任务队列 + + now = DateTimeUtils.GetTimeStamp(); + task.LastExecuteTime = now; + //计算下一次执行时间 + nextExecuteTimeSpan = GetNextDueTime(Schedule, TimeStampToDateTime(task.LastExecuteTime), TimeStampToDateTime(now)); + task.NextExecuteTime = now + (long)nextExecuteTimeSpan.TotalMilliseconds; + task.ModifyTime = DateTime.Now; + await _aixScheduleTaskRepository.UpdateAsync(task); + } + + if (task.NextExecuteTime == 0) //只有第一次且未执行时更新下即可 + { + task.NextExecuteTime = now + (long)nextExecuteTimeSpan.TotalMilliseconds; + task.ModifyTime = DateTime.Now; + await _aixScheduleTaskRepository.UpdateAsync(task); + } + + nextExecuteDelays.Add(nextExecuteTimeSpan); + } + catch (Exception ex) + { + _logger.LogError(ex, $"定时任务解析出错 {task.Id},{task.TaskName},{task.Cron}"); + } + } + + scope.Commit(); + } + + var depay = nextExecuteDelays.Any() ? nextExecuteDelays.Min() : TimeSpan.FromSeconds(CrontabIntervalSecond); + if (depay > TimeSpan.FromSeconds(CrontabIntervalSecond)) depay = TimeSpan.FromSeconds(CrontabIntervalSecond); + //_logger.LogInformation($"***********delay:{depay.TotalMilliseconds}**********************"); + await Task.Delay(depay); + } + + private Task HandleMessage(AixScheduleTaskInfo taskInfo) + { + if (OnHandleMessage == null) return Task.CompletedTask; + //把线程队列引用过来,根据id进入不同的线程队列,保证串行执行 + + // _logger.LogDebug($"执行定时任务:{taskInfo.Id},{taskInfo.TaskName},{taskInfo.ExecutorParam}"); + Task.Run(async () => + { + //_aixScheduleTaskRepository.OpenNewContext(); + try + { + await OnHandleMessage(new ScheduleTaskContext { Id = taskInfo.Id, ExecutorParam = taskInfo.ExecutorParam }); + } + catch (Exception ex) + { + _logger.LogError(ex, $"定时任务执行出错 {taskInfo.Id},{taskInfo.TaskName},{taskInfo.ExecutorParam}"); + } + + }); + + return Task.CompletedTask; + } + + public void Dispose() + { + if (!_isStart) return; + + lock (this) + { + if (!_isStart) return; + + _isStart = false; + + } + _logger.LogInformation("结束执行定时任务......"); + } + + #region private + + private CrontabSchedule ParseCron(string cron) + { + CrontabSchedule result; + if (CrontabScheduleCache.TryGetValue(cron, out result)) + { + return result; + } + var options = new CrontabSchedule.ParseOptions + { + IncludingSeconds = cron.Split(' ').Length > 5, + }; + result = CrontabSchedule.Parse(cron, options); + CrontabScheduleCache.TryAdd(cron, result); + return result; + } + + private static TimeSpan GetNextDueTime(CrontabSchedule Schedule, DateTime LastDueTime, DateTime now) + { + var nextOccurrence = Schedule.GetNextOccurrence(LastDueTime); + TimeSpan dueTime = nextOccurrence - now;// DateTime.Now; + + if (dueTime.TotalMilliseconds <= 0) + { + dueTime = TimeSpan.Zero; + } + + return dueTime; + } + + + + /// + /// 时间戳转时间 + /// + /// + /// + private static DateTime TimeStampToDateTime(long timestamp) + { + return DateTimeUtils.TimeStampToDateTime(timestamp); + } + + private async Task Init() + { + try + { + var model = await _aixDistributionLockRepository.Get(ScheduleTaskLock); + if (model == null) + { + await _aixDistributionLockRepository.InsertAsync(new AixDistributionLock { LockName = ScheduleTaskLock }); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "定时任务初始化出错"); + throw; + } + } + + #endregion + } +} diff --git a/src/Aix.ScheduleTask/ServiceCollectionExtensions.cs b/src/Aix.ScheduleTask/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..d3869a7 --- /dev/null +++ b/src/Aix.ScheduleTask/ServiceCollectionExtensions.cs @@ -0,0 +1,52 @@ +using Aix.ORM; +using Aix.ScheduleTask.Repository; +using Aix.ScheduleTask.RepositoryImpl; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Aix.ScheduleTask +{ + public static class ServiceCollectionExtensions + { + public static IServiceCollection AddScheduleTask(this IServiceCollection services, AixScheduleTaskOptions options) + { + Dapper.DefaultTypeMap.MatchNamesWithUnderscores = true; + Dapper.SqlMapper.Settings.CommandTimeout = 30;//秒 + //ORMSettings.SetConnectionFactory(new DBConnectionFactory()); + + ValidOptions(options); + services.AddSingleton(options); + if (options.DBType == 1) + { + services.AddSingleton(); + services.AddSingleton(); + } + else if (options.DBType == 2) + { + services.AddSingleton(); + services.AddSingleton(); + } + else + { + throw new Exception("请配置DBType,1=SqlServer(默认值) 2=Mysql "); + } + + services.AddSingleton(); + return services; + } + + private static void ValidOptions(AixScheduleTaskOptions options) + { + if (options == null) throw new Exception("请配置options参数"); + if (string.IsNullOrEmpty(options.Master)) throw new Exception("请配置options.Master"); + + if (options.CrontabIntervalSecond <= 0) throw new Exception("配置options.CrontabIntervalSecond 非法"); + if (options.CrontabIntervalSecond < 5) options.CrontabIntervalSecond = 5; + if (options.CrontabIntervalSecond > 30) options.CrontabIntervalSecond = 30; + } + } + + +} diff --git a/src/Aix.ScheduleTask/Utils/DateTimeUtils.cs b/src/Aix.ScheduleTask/Utils/DateTimeUtils.cs new file mode 100644 index 0000000..a9d65e7 --- /dev/null +++ b/src/Aix.ScheduleTask/Utils/DateTimeUtils.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Aix.ScheduleTask.Utils +{ + internal static class DateTimeUtils + { + public static long GetTimeStamp() + { + return GetTimeStamp(DateTime.Now); + } + + public static long GetTimeStamp(DateTime now) + { + DateTime theDate = now; + DateTime d1 = new DateTime(1970, 1, 1); + DateTime d2 = theDate.ToUniversalTime(); + TimeSpan ts = new TimeSpan(d2.Ticks - d1.Ticks); + return (long)ts.TotalMilliseconds; + + } + + /// + /// 时间戳转时间 + /// + /// + /// + public static DateTime TimeStampToDateTime(long timestamp) + { + //DateTime date = TimeZone.CurrentTimeZone.ToLocalTime(new System.DateTime(1970, 1, 1)); // 当地时区 + var date = TimeZoneInfo.ConvertTimeFromUtc(new System.DateTime(1970, 1, 1), TimeZoneInfo.Local); + return date.AddMilliseconds(timestamp); + } + } +} diff --git a/src/Aix.ScheduleTask/Utils/JsonUtils.cs b/src/Aix.ScheduleTask/Utils/JsonUtils.cs new file mode 100644 index 0000000..4da8b39 --- /dev/null +++ b/src/Aix.ScheduleTask/Utils/JsonUtils.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Aix.ScheduleTask.Utils +{ + internal static class JsonUtils + { + public static string ToJson(object obj) + { + if (obj == null) return string.Empty; + if (obj is string || obj.GetType().IsValueType) + { + return obj.ToString(); + } + return System.Text.Json.JsonSerializer.Serialize(obj); + } + public static T FromJson(string str) + { + var result = FromJson(str, typeof(T)); + if (result == null) return default(T); + return (T)result; + } + + public static object FromJson(string str, Type type) + { + if (string.IsNullOrWhiteSpace(str) || type == null) + { + return null; + } + + if (type == typeof(string)) + { + return str; + } + if (type.IsValueType) + { + return Convert.ChangeType(str, type); + } + + return System.Text.Json.JsonSerializer.Deserialize(str, type); + } + + } +}