Skip to content

Commit

Permalink
Merge branch 'main' into dont-display-push-notifications-for-new-data…
Browse files Browse the repository at this point in the history
…wallet-modifications
  • Loading branch information
mergify[bot] authored Dec 21, 2024
2 parents 5e3539b + 4695dfb commit 323a4be
Show file tree
Hide file tree
Showing 35 changed files with 354 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Feature: GET /SyncRuns/{id}/ExternalEvents
And an active Relationship r between i1 and i2
And i1 has sent a Message m to i2
And i1 has terminated r
And 2 second(s) have passed
And a sync run sr started by i2
When i2 sends a GET request to the /SyncRuns/sr.id/ExternalEvents endpoint
Then the response status code is 200 (OK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ namespace Backbone.DatabaseMigrator;

public class DummyEventBus : IEventBus
{
public void Publish(DomainEvent @event)
public Task Publish(DomainEvent @event)
{
return Task.CompletedTask;
}

public void StartConsuming()
public Task StartConsuming()
{
return Task.CompletedTask;
}

public void Subscribe<T, TH>() where T : DomainEvent where TH : IDomainEventHandler<T>
public Task Subscribe<T, TH>() where T : DomainEvent where TH : IDomainEventHandler<T>
{
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,30 @@ public EventHandlerService(IEventBus eventBus, IEnumerable<AbstractModule> modul
_logger = logger;
}

public Task StartAsync(CancellationToken cancellationToken)
public async Task StartAsync(CancellationToken cancellationToken)
{
SubscribeToEvents();
StartConsuming();

return Task.CompletedTask;
await SubscribeToEvents();
await StartConsuming();
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

private void SubscribeToEvents()
private async Task SubscribeToEvents()
{
_logger.LogInformation("Subscribing to events...");
foreach (var module in _modules)
{
module.ConfigureEventBus(_eventBus);
await module.ConfigureEventBus(_eventBus);
}

_logger.LogInformation("Successfully subscribed to events.");
}

private void StartConsuming()
private async Task StartConsuming()
{
_eventBus.StartConsuming();
await _eventBus.StartConsuming();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,11 @@ public async Task Anonymizes_the_identity_in_all_messages_instead_of_deleting_th
{
// Arrange
var identityToBeDeleted = await SeedDatabaseWithIdentityWithRipeDeletionProcess();
var peer = CreateRandomIdentityAddress();
var peer = await SeedDatabaseWithIdentity();
var relationship = await SeedDatabaseWithActiveRelationshipBetween(identityToBeDeleted, peer);

GetService<MessagesDbContext>();

var sentMessage = await SeedDatabaseWithMessage(from: identityToBeDeleted.Address, to: peer);
var receivedMessage = await SeedDatabaseWithMessage(from: peer, to: identityToBeDeleted.Address);
var sentMessage = await SeedDatabaseWithMessage(relationship, from: identityToBeDeleted, to: peer);
var receivedMessage = await SeedDatabaseWithMessage(relationship, from: peer, to: identityToBeDeleted);

// Act
await _host.StartAsync();
Expand All @@ -116,11 +115,9 @@ public async Task Deletes_relationships()
{
// Arrange
var identityToBeDeleted = await SeedDatabaseWithIdentityWithRipeDeletionProcess();
var peer = CreateRandomIdentityAddress();

GetService<RelationshipsDbContext>();
var peerOfIdentityToBeDeleted = await SeedDatabaseWithIdentity();

await SeedDatabaseWithActiveRelationshipBetween(peer, identityToBeDeleted.Address);
await SeedDatabaseWithActiveRelationshipBetween(identityToBeDeleted, peerOfIdentityToBeDeleted);

// Act
await _host.StartAsync();
Expand All @@ -138,8 +135,6 @@ public async Task Deletes_relationship_templates()
// Arrange
var identityToBeDeleted = await SeedDatabaseWithIdentityWithRipeDeletionProcess();

GetService<RelationshipsDbContext>();

await SeedDatabaseWithRelationshipTemplateOf(identityToBeDeleted.Address);

// Act
Expand All @@ -154,27 +149,29 @@ public async Task Deletes_relationship_templates()

#region Seeders

private async Task<Message> SeedDatabaseWithMessage(IdentityAddress from, IdentityAddress to)
private async Task<Message> SeedDatabaseWithMessage(Relationship relationship, Identity from, Identity to)
{
var dbContext = GetService<MessagesDbContext>();

var recipient = new RecipientInformation(to, RelationshipId.New(), []);
var message = new Message(from, DeviceId.New(), [], [], [recipient]);
var recipient = new RecipientInformation(to.Address, RelationshipId.Parse(relationship.Id.Value), []);
var message = new Message(from.Address, from.Devices.First().Id, [], [], [recipient]);

await dbContext.SaveEntity(message);

return message;
}

private async Task SeedDatabaseWithActiveRelationshipBetween(IdentityAddress from, IdentityAddress to)
private async Task<Relationship> SeedDatabaseWithActiveRelationshipBetween(Identity from, Identity to)
{
var dbContext = GetService<RelationshipsDbContext>();

var template = new RelationshipTemplate(from, DeviceId.New(), null, null, []);
var relationship = new Relationship(template, to, DeviceId.New(), [], []);
relationship.Accept(from, DeviceId.New(), []);
var template = new RelationshipTemplate(to.Address, to.Devices.First().Id, null, null, []);
var relationship = new Relationship(template, from.Address, from.Devices.First().Id, [], []);
relationship.Accept(to.Address, to.Devices.First().Id, []);

await dbContext.SaveEntity(relationship);

return relationship;
}

private async Task SeedDatabaseWithRelationshipTemplateOf(IdentityAddress owner)
Expand All @@ -190,10 +187,11 @@ private async Task<Identity> SeedDatabaseWithIdentityWithRipeDeletionProcess()
{
var dbContext = GetService<DevicesDbContext>();

var identity = new Identity("test", CreateRandomIdentityAddress(), [], TierId.Generate(), 1, CommunicationLanguage.DEFAULT_LANGUAGE);
var tier = await dbContext.Tiers.FirstAsync(t => t.Id != Tier.QUEUED_FOR_DELETION.Id);

var device = new Device(identity, CommunicationLanguage.DEFAULT_LANGUAGE);
identity.Devices.Add(device);
var identity = new Identity("test", CreateRandomIdentityAddress(), [], tier.Id, 1, CommunicationLanguage.DEFAULT_LANGUAGE);

var device = identity.Devices.First();

SystemTime.Set(SystemTime.UtcNow.AddMonths(-1));
identity.StartDeletionProcessAsOwner(device.Id);
Expand All @@ -204,6 +202,19 @@ private async Task<Identity> SeedDatabaseWithIdentityWithRipeDeletionProcess()
return identity;
}

private async Task<Identity> SeedDatabaseWithIdentity()
{
var dbContext = GetService<DevicesDbContext>();

var tier = await dbContext.Tiers.FirstAsync(t => t.Id != Tier.QUEUED_FOR_DELETION.Id);

var identity = new Identity("test", CreateRandomIdentityAddress(), [], tier.Id, 1, CommunicationLanguage.DEFAULT_LANGUAGE);

await dbContext.SaveEntity(identity);

return identity;
}

#endregion

private T GetService<T>() where T : notnull
Expand Down
6 changes: 4 additions & 2 deletions BuildingBlocks/src/BuildingBlocks.API/AbstractModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ public abstract class AbstractModule

public abstract void ConfigureServices(IServiceCollection services, IConfigurationSection configuration);

public abstract void ConfigureEventBus(IEventBus eventBus);
public abstract Task ConfigureEventBus(IEventBus eventBus);

public virtual void PostStartupValidation(IServiceProvider serviceProvider) { }
public virtual void PostStartupValidation(IServiceProvider serviceProvider)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ namespace Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventB

public interface IEventBus
{
void Publish(IEnumerable<DomainEvent> events)
async Task Publish(IEnumerable<DomainEvent> events)
{
foreach (var domainEvent in events)
{
Publish(domainEvent);
await Publish(domainEvent);
}
}

void Publish(DomainEvent @event);
void StartConsuming();
Task Publish(DomainEvent @event);
Task StartConsuming();

void Subscribe<T, TH>()
Task Subscribe<T, TH>()
where T : DomainEvent
where TH : IDomainEventHandler<T>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<PackageReference Include="Microsoft.Extensions.Logging" Version="9.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.0" />
<PackageReference Include="Polly" Version="8.5.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
<PackageReference Include="Serilog" Version="4.2.0" />
<PackageReference Include="System.Interactive.Async" Version="6.0.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public async ValueTask DisposeAsync()
await _processor.CloseAsync();
}

public async void Publish(DomainEvent @event)
public async Task Publish(DomainEvent @event)
{
var eventName = @event.GetType().Name.Replace(DOMAIN_EVENT_SUFFIX, "");
var jsonMessage = JsonConvert.SerializeObject(@event, new JsonSerializerSettings
Expand All @@ -78,7 +78,7 @@ await _logger.TraceTime(async () =>
_logger.LogDebug("Successfully sent domain event with id '{MessageId}'.", message.MessageId);
}

public void Subscribe<T, TH>()
public async Task Subscribe<T, TH>()
where T : DomainEvent
where TH : IDomainEventHandler<T>
{
Expand All @@ -90,12 +90,12 @@ public void Subscribe<T, TH>()
{
_logger.LogInformation("Trying to create subscription on Service Bus...");

_serviceBusPersisterConnection.AdministrationClient.CreateRuleAsync(TOPIC_NAME, _subscriptionName,
await _serviceBusPersisterConnection.AdministrationClient.CreateRuleAsync(TOPIC_NAME, _subscriptionName,
new CreateRuleOptions
{
Filter = new CorrelationRuleFilter { Subject = eventName },
Name = eventName
}).GetAwaiter().GetResult();
});

_logger.LogInformation("Successfully created subscription on Service Bus.");
}
Expand All @@ -109,9 +109,9 @@ public void Subscribe<T, TH>()
_subscriptionManager.AddSubscription<T, TH>();
}

public void StartConsuming()
public async Task StartConsuming()
{
RegisterSubscriptionClientMessageHandlerAsync().GetAwaiter().GetResult();
await RegisterSubscriptionClientMessageHandlerAsync();
}

private async Task RegisterSubscriptionClientMessageHandlerAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async ValueTask DisposeAsync()
await _connection.DisposeAsync();
}

public async void Publish(DomainEvent @event)
public async Task Publish(DomainEvent @event)
{
var eventName = @event.GetType().Name.Replace(DOMAIN_EVENT_SUFFIX, "");

Expand All @@ -77,7 +77,7 @@ public async void Publish(DomainEvent @event)
_logger.EventWasNotProcessed(messageId);
}

public void Subscribe<T, TH>()
public Task Subscribe<T, TH>()
where T : DomainEvent
where TH : IDomainEventHandler<T>
{
Expand All @@ -86,11 +86,13 @@ public void Subscribe<T, TH>()
_logger.LogInformation("Subscribing to event '{EventName}' with {EventHandler}", eventName, typeof(TH).Name);

_subscriptionManager.AddSubscription<T, TH>();

return Task.CompletedTask;
}

public void StartConsuming()
public async Task StartConsuming()
{
_connection.SubscriberClient.StartAsync(OnIncomingEvent);
await _connection.SubscriberClient.StartAsync(OnIncomingEvent);
}

private static string RemoveDomainEventSuffix(string typeName)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
using System.Collections.Concurrent;
using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventBus;
using Backbone.BuildingBlocks.Domain.Events;

namespace Backbone.BuildingBlocks.Infrastructure.EventBus;

public class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
{
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;

public InMemoryEventBusSubscriptionsManager()
{
_handlers = new Dictionary<string, List<SubscriptionInfo>>();
}
private readonly ConcurrentDictionary<string, List<SubscriptionInfo>> _handlers = [];

public void Clear()
{
Expand Down Expand Up @@ -45,7 +41,7 @@ public string GetEventKey<T>()
return GetEventKey(typeof(T));
}

public static string GetEventKey(Type eventType)
private static string GetEventKey(Type eventType)
{
return eventType.Name;
}
Expand All @@ -54,7 +50,7 @@ private void DoAddSubscription(Type handlerType, Type eventType)
{
var eventName = GetEventKey(eventType);

if (!HasSubscriptionsForEvent(eventName)) _handlers.Add(eventName, []);
_handlers.TryAdd(eventName, []);

if (_handlers[eventName].Any(s => s.HandlerType == handlerType))
throw new ArgumentException(
Expand Down
Loading

0 comments on commit 323a4be

Please sign in to comment.