Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Samples/ASP.NET/ASP.NET.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Build.Containers" Version="0.3.2" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.2.3" />
</ItemGroup>

Expand Down
31 changes: 0 additions & 31 deletions Samples/ASP.NET/Dockerfile

This file was deleted.

1 change: 0 additions & 1 deletion Source/Aggregates/Actors/AggregateActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ async Task OnStarted(IContext context)
try
{
var (tenantId, eventSourceId) = GetIdentifiers(context);

_eventSourceId = eventSourceId;
var serviceProvider = await _getServiceProvider(tenantId);
_aggregateWrapper = ActivatorUtilities.CreateInstance<AggregateWrapper<TAggregate>>(serviceProvider, _eventSourceId);
Expand Down
12 changes: 8 additions & 4 deletions Source/Aggregates/Internal/AggregateWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Dolittle.SDK.Events.Builders;
using Dolittle.SDK.Events.Store;
using Dolittle.SDK.Events.Store.Builders;
using Dolittle.SDK.Tenancy;
using Microsoft.Extensions.Logging;
#pragma warning disable CS0618 // Refers to EventSourceId which is marked obsolete for clients. Should still be used internally

Expand All @@ -23,6 +24,7 @@ namespace Dolittle.SDK.Aggregates.Internal;
class AggregateWrapper<TAggregate> where TAggregate : AggregateRoot
{
readonly EventSourceId _eventSourceId;
readonly TenantId _tenant;
readonly IEventStore _eventStore;
readonly IEventTypes _eventTypes;
readonly IServiceProvider _serviceProvider;
Expand All @@ -34,13 +36,15 @@ class AggregateWrapper<TAggregate> where TAggregate : AggregateRoot
/// Initializes a new instance of the <see cref="AggregateWrapper{TAggregate}"/> class.
/// </summary>
/// <param name="eventSourceId">The <see cref="EventSourceId"/> of the aggregate root instance.</param>
/// <param name="tenant">The tenant id.</param>
/// <param name="eventStore">The <see cref="IEventStore" /> used for committing the <see cref="UncommittedAggregateEvents" /> when actions are performed on the <typeparamref name="TAggregate">aggregate</typeparamref>. </param>
/// <param name="eventTypes">The <see cref="IEventTypes"/>.</param>
/// <param name="serviceProvider">The tenant scoped <see cref="IServiceProvider"/>.</param>
/// <param name="logger">The <see cref="ILogger" />.</param>
public AggregateWrapper(EventSourceId eventSourceId, IEventStore eventStore, IEventTypes eventTypes, IServiceProvider serviceProvider, ILogger<AggregateWrapper<TAggregate>> logger)
public AggregateWrapper(EventSourceId eventSourceId, TenantId tenant, IEventStore eventStore, IEventTypes eventTypes, IServiceProvider serviceProvider, ILogger<AggregateWrapper<TAggregate>> logger)
{
_eventSourceId = eventSourceId;
_tenant = tenant;
_eventTypes = eventTypes;
_eventStore = eventStore;
_serviceProvider = serviceProvider;
Expand All @@ -57,7 +61,7 @@ public async Task Perform(Func<TAggregate, Task> method, CancellationToken cance
_instance = await GetHydratedAggregate(cancellationToken);
var aggregateRootId = _instance.AggregateRootId;
activity?.Tag(aggregateRootId);
_logger.PerformingOn(typeof(TAggregate), aggregateRootId, _instance.EventSourceId);
_logger.PerformingOn(typeof(TAggregate), aggregateRootId, _instance.EventSourceId, _tenant);
await method(_instance);
if (_instance.AppliedEvents.Any())
{
Expand Down Expand Up @@ -97,7 +101,7 @@ bool TryGetAggregateRoot(out TAggregate aggregateRoot, out Exception exception)
Task Rehydrate(TAggregate aggregateRoot, AggregateRootId aggregateRootId, CancellationToken cancellationToken)
{
var eventSourceId = aggregateRoot.EventSourceId;
_logger.RehydratingAggregateRoot(typeof(TAggregate), aggregateRootId, eventSourceId);
_logger.RehydratingAggregateRoot(typeof(TAggregate), aggregateRootId, eventSourceId, _tenant);
var eventTypesToFetch = GetEventTypes(_eventTypes);
var committedEventsBatches = _eventStore.FetchStreamForAggregate(aggregateRootId, eventSourceId, eventTypesToFetch, cancellationToken);
return aggregateRoot.RehydrateInternal(committedEventsBatches, AggregateRootMetadata<TAggregate>.MethodsPerEventType, cancellationToken);
Expand All @@ -117,7 +121,7 @@ static IEnumerable<EventType> GetEventTypes(IEventTypes eventTypes)

Task<CommittedAggregateEvents> CommitAppliedEvents(TAggregate aggregateRoot, AggregateRootId aggregateRootId)
{
_logger.CommittingEvents(aggregateRoot.GetType(), aggregateRootId, aggregateRoot.AppliedEvents.Count(), aggregateRoot.EventSourceId);
_logger.CommittingEvents(aggregateRoot.GetType(), aggregateRootId, aggregateRoot.AppliedEvents.Count(), aggregateRoot.EventSourceId, _tenant);
return _eventStore
.ForAggregate(aggregateRootId)
.WithEventSource(aggregateRoot.EventSourceId)
Expand Down
13 changes: 7 additions & 6 deletions Source/Aggregates/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using Dolittle.SDK.Events;
using Dolittle.SDK.Tenancy;
using Microsoft.Extensions.Logging;

namespace Dolittle.SDK.Aggregates;
Expand All @@ -13,11 +14,11 @@ namespace Dolittle.SDK.Aggregates;
static partial class Log
{
[LoggerMessage(0, LogLevel.Debug,
"Performing operation on {AggregateRoot} with aggregate root id {AggregateRootId} applying events to event source {EventSource}")]
internal static partial void PerformingOn(this ILogger logger, Type aggregateRoot, AggregateRootId aggregateRootId, EventSourceId eventSource);
"Performing operation on {AggregateRoot} with aggregate root id {AggregateRootId} applying events to event source {EventSource} for tenant {Tenant}")]
internal static partial void PerformingOn(this ILogger logger, Type aggregateRoot, AggregateRootId aggregateRootId, EventSourceId eventSource, TenantId tenant);

[LoggerMessage(0, LogLevel.Debug, "Rehydrating {AggregateRoot} with aggregate root id {AggregateRootId} with event source id {EventSource}")]
internal static partial void RehydratingAggregateRoot(this ILogger logger, Type aggregateRoot, AggregateRootId aggregateRootId, EventSourceId eventSource);
[LoggerMessage(0, LogLevel.Debug, "Rehydrating {AggregateRoot} with aggregate root id {AggregateRootId} with event source id {EventSource} for tenant {Tenant}")]
internal static partial void RehydratingAggregateRoot(this ILogger logger, Type aggregateRoot, AggregateRootId aggregateRootId, EventSourceId eventSource, TenantId tenant);

[LoggerMessage(0, LogLevel.Trace, "Re-applying {NumberOfEvents} events")]
internal static partial void ReApplying(this ILogger logger, int numberOfEvents);
Expand All @@ -26,9 +27,9 @@ static partial class Log
internal static partial void NoEventsToReApply(this ILogger logger);

[LoggerMessage(0, LogLevel.Debug,
"{AggregateRoot} with aggregate root id {AggregateRootId} is committing {NumberOfEvents} events to event source {EventSource}")]
"{AggregateRoot} with aggregate root id {AggregateRootId} is committing {NumberOfEvents} events to event source {EventSource} for tenant {Tenant}")]
internal static partial void CommittingEvents(this ILogger logger, Type aggregateRoot, AggregateRootId aggregateRootId, int numberOfEvents,
EventSourceId eventSource);
EventSourceId eventSource, TenantId tenant);

[LoggerMessage(0, LogLevel.Error, "{AggregateRoot} failed to instantiate")]
internal static partial void FailedToCreate(this ILogger logger, Exception e, Type aggregateRoot);
Expand Down
3 changes: 2 additions & 1 deletion Source/Events/Store/Internal/AggregateEventCommitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public async Task<CommittedAggregateEvents> CommitForAggregate(UncommittedAggreg
_logger.CommittingAggregateEvents(uncommittedAggregateEvents.Count,
uncommittedAggregateEvents.AggregateRoot,
uncommittedAggregateEvents.EventSource,
uncommittedAggregateEvents.ExpectedAggregateRootVersion);
uncommittedAggregateEvents.ExpectedAggregateRootVersion,
_executionContext.Tenant);

if (!_toProtobuf.TryConvert(uncommittedAggregateEvents, out var protobufEvents, out var error))
{
Expand Down
2 changes: 1 addition & 1 deletion Source/Events/Store/Internal/EventCommitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async Task<CommittedEvents> Commit(UncommittedEvents uncommittedEvents, C

try
{
_logger.CommittingEvents(uncommittedEvents.Count);
_logger.CommittingEvents(uncommittedEvents.Count, _executionContext.Tenant);

if (!_toProtobuf.TryConvert(uncommittedEvents, out var protobufEvents, out var error))
{
Expand Down
8 changes: 4 additions & 4 deletions Source/Events/Store/Internal/EventsForAggregateFetcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Task<CommittedAggregateEvents> FetchForAggregate(
EventSourceId eventSourceId,
CancellationToken cancellationToken = default)
{
_logger.FetchingAllEventsForAggregate(aggregateRootId, eventSourceId);
_logger.FetchingAllEventsForAggregate(aggregateRootId, eventSourceId, _executionContext.Tenant);
var request = CreateFetchRequestBase(aggregateRootId, eventSourceId);
request.FetchAllEvents = new FetchAllEventsForAggregateInBatchesRequest();
return AggregateBatches(eventSourceId, aggregateRootId, DoFetchForAggregate(request, cancellationToken));
Expand All @@ -70,7 +70,7 @@ public Task<CommittedAggregateEvents> FetchForAggregate(
IEnumerable<EventType> eventTypes,
CancellationToken cancellationToken = default)
{
_logger.FetchingEventsForAggregate(aggregateRootId, eventSourceId, eventTypes);
_logger.FetchingEventsForAggregate(aggregateRootId, eventSourceId, eventTypes, _executionContext.Tenant);
var request = CreateFetchRequestBase(aggregateRootId, eventSourceId);
request.FetchEvents = new FetchEventsForAggregateInBatchesRequest
{
Expand All @@ -82,7 +82,7 @@ public Task<CommittedAggregateEvents> FetchForAggregate(
/// <inheritdoc />
public IAsyncEnumerable<CommittedAggregateEvents> FetchStreamForAggregate(AggregateRootId aggregateRootId, EventSourceId eventSourceId, CancellationToken cancellationToken = default)
{
_logger.FetchingAllEventsForAggregate(aggregateRootId, eventSourceId);
_logger.FetchingAllEventsForAggregate(aggregateRootId, eventSourceId, _executionContext.Tenant);
var request = CreateFetchRequestBase(aggregateRootId, eventSourceId);
request.FetchAllEvents = new FetchAllEventsForAggregateInBatchesRequest();
return DoFetchForAggregate(request, cancellationToken);
Expand All @@ -91,7 +91,7 @@ public IAsyncEnumerable<CommittedAggregateEvents> FetchStreamForAggregate(Aggreg
/// <inheritdoc />
public IAsyncEnumerable<CommittedAggregateEvents> FetchStreamForAggregate(AggregateRootId aggregateRootId, EventSourceId eventSourceId, IEnumerable<EventType> eventTypes, CancellationToken cancellationToken = default)
{
_logger.FetchingEventsForAggregate(aggregateRootId, eventSourceId, eventTypes);
_logger.FetchingEventsForAggregate(aggregateRootId, eventSourceId, eventTypes, _executionContext.Tenant);
var request = CreateFetchRequestBase(aggregateRootId, eventSourceId);
request.FetchEvents = new FetchEventsForAggregateInBatchesRequest
{
Expand Down
17 changes: 9 additions & 8 deletions Source/Events/Store/Internal/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using Dolittle.SDK.Tenancy;
using Microsoft.Extensions.Logging;

namespace Dolittle.SDK.Events.Store.Internal;
Expand All @@ -12,11 +13,11 @@ namespace Dolittle.SDK.Events.Store.Internal;
/// </summary>
static partial class Log
{
[LoggerMessage(0, LogLevel.Debug, "Committing {NumberOfEvents} events for aggregate root type {AggregateRoot} and id {EventSource} with expected version {ExpectedVersion}")]
internal static partial void CommittingAggregateEvents(this ILogger logger, int numberOfEvents, AggregateRootId aggregateRoot, EventSourceId eventSource, AggregateRootVersion expectedVersion);
[LoggerMessage(0, LogLevel.Debug, "Committing {NumberOfEvents} events for aggregate root type {AggregateRoot} and id {EventSource} with expected version {ExpectedVersion} to tenant {Tenant}")]
internal static partial void CommittingAggregateEvents(this ILogger logger, int numberOfEvents, AggregateRootId aggregateRoot, EventSourceId eventSource, AggregateRootVersion expectedVersion, TenantId tenant);

[LoggerMessage(0, LogLevel.Debug, "Committing {NumberOfEvents} events")]
internal static partial void CommittingEvents(this ILogger logger, int numberOfEvents);
[LoggerMessage(0, LogLevel.Debug, "Committing {NumberOfEvents} events to tenant {Tenant}")]
internal static partial void CommittingEvents(this ILogger logger, int numberOfEvents, TenantId tenant);

[LoggerMessage(0, LogLevel.Error, "Could not convert UncommittedAggregateEvents to Protobuf.")]
internal static partial void UncommittedAggregateEventsCouldNotBeConverted(this ILogger logger, Exception ex);
Expand All @@ -30,11 +31,11 @@ static partial class Log
[LoggerMessage(0, LogLevel.Error, "The Runtime acknowledges that the events have been committed, but the returned CommittedEvents could not be converted.")]
internal static partial void CommittedEventsCouldNotBeConverted(this ILogger logger, Exception ex);

[LoggerMessage(0, LogLevel.Debug, "Fetching all events for aggregate root {AggregateRoot} and event source {EventSource}")]
internal static partial void FetchingAllEventsForAggregate(this ILogger logger, AggregateRootId aggregateRoot, EventSourceId eventSource);
[LoggerMessage(0, LogLevel.Debug, "Fetching all events from tenant {Tenant} for aggregate root {AggregateRoot} and event source {EventSource}")]
internal static partial void FetchingAllEventsForAggregate(this ILogger logger, AggregateRootId aggregateRoot, EventSourceId eventSource, TenantId tenant);

[LoggerMessage(0, LogLevel.Debug, "Fetching events for aggregate root {AggregateRoot} and event source {EventSource} that is of one of the following event types: [{EventTypes}]")]
internal static partial void FetchingEventsForAggregate(this ILogger logger, AggregateRootId aggregateRoot, EventSourceId eventSource, IEnumerable<EventType> eventTypes);
[LoggerMessage(0, LogLevel.Debug, "Fetching events from tenant {Tenant} for aggregate root {AggregateRoot} and event source {EventSource} that is of one of the following event types: [{EventTypes}")]
internal static partial void FetchingEventsForAggregate(this ILogger logger, AggregateRootId aggregateRoot, EventSourceId eventSource, IEnumerable<EventType> eventTypes, TenantId tenant);

[LoggerMessage(0, LogLevel.Error, "Could not convert CommittedAggregateEvents to SDK.")]
internal static partial void FetchedEventsForAggregateCouldNotBeConverted(this ILogger logger, Exception ex);
Expand Down
35 changes: 16 additions & 19 deletions Source/SDK/DolittleClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,32 +224,29 @@ public async Task<IDolittleClient> Connect(DolittleClientConfiguration configura
}

AddDefaultsFromServiceProviderInConfiguration(configuration);

var loggerFactory = configuration.LoggerFactory;
if (loggerFactory is not null)
{
_buildResults.WriteTo(loggerFactory.CreateLogger<DolittleClient>());
}

var loggerFactory = configuration.LoggerFactory ?? LoggerFactory.Create(_ => _.AddConsole());
_buildResults.WriteTo(loggerFactory.CreateLogger<DolittleClient>());
_grpcChannel = GrpcChannel.ForAddress(
$"http://{configuration.RuntimeHost}:{configuration.RuntimePort}",
new GrpcChannelOptions
{
Credentials = ChannelCredentials.Insecure,
MaxReceiveMessageSize = 32 * 1024 * 1024,
MaxSendMessageSize = 32 * 1024 * 1024,
$"http://{configuration.RuntimeHost}:{configuration.RuntimePort}",
new GrpcChannelOptions
{
Credentials = ChannelCredentials.Insecure,
MaxReceiveMessageSize = 32 * 1024 * 1024,
MaxSendMessageSize = 32 * 1024 * 1024,
#if NET5_0_OR_GREATER
HttpHandler = new SocketsHttpHandler
{
EnableMultipleHttp2Connections = true
}
HttpHandler = new SocketsHttpHandler
{
EnableMultipleHttp2Connections = true
}
#endif
});
});
var methodCaller = new MethodCaller(_grpcChannel, configuration.RuntimeHost, configuration.RuntimePort);
var (executionContext, tenants, otlpEndpoint) =
await ConnectToRuntime(methodCaller, configuration, loggerFactory, cancellationToken).ConfigureAwait(false);
Tenants = tenants;

#pragma warning disable CA1848
loggerFactory.CreateLogger<DolittleClient>().LogDebug("Configured tenants:\n{TenantListString}", string.Join('\n', tenants.Select(_ => $"\t{_.Id}")));
#pragma warning restore CA1848
await CreateDependencies(methodCaller, configuration.EventSerializerProvider, loggerFactory, executionContext, tenants).ConfigureAwait(false);
ConfigureContainer(configuration);
await RegisterAllUnregistered(methodCaller, configuration.PingInterval, executionContext, loggerFactory).ConfigureAwait(false);
Expand Down