Skip to content

Commit

Permalink
fix(Solution): Fixed solutiin packages
Browse files Browse the repository at this point in the history
fix(Broker): Fixed the SubscriptionHandler by retrying observing upon initialization, thus enabling delayed subscriptions
  • Loading branch information
cdavernas committed Apr 15, 2024
1 parent b986bd0 commit 8a3ecac
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="8.0.1" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.20.1" />
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.9.5" />
<PackageReference Include="Neuroglia.Plugins" Version="4.9.5" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.9.5" />
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.9.6" />
<PackageReference Include="Neuroglia.Plugins" Version="4.9.6" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.9.6" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="6.5.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="6.5.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<ItemGroup>
<PackageReference Include="FluentValidation" Version="11.9.0" />
<PackageReference Include="Grpc.Core.Api" Version="2.62.0" />
<PackageReference Include="Neuroglia.Data.Expressions.Abstractions" Version="4.9.5" />
<PackageReference Include="Neuroglia.Data.Expressions.Abstractions" Version="4.9.6" />
<PackageReference Include="Polly" Version="8.3.1" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,20 @@ protected virtual async Task InitializeCloudEventStreamAsync()
this.StreamOffset = 0;
}
if (offset >= 0 && (ulong)offset == this.StreamOffset) offset = -1;
this.CloudEventStream = await this.EventStore.ObserveAsync(offset, this.StreamInitializationCancellationTokenSource.Token).ConfigureAwait(false);
while (true)
{
try
{
this.CloudEventStream = await this.EventStore.ObserveAsync(offset, this.StreamInitializationCancellationTokenSource.Token).ConfigureAwait(false);
break;
}
catch (StreamNotFoundException)
{
var delay = 5000;
this.Logger.LogDebug("Failed to observe the cloud event stream because the first cloud event is yet to be published. Retrying in {delay} milliseconds...", delay);
await Task.Delay(delay).ConfigureAwait(false);
}
}
}
else
{
Expand All @@ -241,8 +254,20 @@ protected virtual async Task InitializeCloudEventStreamAsync()
this.StreamOffset = 0;
}
if (offset >= 0 && (ulong)offset == this.StreamOffset) offset = -1;
this.CloudEventStream = await this.EventStore.ObservePartitionAsync(this.Subscription.Spec.Partition, offset, this.StreamInitializationCancellationTokenSource.Token).ConfigureAwait(false);

while (true)
{
try
{
this.CloudEventStream = await this.EventStore.ObservePartitionAsync(this.Subscription.Spec.Partition, offset, this.StreamInitializationCancellationTokenSource.Token).ConfigureAwait(false);
break;
}
catch (StreamNotFoundException)
{
var delay = 5000;
this.Logger.LogDebug("Failed to observe the cloud event stream because the first cloud event is yet to be published. Retrying in {delay} milliseconds...", delay);
await Task.Delay(delay).ConfigureAwait(false);
}
}
}
this._Subscription = this.CloudEventStream.ToAsyncEnumerable().WhereAwait(this.FiltersAsync).ToObservable().SubscribeAsync(this.OnCloudEventAsync, onErrorAsync: this.OnSubscriptionErrorAsync, null);
if (offset != StreamPosition.EndOfStream && (ulong)offset < this.StreamOffset) _ = this.CatchUpAsync().ConfigureAwait(false);
Expand Down Expand Up @@ -441,9 +466,9 @@ protected virtual async Task RetryDispatchAsync(CloudEvent e, ulong offset, bool

AsyncPolicy retryPolicy = policyConfiguration.MaxAttempts.HasValue ?
Policy.Handle(exceptionPredicate)
.WaitAndRetryAsync(policyConfiguration.MaxAttempts.Value, policyConfiguration.BackoffDuration.ForAttempt)
.WaitAndRetryAsync(policyConfiguration.MaxAttempts.Value, attempt => policyConfiguration.BackoffDuration == null ? TimeSpan.FromSeconds(3) : policyConfiguration.BackoffDuration.ForAttempt(attempt))
: Policy.Handle(exceptionPredicate)
.WaitAndRetryForeverAsync(policyConfiguration.BackoffDuration.ForAttempt);
.WaitAndRetryForeverAsync(attempt => policyConfiguration.BackoffDuration == null ? TimeSpan.FromSeconds(3) : policyConfiguration.BackoffDuration.ForAttempt(attempt));

retryPolicy = circuitBreakerPolicy == null ? retryPolicy : retryPolicy.WrapAsync(circuitBreakerPolicy);
await retryPolicy.ExecuteAsync(async _ => await this.DispatchAsync(e, offset, false, catchUpWhenAvailable), this.CancellationToken).ConfigureAwait(false);
Expand Down
8 changes: 4 additions & 4 deletions src/core/CloudStreams.Core.Api/CloudStreams.Core.Api.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
<PackageReference Include="FluentValidation.DependencyInjectionExtensions" Version="11.9.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Server" Version="8.0.4" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.20.1" />
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.9.5" />
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.9.5" />
<PackageReference Include="Neuroglia.Plugins" Version="4.9.5" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.9.5" />
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.9.6" />
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.9.6" />
<PackageReference Include="Neuroglia.Plugins" Version="4.9.6" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.9.6" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="6.5.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="6.5.0" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.8.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="8.0.4" />
<PackageReference Include="Neuroglia.Data.Infrastructure.EventSourcing" Version="4.9.5" />
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented.Abstractions" Version="4.9.5" />
<PackageReference Include="Neuroglia.Data.Infrastructure.EventSourcing" Version="4.9.6" />
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented.Abstractions" Version="4.9.6" />
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
</ItemGroup>

Expand Down
6 changes: 3 additions & 3 deletions src/core/CloudStreams.Core/CloudStreams.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Neuroglia.Data.Infrastructure.EventSourcing.Abstractions" Version="4.9.5" />
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented" Version="4.9.5" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents" Version="4.9.5" />
<PackageReference Include="Neuroglia.Data.Infrastructure.EventSourcing.Abstractions" Version="4.9.6" />
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented" Version="4.9.6" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents" Version="4.9.6" />
</ItemGroup>

</Project>
5 changes: 4 additions & 1 deletion src/core/CloudStreams.Core/Resources/RetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ namespace CloudStreams.Core.Resources;
public record RetryPolicy
{

static readonly RetryBackoffDuration DefaultRetryBackoffDuration = RetryBackoffDuration.Constant(Duration.FromSeconds(3));
const int DefaultMaxAttempts = 5;

/// <summary>
/// Initializes a new <see cref="RetryPolicy"/>
/// </summary>
Expand All @@ -32,7 +35,7 @@ public RetryPolicy() { }
/// <param name="maxAttempts"></param>
public RetryPolicy(RetryBackoffDuration backoffDuration, int? maxAttempts = null)
{
this.BackoffDuration = backoffDuration ?? throw new ArgumentNullException(nameof(backoffDuration));
this.BackoffDuration = backoffDuration ?? DefaultRetryBackoffDuration;
this.MaxAttempts = maxAttempts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.20.1" />
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.9.5" />
<PackageReference Include="Neuroglia.Plugins" Version="4.9.5" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.9.5" />
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.9.6" />
<PackageReference Include="Neuroglia.Plugins" Version="4.9.6" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.9.6" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="6.5.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="6.5.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}
catch (StreamNotFoundException)
{
var delay = 3000;
this.Logger.LogWarning("Failed to observe the cloud event stream because the first cloud event is yet to be published. Retrying in {delay} milliseconds...", delay);
var delay = 5000;
this.Logger.LogDebug("Failed to observe the cloud event stream because the first cloud event is yet to be published. Retrying in {delay} milliseconds...", delay);
await Task.Delay(delay, stoppingToken).ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

<ItemGroup>
<PackageReference Include="FluentValidation" Version="11.9.0" />
<PackageReference Include="Neuroglia.Data.Expressions.Abstractions" Version="4.9.5" />
<PackageReference Include="Neuroglia.Data.Expressions.Abstractions" Version="4.9.6" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit 8a3ecac

Please sign in to comment.