Skip to content

Commit

Permalink
Merge pull request #57 from neuroglia-io/feat-restart-subscription
Browse files Browse the repository at this point in the history
Added a button to allow restarting a subscription
  • Loading branch information
cdavernas authored Apr 15, 2024
2 parents 808414f + f9a858e commit 2e14db5
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public virtual async Task InitializeAsync(CancellationToken cancellationToken)
.DistinctUntilChanged()
.Subscribe(policy => this.DefaultRetryPolicy = policy ?? new HttpClientRetryPolicy(), this.CancellationToken);
await this.SetStatusPhaseAsync(SubscriptionStatusPhase.Active).ConfigureAwait(false);
await this.InitializeCloudEventStreamAsync();
_ = this.InitializeCloudEventStreamAsync();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Json.Patch;
using Neuroglia.Data.Infrastructure.ResourceOriented;
using Neuroglia.Data.PatchModel.Services;
using Neuroglia.Data;
using System.Xml.Linq;
using Neuroglia.Serialization.Json;

namespace CloudStreams.Core.Application.Commands.Resources.Generic;

/// <summary>
Expand All @@ -32,10 +39,10 @@ public class PatchResourceCommand<TResource>
public PatchResourceCommand(string name, string? @namespace, Patch patch, bool dryRun)
{
if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name));
Name = name;
Namespace = @namespace;
Patch = patch ?? throw new ArgumentNullException(nameof(patch));
DryRun = dryRun;
this.Name = name;
this.Namespace = @namespace;
this.Patch = patch ?? throw new ArgumentNullException(nameof(patch));
this.DryRun = dryRun;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ public PatchResourceCommand(string group, string version, string plural, string
if (string.IsNullOrWhiteSpace(version)) throw new ArgumentNullException(nameof(version));
if (string.IsNullOrWhiteSpace(plural)) throw new ArgumentNullException(nameof(plural));
if (string.IsNullOrWhiteSpace(name)) throw new ArgumentNullException(nameof(name));
Group = group;
Version = version;
Plural = plural;
Name = name;
Namespace = @namespace;
Patch = patch ?? throw new ArgumentNullException(nameof(patch));
DryRun = dryRun;
this.Group = group;
this.Version = version;
this.Plural = plural;
this.Name = name;
this.Namespace = @namespace;
this.Patch = patch ?? throw new ArgumentNullException(nameof(patch));
this.DryRun = dryRun;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class ResourceManagementComponent<TResource>
/// <summary>
/// The list of displayed <see cref="Resource"/>s
/// </summary>
protected List<TResource>? resources;
protected EquatableList<TResource>? resources;
/// <summary>
/// The <see cref="Offcanvas"/> used to show the <see cref="Resource"/>'s details
/// </summary>
Expand Down Expand Up @@ -99,10 +99,9 @@ private void OnStateChanged(Action<ResourceManagementComponent<TResource>> patch
/// Updates the <see cref="ResourceManagementComponent{TResource}.resources"/>
/// </summary>
/// <param name="resources"></param>
protected void OnResourceCollectionChanged(List<TResource>? resources)
protected void OnResourceCollectionChanged(EquatableList<TResource>? resources)
{
if (resources == null) this.resources = null;
else this.resources = resources;
this.resources = resources;
this.StateHasChanged();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public record ResourceManagementComponentState<TResource>
/// <summary>
/// Gets a <see cref="List{T}"/> that contains all cached <see cref="IResource"/>s
/// </summary>
public List<TResource>? Resources { get; set; }
public EquatableList<TResource>? Resources { get; set; }

/// <summary>
/// Gets/sets a boolean value that indicates whether data is currently being gathered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ResourceManagementComponentStore<TResource>(ICloudStreamsCoreApiCli
{

ResourceDefinition? definition;
List<TResource>? resources;
EquatableList<TResource>? resources;

/// <summary>
/// Gets an <see cref="IObservable{T}"/> used to observe <see cref="ResourceDefinition"/>s of the specified type
Expand All @@ -41,7 +41,7 @@ public class ResourceManagementComponentStore<TResource>(ICloudStreamsCoreApiCli
/// <summary>
/// Gets an <see cref="IObservable{T}"/> used to observe <see cref="IResource"/>s of the specified type
/// </summary>
public IObservable<List<TResource>?> Resources => this.Select(s => s.Resources);
public IObservable<EquatableList<TResource>?> Resources => this.Select(s => s.Resources);
/// <summary>
/// Gets an <see cref="IObservable{T}"/> used to observe <see cref="CloudEventListState.Loading"/> changes
/// </summary>
Expand Down Expand Up @@ -94,7 +94,7 @@ public virtual async Task ListResourcesAsync()
{
Loading = true
});
this.resources = await (await resourceManagementApi.Manage<TResource>().ListAsync().ConfigureAwait(false)).ToListAsync().ConfigureAwait(false);
this.resources = new EquatableList<TResource>(await (await resourceManagementApi.Manage<TResource>().ListAsync().ConfigureAwait(false)).ToListAsync().ConfigureAwait(false));
this.Reduce(s => s with
{
Resources = this.resources,
Expand Down Expand Up @@ -136,7 +136,7 @@ protected virtual Task OnResourceWatchEventAsync(IResourceWatchEvent<TResource>
case ResourceWatchEventType.Created:
this.Reduce(state =>
{
List<TResource> resources = state.Resources == null ? new() : new(state.Resources);
var resources = state.Resources == null ? [] : new EquatableList<TResource>(state.Resources);
resources.Add(e.Resource);
return state with
{
Expand All @@ -151,7 +151,7 @@ protected virtual Task OnResourceWatchEventAsync(IResourceWatchEvent<TResource>
{
return state;
}
List<TResource> resources = new(state.Resources);
var resources = new EquatableList<TResource>(state.Resources);
var resource = resources.FirstOrDefault(r => r.GetQualifiedName() == e.Resource.GetQualifiedName());
if (resource == null) return state;
var index = resources.IndexOf(resource);
Expand All @@ -170,7 +170,7 @@ protected virtual Task OnResourceWatchEventAsync(IResourceWatchEvent<TResource>
{
return state;
}
List<TResource> resources = new(state.Resources);
var resources = new EquatableList<TResource>(state.Resources);
var resource = resources.FirstOrDefault(r => r.GetQualifiedName() == e.Resource.GetQualifiedName());
if (resource == null) return state;
resources.Remove(resource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
}
@if(resource.Status?.Stream?.Fault != null)
{
<span class="badge text-dark bg-danger ms-1 cursor-pointer" title="@resource.Status.Stream.Fault.ToString()" @onclick="async e => await OnResumeSubscription(resource)" @onclick:preventDefault="true" @onclick:stopPropagation="true">faulted</span>
<span class="badge text-dark bg-danger ms-1 cursor-pointer" title="@resource.Status.Stream.Fault.ToString()" @onclick="async e => await OnResumeSubscriptionAsync(resource)" @onclick:preventDefault="true" @onclick:stopPropagation="true">faulted</span>
}
</td>
<td class="text-center"><a href="@resource.Spec.Subscriber.Uri" target="_blank">@resource.Spec.Subscriber.Uri</a></td>
Expand Down Expand Up @@ -89,8 +89,9 @@
</td>
<td class="text-center">@(resource.Spec?.Stream == null ? "-" : resource.Status == null || resource.Status?.Stream == null ? "" : resource.Status?.Stream?.AckedOffset)</td>
<td class="text-end">
<button class="btn btn-outline-primary btn-sm" title="Edit '@resource.Metadata.Name'" @onclick="async _ => await OnShowResourceEditorAsync(resource)" @onclick:preventDefault @onclick:stopPropagation><Icon Name="IconName.Pencil" /></button>
<button class="btn btn-outline-danger btn-sm" title="Delete '@resource.Metadata.Name'" @onclick="async _ => await OnDeleteResourceAsync(resource)" @onclick:preventDefault @onclick:stopPropagation><Icon Name="IconName.Trash" /></button>
<button class="btn btn-outline-primary btn-sm" title="Replay all events matching subscription '@resource.Metadata.Name', which implicitly enables streaming." @onclick="async _ => await OnRestartSubscriptionAsync(resource)" @onclick:preventDefault @onclick:stopPropagation><Icon Name="IconName.ArrowClockwise" /></button>
<button class="btn btn-outline-primary btn-sm" title="Edit the subscription '@resource.Metadata.Name'" @onclick="async _ => await OnShowResourceEditorAsync(resource)" @onclick:preventDefault @onclick:stopPropagation><Icon Name="IconName.Pencil" /></button>
<button class="btn btn-outline-danger btn-sm" title="Delete the subscription '@resource.Metadata.Name'" @onclick="async _ => await OnDeleteResourceAsync(resource)" @onclick:preventDefault @onclick:stopPropagation><Icon Name="IconName.Trash" /></button>
</td>
</tr>
</Virtualize>
Expand All @@ -110,10 +111,27 @@
@code
{

Task OnResumeSubscription(Subscription subscription)
Task OnResumeSubscriptionAsync(Subscription subscription)
{
var patch = new Patch(PatchType.JsonPatch, new JsonPatch(PatchOperation.Remove(JsonPointer.Create<Subscription>(s => s.Status!.Stream!.Fault!).ToCamelCase())));
return this.ApiClient.Resources.Subscriptions.PatchStatusAsync(patch, subscription.GetName(), subscription.GetNamespace());
}

async Task OnRestartSubscriptionAsync(Subscription subscription)
{
var offset = subscription.Spec.Stream?.Offset;
if (offset == StreamPosition.StartOfStream) offset = StreamPosition.EndOfStream;
else offset = StreamPosition.StartOfStream;
var streamNode = this.Serializer.SerializeToNode(new CloudEventStream() { Offset = offset });
var patch = new Patch(PatchType.JsonPatch, new JsonPatch(subscription.Spec.Stream == null
? PatchOperation.Add(JsonPointer.Create<Subscription>(s => s.Spec.Stream!).ToCamelCase(), streamNode)
: PatchOperation.Replace(JsonPointer.Create<Subscription>(s => s.Spec.Stream!).ToCamelCase(), streamNode)));
subscription = await this.ApiClient.Resources.Subscriptions.PatchAsync(patch, subscription.GetName(), subscription.GetNamespace());
if (subscription.Spec.Stream?.Offset == StreamPosition.EndOfStream)
{
patch = new Patch(PatchType.JsonPatch, new JsonPatch(PatchOperation.Replace(JsonPointer.Create<Subscription>(s => s.Spec.Stream!).ToCamelCase(), this.Serializer.SerializeToNode(new CloudEventStream() { Offset = StreamPosition.StartOfStream }))));
await this.ApiClient.Resources.Subscriptions.PatchAsync(patch, subscription.GetName(), subscription.GetNamespace());
}
}

}
1 change: 0 additions & 1 deletion src/dashboard/CloudStreams.Dashboard/StatefulComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using CloudStreams.Dashboard.StateManagement;
using Microsoft.AspNetCore.Components;

namespace CloudStreams.Dashboard;
Expand Down

0 comments on commit 2e14db5

Please sign in to comment.