Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions generate_data.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
--SELECT pg_snapshot_xmin(pg_current_snapshot()); must return value > 1_000_000
INSERT INTO outbox_messages
(topic, "key", "type", payload, headers)
(topic, "partition", transaction_id, "key", "type", payload, headers)
SELECT
'topic-' || n.v%2,
'topic-1',
n.v%2,
n.v::text::xid8,
n.v,
'type',
'{"byte250":"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"}',
Expand Down
77 changes: 63 additions & 14 deletions src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Microsoft.Extensions.Options;
using Outbox.Configurations;
using Outbox.Entities;
using Outbox.WebApi.Linq2db;

namespace Outbox.WebApi.BackgroundServices;

Expand All @@ -17,6 +18,8 @@ public class OutboxBackgroundService : BackgroundService, IOutboxMessagesProcess
private readonly ILogger<OutboxBackgroundService> _logger;

private readonly AutoResetEvent _autoResetEvent = new(false);

private (string Topic, int Partition)? _offsetWithMessages;

public OutboxBackgroundService(
IServiceProvider serviceProvider,
Expand All @@ -39,7 +42,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

var processedMessages = await ProcessMessagesAsync(dbContext, stoppingToken);

if (processedMessages != _outboxOptions.Value.BatchSize)
if (processedMessages == -1) //no partition
await WaitForOutboxMessage(stoppingToken);
}
catch (Exception e)
Expand All @@ -51,32 +54,74 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

private async Task<int> ProcessMessagesAsync(AppDbContext dbContext, CancellationToken cancellationToken)
{
var outboxMessages = await dbContext.OutboxMessages
.Where(x => DateTimeOffset.UtcNow > x.AvailableAfter)
.AsNoTracking()
.ToLinqToDB()
.OrderBy(x => x.Id)
.Take(_outboxOptions.Value.BatchSize)
dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;

var dataConnection = dbContext.CreateLinqToDBConnection();

IQueryable<OutboxOffset> offsetsQuery = dataConnection.GetTable<OutboxOffset>();
if (_offsetWithMessages != null)
{
offsetsQuery = offsetsQuery.Where(x => x.Topic == _offsetWithMessages.Value.Topic &&
x.Partition == _offsetWithMessages.Value.Partition);

_offsetWithMessages = null;
}
else
{
offsetsQuery = offsetsQuery.Where(x => DateTimeOffset.UtcNow > x.AvailableAfter)
.OrderBy(x => x.AvailableAfter); //earliest available
}

var offsets = await offsetsQuery
.Take(1)
.SubQueryHint(PostgreSQLHints.ForUpdate)
.SubQueryHint(PostgreSQLHints.SkipLocked)
.AsSubQuery()
.UpdateWithOutput(x => x,
x => new OutboxMessage
x => new OutboxOffset
{
AvailableAfter = DateTimeOffset.UtcNow + _outboxOptions.Value.LockedDelay
},
(_, _, inserted) => inserted)
.AsQueryable()
.ToArrayAsyncLinqToDB(cancellationToken);

if (!offsets.Any()) return -1; //no partition
var offset = offsets.Single();

var outboxMessages = await dataConnection.GetTable<OutboxMessage>()
.Where(x => x.Topic == offset.Topic &&
x.Partition == offset.Partition &&
x.TransactionId >= offset.LastProcessedTransactionId &&
(
x.TransactionId > offset.LastProcessedTransactionId ||
x.TransactionId == offset.LastProcessedTransactionId && x.Id > offset.LastProcessedId
) &&
x.TransactionId < PostgreSqlExtensions.MinCurrentTransactionId
)
.OrderBy(x => x.TransactionId).ThenBy(x => x.Id)
.Take(_outboxOptions.Value.BatchSize)
.ToArrayAsyncLinqToDB(cancellationToken);

if (!outboxMessages.Any()) return 0;
if (!outboxMessages.Any())
{
await dataConnection.GetTable<OutboxOffset>()
.Where(x => x.Id == offset.Id)
.Set(x => x.AvailableAfter, DateTimeOffset.UtcNow + _outboxOptions.Value.NoMessagesDelay)
.UpdateAsync(cancellationToken);

return 0;
}

await ProcessOutboxMessagesAsync(outboxMessages, cancellationToken);

var messageIds = outboxMessages.Select(x => x.Id).ToArray();
await dbContext.OutboxMessages
.Where(x => messageIds.Contains(x.Id))
.ExecuteDeleteAsync(cancellationToken);
var lastMessage = outboxMessages.Last();
await dataConnection.GetTable<OutboxOffset>()
.Where(x => x.Id == offset.Id)
.Set(x => x.LastProcessedId, lastMessage.Id)
.Set(x => x.LastProcessedTransactionId, lastMessage.TransactionId)
.Set(x => x.AvailableAfter, DateTimeOffset.UtcNow)
.UpdateAsync(cancellationToken);

return outboxMessages.Length;
}
Expand Down Expand Up @@ -107,7 +152,11 @@ private Task ProcessMessageAsync<TKey>(TKey key, OutboxMessage message, Cancella
}


public void NewMessagesPersisted() => _autoResetEvent.Set();
public void NewMessagesPersisted(string topic, int partition)
{
_offsetWithMessages = (topic, partition);
_autoResetEvent.Set();
}

private async ValueTask WaitForOutboxMessage(CancellationToken stoppingToken)
{
Expand Down
8 changes: 4 additions & 4 deletions src/Outbox.WebApi/EFCore/OutboxInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ public class OutboxInterceptor : ISaveChangesInterceptor
public OutboxInterceptor(IOutboxMessagesProcessor outboxMessagesProcessor) =>
_outboxMessagesProcessor = outboxMessagesProcessor;

private readonly ConcurrentDictionary<DbContext, string> _contextsWithOutboxMessages = new();
private readonly ConcurrentDictionary<DbContext, (string Topic, int Partition)> _contextsWithOutboxMessages = new();

private void OnSavingChanges(DbContext context)
{
var message = context.ChangeTracker.Entries<OutboxMessage>()
.FirstOrDefault(x => x.State == EntityState.Added);
if (message != null) _contextsWithOutboxMessages.TryAdd(context, message.Entity.Topic);
if (message != null) _contextsWithOutboxMessages.TryAdd(context, (message.Entity.Topic, message.Entity.Partition));
}

private void OnSaveChanges(DbContext context)
{
if (_contextsWithOutboxMessages.ContainsKey(context))
{
if (_contextsWithOutboxMessages.TryRemove(context, out _))
if (_contextsWithOutboxMessages.TryRemove(context, out var item))
{
_outboxMessagesProcessor.NewMessagesPersisted();
_outboxMessagesProcessor.NewMessagesPersisted(item.Topic, item.Partition);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Text.Json;
using LinqToDB;
using LinqToDB.Data;
using LinqToDB.EntityFrameworkCore;
using LinqToDB.Mapping;
using LinqToDB.Metadata;
Expand All @@ -25,6 +26,9 @@ public override MappingSchema CreateMappingSchema(IModel model, IMetadataReader?
{
var result = base.CreateMappingSchema(model, metadataReader, convertorSelector, dataOptions);

//default parameter type for ulong is decimal, it produces an error on mapping to xid8
result.SetConvertExpression<ulong, DataParameter>(value => new DataParameter(null, value, DataType.UInt64));

result.SetConverter<string, Dictionary<string, string>>(str => JsonSerializer.Deserialize<Dictionary<string, string>>(str) ?? new Dictionary<string, string>());
result.SetConverter<Dictionary<string, string>, string>(dict => JsonSerializer.Serialize(dict));

Expand Down
9 changes: 9 additions & 0 deletions src/Outbox.WebApi/Linq2db/PostgreSqlExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using LinqToDB;

namespace Outbox.WebApi.Linq2db;

public static class PostgreSqlExtensions
{
[Sql.Expression(ProviderName.PostgreSQL, "pg_snapshot_xmin(pg_current_snapshot())", ServerSideOnly = true)]
public static ulong MinCurrentTransactionId => throw new NotImplementedException();
}
1 change: 1 addition & 0 deletions src/Outbox.WebApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
var message = new OutboxMessage
{
Topic = dto.Topic,
Partition = 0, //hash key % partitions count, ...
Type = dto.Type,
Key = dto.Key,
Payload = dto.Payload,
Expand Down
3 changes: 2 additions & 1 deletion src/Outbox/Entities/OutboxMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
public class OutboxMessage
{
public int Id { get; set; }
public ulong TransactionId { get; set; }
public string Topic { get; set; } = null!;
public int Partition { get; set; }
public string? Key { get; set; }
public string Type { get; set; } = null!; //metadata
public string Payload { get; set; } = null!;
public Dictionary<string, string> Headers { get; set; } = null!;
public DateTimeOffset CreatedAt { get; set; }
public DateTimeOffset AvailableAfter { get; set; }
}
5 changes: 5 additions & 0 deletions src/Outbox/Entities/OutboxOffset.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,10 @@ namespace Outbox.Entities;
public class OutboxOffset
{
public int Id { get; set; }
public string Topic { get; set; } = null!;
public int Partition { get; set; }
public int LastProcessedId { get; set; }
public ulong LastProcessedTransactionId { get; set; }
public DateTimeOffset AvailableAfter { get; set; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ public void Configure(EntityTypeBuilder<OutboxMessage> builder)
{
builder.HasKey(x => x.Id);

builder.Property(x => x.TransactionId).HasDefaultValueSql("pg_current_xact_id()").HasColumnType("xid8");
builder.Property(x => x.Payload).HasColumnType("jsonb");
builder.Property(x => x.Headers).HasColumnType("jsonb");
builder.Property(x => x.Key).HasMaxLength(128);
builder.Property(x => x.Type).HasMaxLength(128);
builder.Property(x => x.Topic).HasMaxLength(128);
builder.Property(x => x.CreatedAt).HasDefaultValueSql("now()");
builder.Property(x => x.AvailableAfter).HasDefaultValueSql("now()");

builder.HasIndex(x => x.AvailableAfter);
builder.HasIndex(x => new {x.Topic, x.Partition, x.TransactionId, x.Id});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,28 @@ public class OutboxOffsetEntityTypeConfiguration : IEntityTypeConfiguration<Outb
{
public void Configure(EntityTypeBuilder<OutboxOffset> builder)
{
builder.HasData(new OutboxOffset {Id = 1, LastProcessedId = 0});
builder.Property(x => x.Topic).HasMaxLength(128);
builder.Property(e => e.LastProcessedTransactionId).HasColumnType("xid8").HasDefaultValueSql("'0'::xid8");
builder.Property(x => x.AvailableAfter).HasDefaultValueSql("now()");

builder.HasIndex(x => new {x.Topic, x.Partition}).IsUnique();

builder.HasData(new OutboxOffset
{
Id = 2,
Topic = "topic-1",
Partition = 0,
LastProcessedId = 0,
LastProcessedTransactionId = 0,
AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00")
}, new OutboxOffset
{
Id = 3,
Topic = "topic-1",
Partition = 1,
LastProcessedId = 0,
LastProcessedTransactionId = 0,
AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00")
});
}
}
2 changes: 1 addition & 1 deletion src/Outbox/IOutboxMessagesProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ namespace Outbox;

public interface IOutboxMessagesProcessor
{
void NewMessagesPersisted();
void NewMessagesPersisted(string topic, int partition);
}
Loading