Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private async Task<int> ProcessMessagesAsync(AppDbContext dbContext, Cancellatio
var outboxMessages = await dataConnection.GetTable<OutboxMessage>()
.Where(x => x.Topic == offset.Topic &&
x.Partition == offset.Partition &&
x.Id > offset.LastProcessedId)
x.Number > offset.LastProcessedNumber)
.OrderBy(x => x.Id)
.Take(_outboxOptions.Value.BatchSize)
.ToArrayAsyncLinqToDB(cancellationToken);
Expand All @@ -111,7 +111,7 @@ await dataConnection.GetTable<OutboxOffset>()
var lastMessage = outboxMessages.Last();
await dataConnection.GetTable<OutboxOffset>()
.Where(x => x.Id == offset.Id)
.Set(x => x.LastProcessedId, lastMessage.Id)
.Set(x => x.LastProcessedNumber, lastMessage.Number)
.Set(x => x.AvailableAfter, DateTimeOffset.UtcNow)
.UpdateAsync(cancellationToken);

Expand Down
7 changes: 7 additions & 0 deletions src/Outbox.WebApi/EFCore/DbDesignTimeServices.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
using EFCore.MigrationExtensions.PostgreSQL;

namespace Outbox.WebApi.EFCore;

public class DbDesignTimeServices : CustomNpgsqlDesignTimeServices
{
}
23 changes: 23 additions & 0 deletions src/Outbox.WebApi/EFCore/DesignTimeDbContextFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using EFCore.MigrationExtensions.PostgreSQL;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Design;

namespace Outbox.WebApi.EFCore;

public class DesignTimeDbContextFactory : IDesignTimeDbContextFactory<AppDbContext>
{
public AppDbContext CreateDbContext(string[] args)
{
Console.WriteLine("Using DesignTimeDbContextFactory");

var builder = new DbContextOptionsBuilder<AppDbContext>();

builder.UseSqlObjects();
builder.UseSnakeCaseNamingConvention();

builder.UseNpgsql("Host=localhost;Port=5432;Database=outbox;Username=postgres;Password=postgres;",
options => options.MigrationsHistoryTable("_migrations", "outbox"));

return new AppDbContext(builder.Options);
}
}
6 changes: 3 additions & 3 deletions src/Outbox.WebApi/EFCore/IOutboxMessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public async Task SaveChangesAsync(DbTransaction transaction, CancellationToken
foreach (var message in _messages)
{
var command = batch.CreateBatchCommand();
command.CommandText = "insert into outbox.outbox_messages(topic, partition, type, key, payload, headers) values (@topic, @partition, @type, @key, @payload, @headers)";
command.Parameters.AddWithValue("@topic", message.Topic);
command.Parameters.AddWithValue("@partition", message.Partition);
command.CommandText = "call outbox.insert_outbox_message(@ptopic, @ppartition, @type, @key, @payload, @headers)";
command.Parameters.AddWithValue("@ptopic", message.Topic);
command.Parameters.AddWithValue("@ppartition", message.Partition);
command.Parameters.AddWithValue("@type", message.Type);
command.Parameters.AddWithValue("@key", message.Key);
command.Parameters.AddWithValue("@payload", NpgsqlDbType.Jsonb, message.Payload);
Expand Down
4 changes: 4 additions & 0 deletions src/Outbox/AppDbContext.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using EFCore.MigrationExtensions.SqlObjects;
using Microsoft.EntityFrameworkCore;
using Outbox.Entities;

Expand All @@ -11,11 +12,14 @@ public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)

public DbSet<OutboxMessage> OutboxMessages { get; set; }
public DbSet<OutboxOffset> OutboxOffsets { get; set; }
public DbSet<OutboxOffsetSequence> OutboxOffsetSequences { get; set; }

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.HasDefaultSchema("outbox");

modelBuilder.ApplyConfigurationsFromAssembly(GetType().Assembly);

modelBuilder.AddSqlObjects(folder: "Sql", assembly: GetType().Assembly);
}
}
1 change: 1 addition & 0 deletions src/Outbox/Entities/OutboxMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public class OutboxMessage
public int Id { get; set; }
public string Topic { get; set; } = null!;
public int Partition { get; set; }
public int Number { get; set; }
public string? Key { get; set; }
public string Type { get; set; } = null!; //metadata
public string Payload { get; set; } = null!;
Expand Down
2 changes: 1 addition & 1 deletion src/Outbox/Entities/OutboxOffset.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ public class OutboxOffset
public int Id { get; set; }
public string Topic { get; set; } = null!;
public int Partition { get; set; }
public int LastProcessedId { get; set; }
public int LastProcessedNumber { get; set; }
public DateTimeOffset AvailableAfter { get; set; }

}
9 changes: 9 additions & 0 deletions src/Outbox/Entities/OutboxOffsetSequence.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Outbox.Entities;

public class OutboxOffsetSequence
{
public int Id { get; set; }
public string Topic { get; set; } = null!;
public int Partition { get; set; }
public int Value { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ public void Configure(EntityTypeBuilder<OutboxMessage> builder)
builder.Property(x => x.Topic).HasMaxLength(128);
builder.Property(x => x.CreatedAt).HasDefaultValueSql("now()");

builder.HasIndex(x => new {x.Topic, x.Partition, x.Id});
builder.HasIndex(x => new {x.Topic, x.Partition, x.Number});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ public void Configure(EntityTypeBuilder<OutboxOffset> builder)
Id = 2,
Topic = "topic-1",
Partition = 0,
LastProcessedId = 0,
LastProcessedNumber = 0,
AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00")
}, new OutboxOffset
{
Id = 3,
Topic = "topic-1",
Partition = 1,
LastProcessedId = 0,
LastProcessedNumber = 0,
AvailableAfter = DateTimeOffset.Parse("2025-05-18T12:00")
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using Outbox.Entities;

namespace Outbox.EntityTypeConfigurations;

public class OutboxOffsetSequenceEntityTypeConfiguration : IEntityTypeConfiguration<OutboxOffsetSequence>
{
public void Configure(EntityTypeBuilder<OutboxOffsetSequence> builder)
{
builder.Property(x => x.Topic).HasMaxLength(128);

builder.HasData(new OutboxOffsetSequence
{
Id = 1,
Topic = "topic-1",
Partition = 0,
},new OutboxOffsetSequence
{
Id = 2,
Topic = "topic-1",
Partition = 1
});
}
}
202 changes: 202 additions & 0 deletions src/Outbox/Migrations/20250519062044_AddPartitionNumber.Designer.cs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading