Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions src/Outbox.WebApi/BackgroundServices/OutboxBackgroundService.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System.Text;
using Confluent.Kafka;
using LinqToDB;
using LinqToDB.DataProvider.PostgreSQL;
using LinqToDB.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Outbox.Configurations;
using Outbox.Entities;
using Outbox.Extensions;

namespace Outbox.WebApi.BackgroundServices;

Expand Down Expand Up @@ -49,14 +51,23 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

private async Task<int> ProcessMessagesAsync(AppDbContext dbContext, CancellationToken cancellationToken)
{
var transaction = await dbContext.Database.BeginTransactionAsync(cancellationToken);

var outboxMessages = await dbContext.OutboxMessages
.Where(x => DateTimeOffset.UtcNow > x.AvailableAfter)
.AsNoTracking()
.ToLinqToDB()
.OrderBy(x => x.Id)
.Take(_outboxOptions.Value.BatchSize)
.ForUpdateSkipLocked()
.ToArrayAsync(cancellationToken);
.SubQueryHint(PostgreSQLHints.ForUpdate)
.SubQueryHint(PostgreSQLHints.SkipLocked)
.AsSubQuery()
.UpdateWithOutput(x => x,
x => new OutboxMessage
{
AvailableAfter = DateTimeOffset.UtcNow + _outboxOptions.Value.LockedDelay
},
(_, _, inserted) => inserted)
.AsQueryable()
.ToArrayAsyncLinqToDB(cancellationToken);

if (!outboxMessages.Any()) return 0;

Expand All @@ -67,8 +78,6 @@ await dbContext.OutboxMessages
.Where(x => messageIds.Contains(x.Id))
.ExecuteDeleteAsync(cancellationToken);

await transaction.CommitAsync(cancellationToken);

return outboxMessages.Length;
}

Expand Down
33 changes: 33 additions & 0 deletions src/Outbox.WebApi/Linq2db/OutboxLinqToDBForEFToolsImpl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System.Text.Json;
using LinqToDB;
using LinqToDB.EntityFrameworkCore;
using LinqToDB.Mapping;
using LinqToDB.Metadata;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;

namespace Outbox.WebApi.Linq2db;

public class OutboxLinqToDBForEFToolsImpl : LinqToDBForEFToolsImplDefault
{
private readonly string _connectionsString;

public OutboxLinqToDBForEFToolsImpl(string connectionsString) => _connectionsString = connectionsString;

public override EFConnectionInfo ExtractConnectionInfo(IDbContextOptions? options) =>
new()
{
ConnectionString = _connectionsString,
};

public override MappingSchema CreateMappingSchema(IModel model, IMetadataReader? metadataReader, IValueConverterSelector? convertorSelector, DataOptions dataOptions)
{
var result = base.CreateMappingSchema(model, metadataReader, convertorSelector, dataOptions);

result.SetConverter<string, Dictionary<string, string>>(str => JsonSerializer.Deserialize<Dictionary<string, string>>(str) ?? new Dictionary<string, string>());
result.SetConverter<Dictionary<string, string>, string>(dict => JsonSerializer.Serialize(dict));

return result;
}
}
1 change: 1 addition & 0 deletions src/Outbox.WebApi/Outbox.WebApi.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<ItemGroup>
<PackageReference Include="Confluent.Kafka.DependencyInjection" Version="3.2.0" />
<PackageReference Include="EFCore.MigrationExtensions.PostgreSQL" Version="9.0.1" />
<PackageReference Include="linq2db.EntityFrameworkCore" Version="9.1.0-preview.4" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.2"/>
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.3">
<PrivateAssets>all</PrivateAssets>
Expand Down
5 changes: 5 additions & 0 deletions src/Outbox.WebApi/Program.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
using System.Diagnostics;
using Confluent.Kafka;
using EFCore.MigrationExtensions.PostgreSQL;
using LinqToDB.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using Npgsql;
using Outbox;
using Outbox.Configurations;
using Outbox.Entities;
using Outbox.WebApi.BackgroundServices;
using Outbox.WebApi.EFCore;
using Outbox.WebApi.Linq2db;
using Outbox.WebApi.Telemetry;

var builder = WebApplication.CreateBuilder(args);
Expand All @@ -29,6 +31,9 @@
optionsBuilder.UseSqlObjects();
});

LinqToDBForEFTools.Implementation = new OutboxLinqToDBForEFToolsImpl(builder.Configuration.GetConnectionString("Outbox")!);
LinqToDBForEFTools.Initialize();

builder.Services.AddKafkaClient();

builder.Services.Configure<OutboxConfiguration>(builder.Configuration.GetSection("Outbox"));
Expand Down
1 change: 1 addition & 0 deletions src/Outbox.WebApi/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"Outbox": "Host=localhost;Port=5432;Database=outbox;Username=postgres;Password=postgres;"
},
"Outbox": {
"LockedDelay": "00:00:15",
"NoMessagesDelay": "00:00:10",
"BatchSize": 100
},
Expand Down
1 change: 1 addition & 0 deletions src/Outbox/Configurations/OutboxConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace Outbox.Configurations;

public class OutboxConfiguration
{
public TimeSpan LockedDelay { get; set; }
public TimeSpan NoMessagesDelay { get; set; }
public int BatchSize { get; set; }
}
1 change: 1 addition & 0 deletions src/Outbox/Entities/OutboxMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ public class OutboxMessage
public string Payload { get; set; } = null!;
public Dictionary<string, string> Headers { get; set; } = null!;
public DateTimeOffset CreatedAt { get; set; }
public DateTimeOffset AvailableAfter { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@ public void Configure(EntityTypeBuilder<OutboxMessage> builder)
builder.Property(x => x.Type).HasMaxLength(128);
builder.Property(x => x.Topic).HasMaxLength(128);
builder.Property(x => x.CreatedAt).HasDefaultValueSql("now()");
builder.Property(x => x.AvailableAfter).HasDefaultValueSql("now()");

builder.HasIndex(x => x.AvailableAfter);
}
}
115 changes: 115 additions & 0 deletions src/Outbox/Migrations/20250518111014_AddAvailableAfter.Designer.cs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions src/Outbox/Migrations/20250518111014_AddAvailableAfter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;

#nullable disable

namespace Outbox.Migrations
{
/// <inheritdoc />
public partial class AddAvailableAfter : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.AddColumn<DateTimeOffset>(
name: "available_after",
schema: "outbox",
table: "outbox_messages",
type: "timestamp with time zone",
nullable: false,
defaultValueSql: "now()");

migrationBuilder.CreateIndex(
name: "ix_outbox_messages_available_after",
schema: "outbox",
table: "outbox_messages",
column: "available_after");
}

/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropIndex(
name: "ix_outbox_messages_available_after",
schema: "outbox",
table: "outbox_messages");

migrationBuilder.DropColumn(
name: "available_after",
schema: "outbox",
table: "outbox_messages");
}
}
}
9 changes: 9 additions & 0 deletions src/Outbox/Migrations/AppDbContextModelSnapshot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ protected override void BuildModel(ModelBuilder modelBuilder)

NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));

b.Property<DateTimeOffset>("AvailableAfter")
.ValueGeneratedOnAdd()
.HasColumnType("timestamp with time zone")
.HasColumnName("available_after")
.HasDefaultValueSql("now()");

b.Property<DateTimeOffset>("CreatedAt")
.ValueGeneratedOnAdd()
.HasColumnType("timestamp with time zone")
Expand Down Expand Up @@ -69,6 +75,9 @@ protected override void BuildModel(ModelBuilder modelBuilder)
b.HasKey("Id")
.HasName("pk_outbox_messages");

b.HasIndex("AvailableAfter")
.HasDatabaseName("ix_outbox_messages_available_after");

b.ToTable("outbox_messages", "outbox");
});

Expand Down