diff --git a/src/broker/CloudStreams.Broker.Application/Services/SubscriptionHandler.cs b/src/broker/CloudStreams.Broker.Application/Services/SubscriptionHandler.cs index 5380eda6..c78d5e2a 100644 --- a/src/broker/CloudStreams.Broker.Application/Services/SubscriptionHandler.cs +++ b/src/broker/CloudStreams.Broker.Application/Services/SubscriptionHandler.cs @@ -196,7 +196,7 @@ public virtual async Task InitializeAsync(CancellationToken cancellationToken) .DistinctUntilChanged() .Subscribe(policy => this.DefaultRetryPolicy = policy ?? new HttpClientRetryPolicy(), this.CancellationToken); await this.SetStatusPhaseAsync(SubscriptionStatusPhase.Active).ConfigureAwait(false); - await this.InitializeCloudEventStreamAsync(); + _ = this.InitializeCloudEventStreamAsync(); } /// diff --git a/src/core/CloudStreams.Core.Application/Commands/Resources/Generic/PatchResourceCommand.cs b/src/core/CloudStreams.Core.Application/Commands/Resources/Generic/PatchResourceCommand.cs index 1a756809..9a9929e4 100644 --- a/src/core/CloudStreams.Core.Application/Commands/Resources/Generic/PatchResourceCommand.cs +++ b/src/core/CloudStreams.Core.Application/Commands/Resources/Generic/PatchResourceCommand.cs @@ -11,6 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Json.Patch; +using Neuroglia.Data.Infrastructure.ResourceOriented; +using Neuroglia.Data.PatchModel.Services; +using Neuroglia.Data; +using System.Xml.Linq; +using Neuroglia.Serialization.Json; + namespace CloudStreams.Core.Application.Commands.Resources.Generic; /// @@ -32,10 +39,10 @@ public class PatchResourceCommand public PatchResourceCommand(string name, string? @namespace, Patch patch, bool dryRun) { if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name)); - Name = name; - Namespace = @namespace; - Patch = patch ?? throw new ArgumentNullException(nameof(patch)); - DryRun = dryRun; + this.Name = name; + this.Namespace = @namespace; + this.Patch = patch ?? throw new ArgumentNullException(nameof(patch)); + this.DryRun = dryRun; } /// diff --git a/src/core/CloudStreams.Core.Application/Commands/Resources/PatchResourceCommand.cs b/src/core/CloudStreams.Core.Application/Commands/Resources/PatchResourceCommand.cs index 7648823f..9a6ccb17 100644 --- a/src/core/CloudStreams.Core.Application/Commands/Resources/PatchResourceCommand.cs +++ b/src/core/CloudStreams.Core.Application/Commands/Resources/PatchResourceCommand.cs @@ -41,13 +41,13 @@ public PatchResourceCommand(string group, string version, string plural, string if (string.IsNullOrWhiteSpace(version)) throw new ArgumentNullException(nameof(version)); if (string.IsNullOrWhiteSpace(plural)) throw new ArgumentNullException(nameof(plural)); if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name)); - Group = group; - Version = version; - Plural = plural; - Name = name; - Namespace = @namespace; - Patch = patch ?? throw new ArgumentNullException(nameof(patch)); - DryRun = dryRun; + this.Group = group; + this.Version = version; + this.Plural = plural; + this.Name = name; + this.Namespace = @namespace; + this.Patch = patch ?? throw new ArgumentNullException(nameof(patch)); + this.DryRun = dryRun; } /// diff --git a/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponent.cs b/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponent.cs index 24f03209..6c65062d 100644 --- a/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponent.cs +++ b/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponent.cs @@ -42,7 +42,7 @@ public abstract class ResourceManagementComponent /// /// The list of displayed s /// - protected List? resources; + protected EquatableList? resources; /// /// The used to show the 's details /// @@ -99,10 +99,9 @@ private void OnStateChanged(Action> patch /// Updates the /// /// - protected void OnResourceCollectionChanged(List? resources) + protected void OnResourceCollectionChanged(EquatableList? resources) { - if (resources == null) this.resources = null; - else this.resources = resources; + this.resources = resources; this.StateHasChanged(); } diff --git a/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponentState.cs b/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponentState.cs index 1ae6e354..21a3dfac 100644 --- a/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponentState.cs +++ b/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponentState.cs @@ -29,7 +29,7 @@ public record ResourceManagementComponentState /// /// Gets a that contains all cached s /// - public List? Resources { get; set; } + public EquatableList? Resources { get; set; } /// /// Gets/sets a boolean value that indicates whether data is currently being gathered diff --git a/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponentStore.cs b/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponentStore.cs index 5d1626d3..08213a50 100644 --- a/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponentStore.cs +++ b/src/dashboard/CloudStreams.Dashboard/Components/ResourceManagement/ResourceManagementComponentStore.cs @@ -31,7 +31,7 @@ public class ResourceManagementComponentStore(ICloudStreamsCoreApiCli { ResourceDefinition? definition; - List? resources; + EquatableList? resources; /// /// Gets an used to observe s of the specified type @@ -41,7 +41,7 @@ public class ResourceManagementComponentStore(ICloudStreamsCoreApiCli /// /// Gets an used to observe s of the specified type /// - public IObservable?> Resources => this.Select(s => s.Resources); + public IObservable?> Resources => this.Select(s => s.Resources); /// /// Gets an used to observe changes /// @@ -94,7 +94,7 @@ public virtual async Task ListResourcesAsync() { Loading = true }); - this.resources = await (await resourceManagementApi.Manage().ListAsync().ConfigureAwait(false)).ToListAsync().ConfigureAwait(false); + this.resources = new EquatableList(await (await resourceManagementApi.Manage().ListAsync().ConfigureAwait(false)).ToListAsync().ConfigureAwait(false)); this.Reduce(s => s with { Resources = this.resources, @@ -136,7 +136,7 @@ protected virtual Task OnResourceWatchEventAsync(IResourceWatchEvent case ResourceWatchEventType.Created: this.Reduce(state => { - List resources = state.Resources == null ? new() : new(state.Resources); + var resources = state.Resources == null ? [] : new EquatableList(state.Resources); resources.Add(e.Resource); return state with { @@ -151,7 +151,7 @@ protected virtual Task OnResourceWatchEventAsync(IResourceWatchEvent { return state; } - List resources = new(state.Resources); + var resources = new EquatableList(state.Resources); var resource = resources.FirstOrDefault(r => r.GetQualifiedName() == e.Resource.GetQualifiedName()); if (resource == null) return state; var index = resources.IndexOf(resource); @@ -170,7 +170,7 @@ protected virtual Task OnResourceWatchEventAsync(IResourceWatchEvent { return state; } - List resources = new(state.Resources); + var resources = new EquatableList(state.Resources); var resource = resources.FirstOrDefault(r => r.GetQualifiedName() == e.Resource.GetQualifiedName()); if (resource == null) return state; resources.Remove(resource); diff --git a/src/dashboard/CloudStreams.Dashboard/Pages/Subscriptions/List/View.razor b/src/dashboard/CloudStreams.Dashboard/Pages/Subscriptions/List/View.razor index fd62ec77..d5816d29 100644 --- a/src/dashboard/CloudStreams.Dashboard/Pages/Subscriptions/List/View.razor +++ b/src/dashboard/CloudStreams.Dashboard/Pages/Subscriptions/List/View.razor @@ -52,7 +52,7 @@ } @if(resource.Status?.Stream?.Fault != null) { - faulted + faulted } @resource.Spec.Subscriber.Uri @@ -89,8 +89,9 @@ @(resource.Spec?.Stream == null ? "-" : resource.Status == null || resource.Status?.Stream == null ? "" : resource.Status?.Stream?.AckedOffset) - - + + + @@ -110,10 +111,27 @@ @code { - Task OnResumeSubscription(Subscription subscription) + Task OnResumeSubscriptionAsync(Subscription subscription) { var patch = new Patch(PatchType.JsonPatch, new JsonPatch(PatchOperation.Remove(JsonPointer.Create(s => s.Status!.Stream!.Fault!).ToCamelCase()))); return this.ApiClient.Resources.Subscriptions.PatchStatusAsync(patch, subscription.GetName(), subscription.GetNamespace()); } + async Task OnRestartSubscriptionAsync(Subscription subscription) + { + var offset = subscription.Spec.Stream?.Offset; + if (offset == StreamPosition.StartOfStream) offset = StreamPosition.EndOfStream; + else offset = StreamPosition.StartOfStream; + var streamNode = this.Serializer.SerializeToNode(new CloudEventStream() { Offset = offset }); + var patch = new Patch(PatchType.JsonPatch, new JsonPatch(subscription.Spec.Stream == null + ? PatchOperation.Add(JsonPointer.Create(s => s.Spec.Stream!).ToCamelCase(), streamNode) + : PatchOperation.Replace(JsonPointer.Create(s => s.Spec.Stream!).ToCamelCase(), streamNode))); + subscription = await this.ApiClient.Resources.Subscriptions.PatchAsync(patch, subscription.GetName(), subscription.GetNamespace()); + if (subscription.Spec.Stream?.Offset == StreamPosition.EndOfStream) + { + patch = new Patch(PatchType.JsonPatch, new JsonPatch(PatchOperation.Replace(JsonPointer.Create(s => s.Spec.Stream!).ToCamelCase(), this.Serializer.SerializeToNode(new CloudEventStream() { Offset = StreamPosition.StartOfStream })))); + await this.ApiClient.Resources.Subscriptions.PatchAsync(patch, subscription.GetName(), subscription.GetNamespace()); + } + } + } \ No newline at end of file diff --git a/src/dashboard/CloudStreams.Dashboard/StatefulComponent.cs b/src/dashboard/CloudStreams.Dashboard/StatefulComponent.cs index 44ec1075..db5ec761 100644 --- a/src/dashboard/CloudStreams.Dashboard/StatefulComponent.cs +++ b/src/dashboard/CloudStreams.Dashboard/StatefulComponent.cs @@ -11,7 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using CloudStreams.Dashboard.StateManagement; using Microsoft.AspNetCore.Components; namespace CloudStreams.Dashboard;