Skip to content

Commit

Permalink
fix(consumer): remove stopping token in messagehandler
Browse files Browse the repository at this point in the history
  • Loading branch information
ArneD authored and jvandaal committed Dec 5, 2024
1 parent c32fd65 commit f94d486
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _kafkaIdemIdompotencyConsumer.ConsumeContinuously(async (message, context) =>
{
_logger.LogInformation("Handling next message");

await commandHandlingProjector.ProjectAsync(commandHandler, message, stoppingToken).ConfigureAwait(false);
await backOfficeProjector.ProjectAsync(context, message, stoppingToken).ConfigureAwait(false);

//CancellationToken.None to prevent halfway consumption
await context.SaveChangesAsync(CancellationToken.None);

await HandleMessage(commandHandlingProjector, commandHandler, message, backOfficeProjector, context);
}, stoppingToken);
}
catch (Exception)
Expand All @@ -73,5 +66,17 @@ await _kafkaIdemIdompotencyConsumer.ConsumeContinuously(async (message, context)
throw;
}
}

private async Task HandleMessage(ConnectedProjector<CommandHandler> commandHandlingProjector, CommandHandler commandHandler, object message,
ConnectedProjector<ConsumerAddressContext> backOfficeProjector, ConsumerAddressContext context)
{
_logger.LogInformation("Handling next message");

await commandHandlingProjector.ProjectAsync(commandHandler, message, CancellationToken.None).ConfigureAwait(false);
await backOfficeProjector.ProjectAsync(context, message, CancellationToken.None).ConfigureAwait(false);

//CancellationToken.None to prevent halfway consumption
await context.SaveChangesAsync(CancellationToken.None);
}
}
}

0 comments on commit f94d486

Please sign in to comment.