Skip to content

Commit

Permalink
fix: Fix/129/quartz jobs miss fired problem (#131)
Browse files Browse the repository at this point in the history
  • Loading branch information
fernando-a-marins authored Jul 12, 2023
1 parent 20b242d commit f8fe911
Show file tree
Hide file tree
Showing 12 changed files with 248 additions and 244 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ jobs:
- name: Start Kafka
uses: 280780363/kafka-action@v1.0
with:
kafka version: "latest" # Optional, kafka version
zookeeper version: "latest" # Optional, zookeeper version
kafka version: "3.4.0-debian-11-r15" # Optional, kafka version
zookeeper version: "3.8.1-debian-11-r18" # Optional, zookeeper version
kafka port: 9092 # Optional, kafka port. Connect using localhost:9092
zookeeper port: 2181 # Optional, zookeeper port
auto create topic: "true" # Optional, auto create kafka topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,4 @@ private void SetupServices(HostBuilderContext context, IServiceCollection servic
services.AddSingleton<RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert>();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.1" />
<PackageReference Include="Moq" Version="4.18.4" />
<PackageReference Include="OpenCover" Version="4.7.1221" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
namespace KafkaFlow.Retry.IntegrationTests.PollingTests
{
using System.Collections.Generic;
using global::KafkaFlow.Retry.Durable.Definitions.Polling;
using global::KafkaFlow.Retry.Durable.Polling;
using Quartz;

internal class JobDataProviderSurrogate : IJobDataProvider
{
public JobDataProviderSurrogate(string schedulerId, PollingDefinition pollingDefinition, ITrigger trigger, List<IJobExecutionContext> jobExecutionContexts)
{
this.PollingDefinition = pollingDefinition;

this.Trigger = trigger;
this.TriggerName = this.GetTriggerName(schedulerId);

this.JobExecutionContexts = jobExecutionContexts;
this.JobDetail = this.CreateJobDetail();
}

public IJobDetail JobDetail { get; }

public List<IJobExecutionContext> JobExecutionContexts { get; }

public PollingDefinition PollingDefinition { get; }

public ITrigger Trigger { get; }

public string TriggerName { get; }

private IJobDetail CreateJobDetail()
{
var dataMap = new JobDataMap { { "JobExecution", this.JobExecutionContexts } };

return JobBuilder
.Create<JobSurrogate>()
.SetJobData(dataMap)
.Build();
}

private string GetTriggerName(string schedulerId)
{
return $"pollingJobTrigger_{schedulerId}_{this.PollingDefinition.PollingJobType}";
}
}
}
17 changes: 17 additions & 0 deletions src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobSurrogate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace KafkaFlow.Retry.IntegrationTests.PollingTests
{
using System.Collections.Generic;
using System.Threading.Tasks;
using Quartz;

internal class JobSurrogate : IJob
{
public Task Execute(IJobExecutionContext context)
{
var jobExecutionContexts = context.JobDetail.JobDataMap["JobExecution"] as List<IJobExecutionContext>;
jobExecutionContexts.Add(context);

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
namespace KafkaFlow.Retry.IntegrationTests.PollingTests
{
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using global::KafkaFlow.Retry.Durable.Definitions.Polling;
using global::KafkaFlow.Retry.Durable.Polling;
using Moq;
using Quartz;
using Xunit;

public class QueueTrackerCoordinatorTests
{
private readonly Mock<IJobDataProvidersFactory> mockJobDataProvidersFactory;
private readonly ITriggerProvider triggerProvider;

public QueueTrackerCoordinatorTests()
{
this.triggerProvider = new TriggerProvider();

this.mockJobDataProvidersFactory = new Mock<IJobDataProvidersFactory>();
}

[Fact]
public async Task QueueTrackerCoordinator_ForceMisfireJob_SuccessWithCorrectScheduledFiredTimes()
{
// arrange
var schedulerId = "MisfiredJobsDoesNothing";
var jobExecutionContexts = new List<IJobExecutionContext>();

var waitForScheduleInSeconds = 5;
var jobActiveTimeInSeconds = 8;
var pollingInSeconds = 2;

var cronExpression = $"*/{pollingInSeconds} * * ? * * *";

var retryDurableJobDataProvider = this.CreateRetryDurableJobDataProvider(schedulerId, cronExpression, jobExecutionContexts);

this.mockJobDataProvidersFactory
.Setup(m => m.Create(It.IsAny<IMessageProducer>(), It.IsAny<ILogHandler>()))
.Returns(new[] { retryDurableJobDataProvider });

var queueTrackerCoordinator = this.CreateQueueTrackerCoordinator(schedulerId);

// act

Thread.Sleep(waitForScheduleInSeconds * 1000);

await queueTrackerCoordinator.ScheduleJobsAsync(Mock.Of<IMessageProducer>(), Mock.Of<ILogHandler>());

Thread.Sleep(jobActiveTimeInSeconds * 1000);

await queueTrackerCoordinator.UnscheduleJobsAsync();

// assert
var scheduledFiredTimes = jobExecutionContexts
.Where(ctx => ctx.ScheduledFireTimeUtc.HasValue)
.Select(ctx => ctx.ScheduledFireTimeUtc.Value)
.OrderBy(x => x)
.ToList();

var currentScheduledFiredTime = scheduledFiredTimes.First();
var otherScheduledFiredTimes = scheduledFiredTimes.Skip(1).ToList();

foreach (var scheduledFiredTime in otherScheduledFiredTimes)
{
currentScheduledFiredTime.AddSeconds(pollingInSeconds).Should().Be(scheduledFiredTime);

currentScheduledFiredTime = scheduledFiredTime;
}
}

[Fact]
public async Task QueueTrackerCoordinator_ScheduleAndUnscheduleDifferentJobs_Success()
{
// arrange
var schedulerId = "twoJobsSchedulerId";
var jobExecutionContexts = new List<IJobExecutionContext>();

var timePollingActiveInSeconds = 4;

var retryDurableCronExpression = "*/2 * * ? * * *";
var cleanupCronExpression = "*/4 * * ? * * *";

var retryDurableMinExpectedJobsFired = 2;
var retryDurableMaxExpectedJobsFired = 3;
var cleanupMinExpectedJobsFired = 1;
var cleanupMaxExpectedJobsFired = 2;

var retryDurableJobDataProvider = this.CreateRetryDurableJobDataProvider(schedulerId, retryDurableCronExpression, jobExecutionContexts);
var cleanupJobDataProvider = this.CreateCleanupJobDataProvider(schedulerId, cleanupCronExpression, jobExecutionContexts);

this.mockJobDataProvidersFactory
.Setup(m => m.Create(It.IsAny<IMessageProducer>(), It.IsAny<ILogHandler>()))
.Returns(new[] { retryDurableJobDataProvider, cleanupJobDataProvider });

var queueTrackerCoordinator = this.CreateQueueTrackerCoordinator(schedulerId);

// act
await queueTrackerCoordinator.ScheduleJobsAsync(Mock.Of<IMessageProducer>(), Mock.Of<ILogHandler>());

Thread.Sleep(timePollingActiveInSeconds * 1000);

await queueTrackerCoordinator.UnscheduleJobsAsync();

// assert
jobExecutionContexts.Where(ctx => !ctx.PreviousFireTimeUtc.HasValue).Should().HaveCount(2);

var retryDurableFiresContexts = jobExecutionContexts.Where(ctx => ctx.Trigger.Key.Name == retryDurableJobDataProvider.TriggerName);
var cleanupFiresContexts = jobExecutionContexts.Where(ctx => ctx.Trigger.Key.Name == cleanupJobDataProvider.TriggerName);

retryDurableFiresContexts
.Should()
.HaveCountGreaterThanOrEqualTo(retryDurableMinExpectedJobsFired)
.And
.HaveCountLessThanOrEqualTo(retryDurableMaxExpectedJobsFired);

retryDurableFiresContexts.Should().ContainSingle(ctx => !ctx.PreviousFireTimeUtc.HasValue);

cleanupFiresContexts
.Should()
.HaveCountGreaterThanOrEqualTo(cleanupMinExpectedJobsFired)
.And
.HaveCountLessThanOrEqualTo(cleanupMaxExpectedJobsFired);

cleanupFiresContexts.Should().ContainSingle(ctx => !ctx.PreviousFireTimeUtc.HasValue);
}

private JobDataProviderSurrogate CreateCleanupJobDataProvider(string schedulerId, string cronExpression, List<IJobExecutionContext> jobExecutionContexts)
{
var cleanupPollingDefinition =
new CleanupPollingDefinition(
enabled: true,
cronExpression: cronExpression,
timeToLiveInDays: 1,
rowsPerRequest: 10
);

return this.CreateJobDataProvider(schedulerId, cleanupPollingDefinition, jobExecutionContexts);
}

private JobDataProviderSurrogate CreateJobDataProvider(string schedulerId, PollingDefinition pollingDefinition, List<IJobExecutionContext> jobExecutionContexts)
{
var trigger = this.triggerProvider.GetPollingTrigger(schedulerId, pollingDefinition);

return new JobDataProviderSurrogate(schedulerId, pollingDefinition, trigger, jobExecutionContexts);
}

private IQueueTrackerCoordinator CreateQueueTrackerCoordinator(string schedulerId)
{
var queueTrackerFactory = new QueueTrackerFactory(schedulerId, this.mockJobDataProvidersFactory.Object);

return new QueueTrackerCoordinator(queueTrackerFactory);
}

private JobDataProviderSurrogate CreateRetryDurableJobDataProvider(string schedulerId, string cronExpression, List<IJobExecutionContext> jobExecutionContexts)
{
var retryDurablePollingDefinition =
new RetryDurablePollingDefinition(
enabled: true,
cronExpression: cronExpression,
fetchSize: 100,
expirationIntervalFactor: 1
);

return this.CreateJobDataProvider(schedulerId, retryDurablePollingDefinition, jobExecutionContexts);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using Xunit;

[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
[assembly: ExcludeFromCodeCoverage]
[assembly: ExcludeFromCodeCoverage]
[assembly: CollectionBehavior(DisableTestParallelization = true)]
2 changes: 1 addition & 1 deletion src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,4 @@ internal async Task RetryDurableTest(
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class QueueTrackerCoordinatorTests
{
private readonly Mock<IJobDataProvider> mockJobDataProvider;
private readonly Mock<IQueueTrackerFactory> mockQueueTrackerFactory;
private readonly QueueTrackerCoordinator queueTrackerCoordinator;
private readonly IQueueTrackerCoordinator queueTrackerCoordinator;

public QueueTrackerCoordinatorTests()
{
Expand Down Expand Up @@ -90,4 +90,4 @@ public async Task QueueTrackerCoordinator_UnscheduleJobs_Success()
this.mockJobDataProvider.Verify(m => m.Trigger, Times.Exactly(2));
}
}
}
}
Loading

0 comments on commit f8fe911

Please sign in to comment.