Postgres support (#108)
* Copied the SQL Server provider code into the new 'KafkaFlow.Retry.Postgres' project

* Adjusted data types to Npgsql

* Migrated '01 - Create_Tables.sql' & '02 - Populate_Tables.sql' to Postgres

* Prepared infrastructure for integration tests

* Fixes after running integration tests

* Postgres improvements - no dbo schema, Postgres naming conventions for tables, unified column casing

* Added unit tests (based on the SQL Server tests)

* Added readme for Postgres

* Fixes for Codacy Static Code Analysis

* PR review - moved usings inside the namespace

* PR review - added .ConfigureAwait(false) (see the sketch below)

* Fixes after merge from master
alek-github authored Apr 3, 2023
1 parent f18a426 commit 20b242d
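
The `.ConfigureAwait(false)` bullet above refers to the standard library-code pattern for avoiding SynchronizationContext capture. A minimal sketch of how it would look in the new provider's data access code, assuming a hypothetical snake_case table name (`retry_queue_items`) per the naming-convention bullet; the class and table names are placeholders, not the actual schema:

```csharp
using System.Threading.Tasks;
using Npgsql;

// Hypothetical sketch of the reviewed pattern: every await in library code
// uses .ConfigureAwait(false) so continuations do not capture the caller's
// SynchronizationContext. Class and table names are placeholders.
internal class RetryQueueItemCounter
{
    private readonly string connectionString;

    public RetryQueueItemCounter(string connectionString)
    {
        this.connectionString = connectionString;
    }

    public async Task<long> CountAsync()
    {
        using var connection = new NpgsqlConnection(connectionString);
        await connection.OpenAsync().ConfigureAwait(false);

        using var command = new NpgsqlCommand(
            "SELECT COUNT(1) FROM retry_queue_items", connection);

        var result = await command.ExecuteScalarAsync().ConfigureAwait(false);
        return (long)result;
    }
}
```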
Showing 91 changed files with 3,916 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/build.yml
@@ -17,6 +17,8 @@ jobs:
ACCEPT_EULA: Y
SQLSERVER_SA_PASSWORD: SqlSever123123
SQLSERVER_INTEGRATED_SECURITY: 'False'
POSTGRES_SA_USER: postgres
POSTGRES_SA_PASSWORD: Postgres123123

steps:

@@ -56,6 +58,9 @@ jobs:

- name: Start SqlServer
run: docker run -d -p 1433:1433 -e ACCEPT_EULA=${{ env.ACCEPT_EULA }} -e SA_PASSWORD=${{ env.SQLSERVER_SA_PASSWORD }} -e MSSQL_PID=Developer mcr.microsoft.com/mssql/server:2017-latest

- name: Start Postgres
run: docker run -d -p 5432:5432 -e POSTGRES_USER=${{ env.POSTGRES_SA_USER }} -e POSTGRES_PASSWORD=${{ env.POSTGRES_SA_PASSWORD }} postgres:latest

- name: Start MongoDB
uses: supercharge/mongodb-github-action@1.6.0
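This hunk starts the Postgres container but does not show how the tests connect to it. A plausible sketch, assuming the integration tests read the same `POSTGRES_SA_USER` / `POSTGRES_SA_PASSWORD` variables exported above; the host, port, and database name are illustrative defaults, not values taken from the repository:

```csharp
using System;

// Hypothetical helper: builds the Postgres connection string for the
// integration tests from the environment variables set in build.yml.
// The database name is a placeholder.
internal static class PostgresTestEnvironment
{
    public static string GetConnectionString()
    {
        var user = Environment.GetEnvironmentVariable("POSTGRES_SA_USER") ?? "postgres";
        var password = Environment.GetEnvironmentVariable("POSTGRES_SA_PASSWORD") ?? "Postgres123123";
        return $"Host=localhost;Port=5432;Username={user};Password={password};Database=kafka_flow_retry_tests";
    }
}
```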
3 changes: 3 additions & 0 deletions .github/workflows/publish.yml
@@ -35,6 +35,9 @@ jobs:
- name: Pack KafkaFlow.Retry.API
run: dotnet pack src/KafkaFlow.Retry.API/KafkaFlow.Retry.API.csproj -c Release --include-symbols /p:Version=${{ env.BUILD_VERSION }}

- name: Pack KafkaFlow.Retry.Postgres
run: dotnet pack src/KafkaFlow.Retry.Postgres/KafkaFlow.Retry.Postgres.csproj -c Release --include-symbols /p:Version=${{ env.BUILD_VERSION }}

- name: Pack KafkaFlow.Retry.SqlServer
run: dotnet pack src/KafkaFlow.Retry.SqlServer/KafkaFlow.Retry.SqlServer.csproj -c Release --include-symbols /p:Version=${{ env.BUILD_VERSION }}

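Once this workflow publishes the new package, consumers would pull it in the usual way, for example `dotnet add package KafkaFlow.Retry.Postgres`, alongside `KafkaFlow.Retry` and, for the management endpoints, `KafkaFlow.Retry.API`.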
10 changes: 5 additions & 5 deletions README.md
@@ -8,11 +8,11 @@ Want to give it a try? Check out our [Quickstart](https://farfetch.github.io/kaf

### Resilience policies

| Policy | Description | Aka | Required Packages |
| ------------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------: | ----------------- |
| **Simple Retry** <br/>(policy family)<br/><sub>([quickstart](#simple) ; deep)</sub> | Many faults are transient and may self-correct after a short delay. | "Maybe it's just a blip" | KafkaFlow.Retry |
| **Forever Retry**<br/>(policy family)<br/><sub>([quickstart](#forever) ; deep)</sub> | Many faults are semi-transient and may self-correct after multiple retries. | "Never give up" | KafkaFlow.Retry |
| **Durable Retry**<br/><sub>([quickstart](#durable) ; deep)</sub> | Beyond a certain amount of retries and waiting, you want to keep processing next-in-line messages but you can't lose the current offset message. As persistence databases, MongoDb or SqlServer is available. And you can manage in-retry messages through HTTP API. | "I can't stop processing messages but I can't lose messages" | KafkaFlow.Retry <br/>KafkaFlow.Retry.API<br/><br/>KafkaFlow.Retry.SqlServer<br/>or<br/>KafkaFlow.Retry.MongoDb |
| Policy | Description | Aka | Required Packages |
| ------------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------: |-------------------------------------------------------------------------------------------------------------------------------------------------|
| **Simple Retry** <br/>(policy family)<br/><sub>([quickstart](#simple) ; deep)</sub> | Many faults are transient and may self-correct after a short delay. | "Maybe it's just a blip" | KafkaFlow.Retry |
| **Forever Retry**<br/>(policy family)<br/><sub>([quickstart](#forever) ; deep)</sub> | Many faults are semi-transient and may self-correct after multiple retries. | "Never give up" | KafkaFlow.Retry |
| **Durable Retry**<br/><sub>([quickstart](#durable) ; deep)</sub> | Beyond a certain number of retries and waits, you want to keep processing next-in-line messages without losing the current offset's message. MongoDb, Postgres, or SqlServer can serve as the persistence database, and in-retry messages can be managed through an HTTP API. | "I can't stop processing messages but I can't lose messages" | KafkaFlow.Retry <br/>KafkaFlow.Retry.API<br/><br/>KafkaFlow.Retry.SqlServer<br/>or<br/>KafkaFlow.Retry.Postgres<br/>or<br/>KafkaFlow.Retry.MongoDb |

## Installation

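To make the new README row concrete: a minimal sketch of attaching the durable retry middleware backed by the Postgres provider, using the `WithPostgresDataProvider(connectionString, databaseName)` extension this PR introduces. The topic, group, message, and exception names are placeholders, and a production setup would also configure the embedded retry cluster and polling jobs, as the integration-test bootstrapper below does:

```csharp
using System;
using KafkaFlow;
using KafkaFlow.Configuration;
using KafkaFlow.Retry;
using KafkaFlow.Retry.Postgres;

// Placeholder types for illustration only.
public class OrderMessage { }
public class TransientStorageException : Exception { }

internal static class DurableRetryPostgresExample
{
    // Sketch: a consumer whose pipeline retries failed messages durably,
    // persisting them in Postgres between attempts.
    internal static IClusterConfigurationBuilder WithDurablePostgresRetry(
        this IClusterConfigurationBuilder cluster,
        string connectionString,
        string databaseName) =>
        cluster.AddConsumer(
            consumer => consumer
                .Topic("orders")
                .WithGroupId("orders-consumer")
                .AddMiddlewares(
                    middlewares => middlewares
                        .RetryDurable(
                            configure => configure
                                .Handle<TransientStorageException>()
                                .WithMessageType(typeof(OrderMessage))
                                .WithPostgresDataProvider(connectionString, databaseName))));
}
```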
@@ -8,6 +8,7 @@
using KafkaFlow.Retry.IntegrationTests.Core.Messages;
using KafkaFlow.Retry.IntegrationTests.Core.Producers;
using KafkaFlow.Retry.MongoDb;
using KafkaFlow.Retry.Postgres;
using KafkaFlow.Retry.SqlServer;
using KafkaFlow.Serializer;
using KafkaFlow.TypedHandler;
@@ -26,10 +27,14 @@ internal static class BootstrapperKafka
"test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db-retry",
"test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-sql-server",
"test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-sql-server-retry",
"test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres",
"test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres-retry",
"test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db",
"test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db-retry",
"test-kafka-flow-retry-retry-durable-latest-consumption-sql-server",
"test-kafka-flow-retry-retry-durable-latest-consumption-sql-server-retry"
"test-kafka-flow-retry-retry-durable-latest-consumption-sql-server-retry",
"test-kafka-flow-retry-retry-durable-latest-consumption-postgres",
"test-kafka-flow-retry-retry-durable-latest-consumption-postgres-retry"
};

internal static IClusterConfigurationBuilder CreatAllTestTopicsIfNotExist(this IClusterConfigurationBuilder cluster)
@@ -194,6 +199,80 @@ internal static IClusterConfigurationBuilder SetupRetryDurableGuaranteeOrderedCo
return cluster;
}

internal static IClusterConfigurationBuilder SetupRetryDurableGuaranteeOrderedConsumptionPostgresCluster(
this IClusterConfigurationBuilder cluster,
string postgresConnectionString,
string postgresDatabaseName)
{
cluster
.AddProducer<RetryDurableGuaranteeOrderedConsumptionPostgresProducer>(
producer => producer
.DefaultTopic("test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres")
.WithCompression(Confluent.Kafka.CompressionType.Gzip)
.AddMiddlewares(
middlewares => middlewares
.AddSingleTypeSerializer<RetryDurableTestMessage, NewtonsoftJsonSerializer>()))
.AddConsumer(
consumer => consumer
.Topic("test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres")
.WithGroupId("test-consumer-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres")
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(KafkaFlow.AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddSingleTypeSerializer<NewtonsoftJsonSerializer>(typeof(RetryDurableTestMessage))
.RetryDurable(
(configure) => configure
.Handle<RetryDurableTestException>()
.WithMessageType(typeof(RetryDurableTestMessage))
.WithMessageSerializeSettings(
new JsonSerializerSettings
{
DateTimeZoneHandling = DateTimeZoneHandling.Utc,
TypeNameHandling = TypeNameHandling.Auto
})
.WithEmbeddedRetryCluster(
cluster,
configure => configure
.Enabled(true)
.WithRetryTopicName("test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-postgres-retry")
.WithRetryConsumerBufferSize(100)
.WithRetryConsumerWorkersCount(10)
.WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption)
.WithRetryTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Transient)
.AddHandler<RetryDurableTestMessageHandler>()))
.WithPollingJobsConfiguration(
configure => configure
.WithSchedulerId("custom_search_key_durable_guarantee_ordered_consumption_postgres")
.WithRetryDurablePollingConfiguration(
configure => configure
.Enabled(true)
.WithCronExpression("0/30 * * ? * * *")
.WithExpirationIntervalFactor(1)
.WithFetchSize(256))
)
.WithPostgresDataProvider(
postgresConnectionString,
postgresDatabaseName)
.WithRetryPlanBeforeRetryDurable(
configure => configure
.TryTimes(3)
.WithTimeBetweenTriesPlan(
TimeSpan.FromMilliseconds(250),
TimeSpan.FromMilliseconds(500),
TimeSpan.FromMilliseconds(1000))
.ShouldPauseConsumer(false)))
.AddTypedHandlers(
handlers =>
handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandler<RetryDurableTestMessageHandler>())));
return cluster;
}

internal static IClusterConfigurationBuilder SetupRetryDurableLatestConsumptionMongoDbCluster(
this IClusterConfigurationBuilder cluster,
string mongoDbConnectionString,
@@ -347,6 +426,80 @@ internal static IClusterConfigurationBuilder SetupRetryDurableLatestConsumptionS
return cluster;
}

internal static IClusterConfigurationBuilder SetupRetryDurableLatestConsumptionPostgresCluster(
this IClusterConfigurationBuilder cluster,
string postgresConnectionString,
string postgresDatabaseName)
{
cluster
.AddProducer<RetryDurableLatestConsumptionPostgresProducer>(
producer => producer
.DefaultTopic("test-kafka-flow-retry-retry-durable-latest-consumption-postgres")
.WithCompression(Confluent.Kafka.CompressionType.Gzip)
.AddMiddlewares(
middlewares => middlewares
.AddSingleTypeSerializer<RetryDurableTestMessage, NewtonsoftJsonSerializer>()))
.AddConsumer(
consumer => consumer
.Topic("test-kafka-flow-retry-retry-durable-latest-consumption-postgres")
.WithGroupId("test-consumer-kafka-flow-retry-retry-durable-latest-consumption-postgres")
.WithBufferSize(100)
.WithWorkersCount(10)
.WithAutoOffsetReset(KafkaFlow.AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
.AddSingleTypeSerializer<NewtonsoftJsonSerializer>(typeof(RetryDurableTestMessage))
.RetryDurable(
(configure) => configure
.Handle<RetryDurableTestException>()
.WithMessageType(typeof(RetryDurableTestMessage))
.WithMessageSerializeSettings(
new JsonSerializerSettings
{
DateTimeZoneHandling = DateTimeZoneHandling.Utc,
TypeNameHandling = TypeNameHandling.Auto
})
.WithEmbeddedRetryCluster(
cluster,
configure => configure
.Enabled(true)
.WithRetryTopicName("test-kafka-flow-retry-retry-durable-latest-consumption-postgres-retry")
.WithRetryConsumerBufferSize(100)
.WithRetryConsumerWorkersCount(10)
.WithRetryConsumerStrategy(RetryConsumerStrategy.LatestConsumption)
.WithRetryTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Transient)
.AddHandler<RetryDurableTestMessageHandler>()))
.WithPollingJobsConfiguration(
configure => configure
.WithSchedulerId("custom_search_key_durable_latest_consumption_postgres")
.WithRetryDurablePollingConfiguration(
configure => configure
.Enabled(true)
.WithCronExpression("0/30 * * ? * * *")
.WithExpirationIntervalFactor(1)
.WithFetchSize(256))
)
.WithPostgresDataProvider(
postgresConnectionString,
postgresDatabaseName)
.WithRetryPlanBeforeRetryDurable(
configure => configure
.TryTimes(3)
.WithTimeBetweenTriesPlan(
TimeSpan.FromMilliseconds(250),
TimeSpan.FromMilliseconds(500),
TimeSpan.FromMilliseconds(1000))
.ShouldPauseConsumer(false)))
.AddTypedHandlers(
handlers =>
handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandler<RetryDurableTestMessageHandler>())));
return cluster;
}

internal static IClusterConfigurationBuilder SetupRetryForeverCluster(this IClusterConfigurationBuilder cluster)
{
cluster