Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions generate_data.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
--SELECT pg_snapshot_xmin(pg_current_snapshot()); must return value > 1_000_000
INSERT INTO outbox_messages
(topic, "partition", transaction_id, "key", "type", payload, headers)
(topic, "partition", "key", "type", payload, headers)
SELECT
'topic-1',
n.v%2,
n.v::text::xid8,
n.v,
'type',
'{"byte250":"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"}',
Expand Down
12 changes: 2 additions & 10 deletions src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using Microsoft.Extensions.Options;
using Outbox.Configurations;
using Outbox.Entities;
using Outbox.WebApi.Linq2db;

namespace Outbox.WebApi.BackgroundServices;

Expand Down Expand Up @@ -92,14 +91,8 @@ private async Task<int> ProcessMessagesAsync(AppDbContext dbContext, Cancellatio
var outboxMessages = await dataConnection.GetTable<OutboxMessage>()
.Where(x => x.Topic == offset.Topic &&
x.Partition == offset.Partition &&
x.TransactionId >= offset.LastProcessedTransactionId &&
(
x.TransactionId > offset.LastProcessedTransactionId ||
x.TransactionId == offset.LastProcessedTransactionId && x.Id > offset.LastProcessedId
) &&
x.TransactionId < PostgreSqlExtensions.MinCurrentTransactionId
)
.OrderBy(x => x.TransactionId).ThenBy(x => x.Id)
x.Id > offset.LastProcessedId)
.OrderBy(x => x.Id)
.Take(_outboxOptions.Value.BatchSize)
.ToArrayAsyncLinqToDB(cancellationToken);

Expand All @@ -119,7 +112,6 @@ await dataConnection.GetTable<OutboxOffset>()
await dataConnection.GetTable<OutboxOffset>()
.Where(x => x.Id == offset.Id)
.Set(x => x.LastProcessedId, lastMessage.Id)
.Set(x => x.LastProcessedTransactionId, lastMessage.TransactionId)
.Set(x => x.AvailableAfter, DateTimeOffset.UtcNow)
.UpdateAsync(cancellationToken);

Expand Down
57 changes: 57 additions & 0 deletions src/Outbox.WebApi/EFCore/IOutboxMessageContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System.Data.Common;
using System.Text.Json;
using Npgsql;
using NpgsqlTypes;
using Outbox.Entities;

namespace Outbox.WebApi.EFCore;

public interface IOutboxMessageContext
{
void Add(OutboxMessage message);
Task SaveChangesAsync(DbTransaction transaction, CancellationToken ct);
}

public class OutboxMessageContext : IOutboxMessageContext
{
private readonly IOutboxMessagesProcessor _outboxMessagesProcessor;
private readonly List<OutboxMessage> _messages = new();

public OutboxMessageContext(IOutboxMessagesProcessor outboxMessagesProcessor) =>
_outboxMessagesProcessor = outboxMessagesProcessor;

public void Add(OutboxMessage message)
{
_messages.Add(message);
}

public async Task SaveChangesAsync(DbTransaction transaction, CancellationToken ct)
{
if (!_messages.Any()) return;

await using var batch = new NpgsqlBatch(transaction.Connection as NpgsqlConnection, transaction as NpgsqlTransaction);

var lockCommand = batch.CreateBatchCommand();
//LOCK TABLE outbox.outbox_messages IN ROW EXCLUSIVE MODE; --alternative to advisory lock
lockCommand.CommandText = "SELECT pg_advisory_xact_lock(554738281823524)";
batch.BatchCommands.Add(lockCommand);

foreach (var message in _messages)
{
var command = batch.CreateBatchCommand();
command.CommandText = "insert into outbox.outbox_messages(topic, partition, type, key, payload, headers) values (@topic, @partition, @type, @key, @payload, @headers)";
command.Parameters.AddWithValue("@topic", message.Topic);
command.Parameters.AddWithValue("@partition", message.Partition);
command.Parameters.AddWithValue("@type", message.Type);
command.Parameters.AddWithValue("@key", message.Key);
command.Parameters.AddWithValue("@payload", NpgsqlDbType.Jsonb, message.Payload);
command.Parameters.AddWithValue("@headers", NpgsqlDbType.Jsonb, JsonSerializer.Serialize(message.Headers));

batch.BatchCommands.Add(command);
}

await batch.ExecuteNonQueryAsync(ct);

_outboxMessagesProcessor.NewMessagesPersisted(_messages.First().Topic, _messages.First().Partition);
}
}
82 changes: 0 additions & 82 deletions src/Outbox.WebApi/EFCore/OutboxInterceptor.cs

This file was deleted.

3 changes: 0 additions & 3 deletions src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ public override MappingSchema CreateMappingSchema(IModel model, IMetadataReader?
{
var result = base.CreateMappingSchema(model, metadataReader, convertorSelector, dataOptions);

//default parameter type for ulong is decimal, it produces an error on mapping to xid8
result.SetConvertExpression<ulong, DataParameter>(value => new DataParameter(null, value, DataType.UInt64));

result.SetConverter<string, Dictionary<string, string>>(str => JsonSerializer.Deserialize<Dictionary<string, string>>(str) ?? new Dictionary<string, string>());
result.SetConverter<Dictionary<string, string>, string>(dict => JsonSerializer.Serialize(dict));

Expand Down
9 changes: 0 additions & 9 deletions src/Outbox.WebApi/Linq2db/PostgreSqlExtensions.cs

This file was deleted.

19 changes: 13 additions & 6 deletions src/Outbox.WebApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using EFCore.MigrationExtensions.PostgreSQL;
using LinqToDB.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Npgsql;
using Outbox;
using Outbox.Configurations;
Expand All @@ -26,18 +27,19 @@
dataSource,
options => options.MigrationsHistoryTable("_migrations", "outbox"))
.UseSnakeCaseNamingConvention()
.AddInterceptors(new ForUpdateInterceptor(), serviceProvider.GetRequiredService<OutboxInterceptor>());
.AddInterceptors(new ForUpdateInterceptor());

optionsBuilder.UseSqlObjects();
});

builder.Services.AddScoped<IOutboxMessageContext, OutboxMessageContext>();

LinqToDBForEFTools.Implementation = new OutboxLinqToDBForEFToolsImpl(builder.Configuration.GetConnectionString("Outbox")!);
LinqToDBForEFTools.Initialize();

builder.Services.AddKafkaClient();

builder.Services.Configure<OutboxConfiguration>(builder.Configuration.GetSection("Outbox"));
builder.Services.AddSingleton<OutboxInterceptor>();

builder.Services.AddSingleton<OutboxBackgroundService>();
builder.Services.AddHostedService(sp => sp.GetRequiredService<OutboxBackgroundService>());
Expand All @@ -57,7 +59,7 @@

app.UseHttpsRedirection();

app.MapPost("/messages", async (CreateMessageDto dto, AppDbContext dbContext, CancellationToken ct) =>
app.MapPost("/messages", async (CreateMessageDto dto, AppDbContext dbContext, IOutboxMessageContext outboxMessageContext, CancellationToken ct) =>
{
var activityContext = Activity.Current?.Context;
var message = new OutboxMessage
Expand All @@ -69,10 +71,15 @@
Payload = dto.Payload,
Headers = activityContext.GetHeaders()
};

dbContext.OutboxMessages.Add(message);
outboxMessageContext.Add(message);

var transaction = await dbContext.Database.BeginTransactionAsync(ct);

await dbContext.SaveChangesAsync(ct);
await dbContext.SaveChangesAsync(ct); //save business entities

await outboxMessageContext.SaveChangesAsync(transaction.GetDbTransaction(), ct);

await transaction.CommitAsync(ct);
})
.WithName("CreateMessage");

Expand Down
1 change: 0 additions & 1 deletion src/Outbox/Entities/OutboxMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
public class OutboxMessage
{
public int Id { get; set; }
public ulong TransactionId { get; set; }
public string Topic { get; set; } = null!;
public int Partition { get; set; }
public string? Key { get; set; }
Expand Down
1 change: 0 additions & 1 deletion src/Outbox/Entities/OutboxOffset.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ public class OutboxOffset
public string Topic { get; set; } = null!;
public int Partition { get; set; }
public int LastProcessedId { get; set; }
public ulong LastProcessedTransactionId { get; set; }
public DateTimeOffset AvailableAfter { get; set; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ public class OutboxMessageEntityTypeConfiguration : IEntityTypeConfiguration<Out
public void Configure(EntityTypeBuilder<OutboxMessage> builder)
{
builder.HasKey(x => x.Id);

builder.Property(x => x.TransactionId).HasDefaultValueSql("pg_current_xact_id()").HasColumnType("xid8");

builder.Property(x => x.Payload).HasColumnType("jsonb");
builder.Property(x => x.Headers).HasColumnType("jsonb");
builder.Property(x => x.Key).HasMaxLength(128);
builder.Property(x => x.Type).HasMaxLength(128);
builder.Property(x => x.Topic).HasMaxLength(128);
builder.Property(x => x.CreatedAt).HasDefaultValueSql("now()");

builder.HasIndex(x => new {x.Topic, x.Partition, x.TransactionId, x.Id});
builder.HasIndex(x => new {x.Topic, x.Partition, x.Id});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ public class OutboxOffsetEntityTypeConfiguration : IEntityTypeConfiguration<Outb
public void Configure(EntityTypeBuilder<OutboxOffset> builder)
{
builder.Property(x => x.Topic).HasMaxLength(128);
builder.Property(e => e.LastProcessedTransactionId).HasColumnType("xid8").HasDefaultValueSql("'0'::xid8");
builder.Property(x => x.AvailableAfter).HasDefaultValueSql("now()");

builder.HasIndex(x => new {x.Topic, x.Partition}).IsUnique();
Expand All @@ -20,15 +19,13 @@ public void Configure(EntityTypeBuilder<OutboxOffset> builder)
Topic = "topic-1",
Partition = 0,
LastProcessedId = 0,
LastProcessedTransactionId = 0,
AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00")
}, new OutboxOffset
{
Id = 3,
Topic = "topic-1",
Partition = 1,
LastProcessedId = 0,
LastProcessedTransactionId = 0,
AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00")
});
}
Expand Down
Loading