From 8f38d9cbd0669dac88d6a4187df4ef6241bfe943 Mon Sep 17 00:00:00 2001 From: Carlos Miranda Date: Thu, 24 Aug 2023 10:09:17 +0100 Subject: [PATCH] feat: added ability to schema sql server operations (#136) Added ability to schema SQL Server operations --- .../Settings/SqlServerRepositorySettings.cs | 1 + .../Repositories/SqlServerRepository.cs | 4 +-- .../CreateSchemaCreatorTests.cs | 25 +++++++------- .../conf/appsettings.json | 3 +- .../DbConnectionContext.cs | 2 ++ .../IDbConnection.cs | 2 ++ .../IRetryQueueItemMessageHeaderRepository.cs | 4 +-- .../IRetryQueueItemMessageRepository.cs | 4 +-- .../Repositories/IRetryQueueItemRepository.cs | 2 +- .../Repositories/IRetryQueueRepository.cs | 2 +- .../RetryQueueItemMessageHeaderRepository.cs | 8 ++--- .../RetryQueueItemMessageRepository.cs | 8 ++--- .../Repositories/RetryQueueItemRepository.cs | 34 +++++++++---------- .../Repositories/RetryQueueRepository.cs | 24 ++++++------- .../RetryDurableDefinitionBuilderExtension.cs | 26 ++++++++++++-- .../SqlServerDbSettings.cs | 29 ++++++++++++++-- .../SqlServer/ConnectionProviderTests.cs | 4 +-- ...yDurableDefinitionBuilderExtensionTests.cs | 13 +++++++ 18 files changed, 130 insertions(+), 65 deletions(-) diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Settings/SqlServerRepositorySettings.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Settings/SqlServerRepositorySettings.cs index 89db565c..45c418f1 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Settings/SqlServerRepositorySettings.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Settings/SqlServerRepositorySettings.cs @@ -5,5 +5,6 @@ internal class SqlServerRepositorySettings public string ConnectionString { get; set; } public string DatabaseName { get; set; } + public string Schema { get; set; } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs index bf5fefa5..d02dbbb6 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs @@ -17,9 +17,9 @@ internal class SqlServerRepository : IRepository { + private const string schema = "dbo"; private const int TimeoutSec = 60; private readonly ConnectionProvider connectionProvider; - private readonly IRetryQueueItemMessageHeaderRepository retryQueueItemMessageHeaderRepository; private readonly IRetryQueueItemMessageRepository retryQueueItemMessageRepository; private readonly IRetryQueueItemRepository retryQueueItemRepository; @@ -31,7 +31,7 @@ public SqlServerRepository( string connectionString, string dbName) { - this.sqlServerDbSettings = new SqlServerDbSettings(connectionString, dbName); + this.sqlServerDbSettings = new SqlServerDbSettings(connectionString, dbName, schema); this.RetryQueueDataProvider = new SqlServerDbDataProviderFactory().Create(this.sqlServerDbSettings); diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CreateSchemaCreatorTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CreateSchemaCreatorTests.cs index c9b7b210..ca2d88da 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CreateSchemaCreatorTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/CreateSchemaCreatorTests.cs @@ -14,31 +14,32 @@ public CreateSchemaCreatorTests(BootstrapperRepositoryFixture bootstrapperReposi } [Fact] - public async Task SqlServerDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully() + public async Task PostgresDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully() { - var sqlDataProviderFactory = new SqlServerDbDataProviderFactory(); + var postgresDataProviderFactory = new PostgresDbDataProviderFactory(); - var connectionString = this.bootstrapperRepositoryFixture.SqlServerSettings.ConnectionString; - var databaseName = this.bootstrapperRepositoryFixture.SqlServerSettings.DatabaseName; + var connectionString = this.bootstrapperRepositoryFixture.PostgresSettings.ConnectionString; + var databaseName = this.bootstrapperRepositoryFixture.PostgresSettings.DatabaseName; - var sqlSettings = new SqlServerDbSettings(connectionString, databaseName); + var postgresSettings = new PostgresDbSettings(connectionString, databaseName); - var retrySchemaCreator = sqlDataProviderFactory.CreateSchemaCreator(sqlSettings); + var retrySchemaCreator = postgresDataProviderFactory.CreateSchemaCreator(postgresSettings); await retrySchemaCreator.CreateOrUpdateSchemaAsync(databaseName); } [Fact] - public async Task PostgresDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully() + public async Task SqlServerDbDataProviderFactory_CreateSchemaCreator_ExecuteSuccessfully() { - var postgresDataProviderFactory = new PostgresDbDataProviderFactory(); + var sqlDataProviderFactory = new SqlServerDbDataProviderFactory(); - var connectionString = this.bootstrapperRepositoryFixture.PostgresSettings.ConnectionString; - var databaseName = this.bootstrapperRepositoryFixture.PostgresSettings.DatabaseName; + var connectionString = this.bootstrapperRepositoryFixture.SqlServerSettings.ConnectionString; + var databaseName = this.bootstrapperRepositoryFixture.SqlServerSettings.DatabaseName; + var schema = this.bootstrapperRepositoryFixture.SqlServerSettings.Schema; - var postgresSettings = new PostgresDbSettings(connectionString, databaseName); + var sqlSettings = new SqlServerDbSettings(connectionString, databaseName, schema); - var retrySchemaCreator = postgresDataProviderFactory.CreateSchemaCreator(postgresSettings); + var retrySchemaCreator = sqlDataProviderFactory.CreateSchemaCreator(sqlSettings); await retrySchemaCreator.CreateOrUpdateSchemaAsync(databaseName); } diff --git a/src/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json b/src/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json index 506232aa..b4f7043a 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json +++ b/src/KafkaFlow.Retry.IntegrationTests/conf/appsettings.json @@ -11,7 +11,8 @@ }, "SqlServerRepository": { "ConnectionString": "Server=localhost; User ID=SA; Password=SqlSever123123; Pooling=true; Trusted_Connection=true; Integrated Security=true; Min Pool Size=1; Max Pool Size=100; MultipleActiveResultSets=true; Application Name=KafkaFlow Retry Tests;", - "DatabaseName": "kafka_flow_retry_durable_test" + "DatabaseName": "kafka_flow_retry_durable_test", + "Schema": "dbo" }, "PostgresRepository": { "ConnectionString": "Server=localhost;Database=postgres;User Id=postgres;Password=Postgres123123;Port=5432;Application Name=KafkaFlow Retry Tests;", diff --git a/src/KafkaFlow.Retry.SqlServer/DbConnectionContext.cs b/src/KafkaFlow.Retry.SqlServer/DbConnectionContext.cs index 6f44b184..2607ffe5 100644 --- a/src/KafkaFlow.Retry.SqlServer/DbConnectionContext.cs +++ b/src/KafkaFlow.Retry.SqlServer/DbConnectionContext.cs @@ -20,6 +20,8 @@ public DbConnectionContext(SqlServerDbSettings sqlServerDbSettings, bool withinT this.withinTransaction = withinTransaction; } + public string Schema => this.sqlServerDbSettings.Schema; + public void Commit() { if (this.sqlTransaction is object) diff --git a/src/KafkaFlow.Retry.SqlServer/IDbConnection.cs b/src/KafkaFlow.Retry.SqlServer/IDbConnection.cs index 6a53bbd7..e0658e6c 100644 --- a/src/KafkaFlow.Retry.SqlServer/IDbConnection.cs +++ b/src/KafkaFlow.Retry.SqlServer/IDbConnection.cs @@ -5,6 +5,8 @@ internal interface IDbConnection : IDisposable { + string Schema { get; } + SqlCommand CreateCommand(); } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageHeaderRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageHeaderRepository.cs index cf783db3..4533f6e1 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageHeaderRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageHeaderRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System.Collections.Generic; using System.Threading.Tasks; @@ -10,4 +10,4 @@ internal interface IRetryQueueItemMessageHeaderRepository Task> GetOrderedAsync(IDbConnection dbConnection, IEnumerable retryQueueItemMessagesDbo); } -} +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageRepository.cs index 9d650e73..92ee6444 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemMessageRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System.Collections.Generic; using System.Threading.Tasks; @@ -10,4 +10,4 @@ internal interface IRetryQueueItemMessageRepository Task> GetMessagesOrderedAsync(IDbConnection dbConnection, IEnumerable retryQueueItemsDbo); } -} +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemRepository.cs index 351eb2d0..3537c0eb 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueItemRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System; using System.Collections.Generic; diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs index 7f63f5cc..14f53cdd 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System; using System.Collections.Generic; diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageHeaderRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageHeaderRepository.cs index 6161c843..8f4184e5 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageHeaderRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageHeaderRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System.Collections.Generic; using System.Data.SqlClient; @@ -29,8 +29,8 @@ public async Task> GetOrderedAsync(IDbConn { command.CommandType = System.Data.CommandType.Text; command.CommandText = $@"SELECT * - FROM [RetryItemMessageHeaders] h - INNER JOIN [RetryQueueItems] rqi ON rqi.Id = h.IdItemMessage + FROM [{dbConnection.Schema}].[RetryItemMessageHeaders] h + INNER JOIN [{dbConnection.Schema}].[RetryQueueItems] rqi ON rqi.Id = h.IdItemMessage WHERE h.IdItemMessage IN ({string.Join(",", retryQueueItemMessagesDbo.Select(x => $"'{x.IdRetryQueueItem}'"))}) ORDER BY rqi.IdRetryQueue, h.IdItemMessage"; @@ -46,7 +46,7 @@ private async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageHea using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"INSERT INTO [RetryItemMessageHeaders] + command.CommandText = $@"INSERT INTO [{dbConnection.Schema}].[RetryItemMessageHeaders] (IdItemMessage, [Key], Value) VALUES (@IdItemMessage, @Key, @Value)"; diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageRepository.cs index 521c09a5..4bd755ad 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemMessageRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System.Collections.Generic; using System.Data.SqlClient; @@ -16,7 +16,7 @@ public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemMessageDbo using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"INSERT INTO [ItemMessages] + command.CommandText = $@"INSERT INTO [{dbConnection.Schema}].[ItemMessages] (IdRetryQueueItem, [Key], Value, TopicName, Partition, Offset, UtcTimeStamp) VALUES (@idRetryQueueItem, @key, @value, @topicName, @partition, @offSet, @utcTimeStamp)"; @@ -50,11 +50,11 @@ public async Task> GetMessagesOrderedAsync(IDbCo } var parameter = new SqlParameter("@RetryQueueItemsIds", entriesToLoad); parameter.Direction = System.Data.ParameterDirection.Input; - parameter.TypeName = "dbo.TY_RetryQueueItemsIds"; + parameter.TypeName = $"{dbConnection.Schema}.TY_RetryQueueItemsIds"; command.Parameters.Add(parameter); command.CommandType = System.Data.CommandType.Text; - command.CommandText = $@"EXEC P_LoadItemMessages @RetryQueueItemsIds"; + command.CommandText = $@"EXEC {dbConnection.Schema}.P_LoadItemMessages @RetryQueueItemsIds"; return await this.ExecuteReaderAsync(command).ConfigureAwait(false); } diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemRepository.cs index 02f4f8c4..46bcff95 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueItemRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System; using System.Collections.Generic; @@ -21,11 +21,11 @@ public async Task AddAsync(IDbConnection dbConnection, RetryQueueItemDbo r using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"INSERT INTO [RetryQueueItems] + command.CommandText = $@"INSERT INTO [{dbConnection.Schema}].[RetryQueueItems] (IdDomain, IdRetryQueue, IdDomainRetryQueue, IdItemStatus, IdSeverityLevel, AttemptsCount, Sort, CreationDate, LastExecution, ModifiedStatusDate, Description) VALUES (@idDomain, @idRetryQueue, @idDomainRetryQueue, @idItemStatus, @idSeverityLevel, @attemptsCount, - (SELECT COUNT(1) FROM [RetryQueueItems] WHERE IdDomainRetryQueue = @idDomainRetryQueue), + (SELECT COUNT(1) FROM [{dbConnection.Schema}].[RetryQueueItems] WHERE IdDomainRetryQueue = @idDomainRetryQueue), @creationDate, @lastExecution, @modifiedStatusDate, @description); SELECT SCOPE_IDENTITY()"; @@ -53,8 +53,8 @@ public async Task AnyItemStillActiveAsync(IDbConnection dbConnection, Guid using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"SELECT 1 WHERE EXISTS( - SELECT TOP 1 * FROM [RetryQueueItems] + command.CommandText = $@"SELECT 1 WHERE EXISTS( + SELECT TOP 1 * FROM [{dbConnection.Schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomainRetryQueue = @IdDomainRetryQueue AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry))"; @@ -77,8 +77,8 @@ public async Task GetItemAsync(IDbConnection dbConnection, Gu using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"SELECT * - FROM [RetryQueueItems] + command.CommandText = $@"SELECT * + FROM [{dbConnection.Schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomain = @IdDomain"; @@ -96,8 +96,8 @@ public async Task> GetItemsByQueueOrderedAsync(IDbConne using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"SELECT * - FROM [RetryQueueItems] + command.CommandText = $@"SELECT * + FROM [{dbConnection.Schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomainRetryQueue = @IdDomainRetryQueue ORDER BY Sort ASC"; @@ -133,20 +133,20 @@ public async Task> GetItemsOrderedAsync( } query = string.Concat(query, $@" * - FROM [RetryQueueItems] + FROM [{dbConnection.Schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomainRetryQueue IN ({string.Join(",", retryQueueIds.Select(x => $"'{x}'"))})"); if (stuckStatusFilter is null) { - query = string.Concat(query, $" AND IdItemStatus IN({ string.Join(",", statuses.Select(x => (byte)x))})"); + query = string.Concat(query, $" AND IdItemStatus IN({string.Join(",", statuses.Select(x => (byte)x))})"); } else { query = string.Concat(query, $@" AND( - IdItemStatus IN({ string.Join(",", statuses.Select(x => (byte)x))}) + IdItemStatus IN({string.Join(",", statuses.Select(x => (byte)x))}) OR( - IdItemStatus = { (byte)stuckStatusFilter.ItemStatus} + IdItemStatus = {(byte)stuckStatusFilter.ItemStatus} AND DATEADD(SECOND, {Math.Floor(stuckStatusFilter.ExpirationInterval.TotalSeconds)}, ModifiedStatusDate) < @DateTimeUtcNow ) )"); @@ -173,7 +173,7 @@ public async Task> GetNewestItemsAsync(IDbConnection db { command.CommandType = System.Data.CommandType.Text; command.CommandText = $@"SELECT * - FROM [RetryQueueItems] + FROM [{dbConnection.Schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomainRetryQueue = @IdDomainRetryQueue AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry) @@ -198,7 +198,7 @@ public async Task> GetPendingItemsAsync(IDbConnection d { command.CommandType = System.Data.CommandType.Text; command.CommandText = $@"SELECT * - FROM [RetryQueueItems] + FROM [{dbConnection.Schema}].[RetryQueueItems] WITH (NOLOCK) WHERE IdDomainRetryQueue = @IdDomainRetryQueue AND IdItemStatus IN (@IdItemStatusWaiting, @IdItemStatusInRetry) @@ -237,7 +237,7 @@ public async Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, Re using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"UPDATE [RetryQueueItems] + command.CommandText = $@"UPDATE [{dbConnection.Schema}].[RetryQueueItems] SET IdItemStatus = @IdItemStatus, AttemptsCount = @AttemptsCount, LastExecution = @LastExecution, @@ -265,7 +265,7 @@ public async Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDoma using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"UPDATE [RetryQueueItems] + command.CommandText = $@"UPDATE [{dbConnection.Schema}].[RetryQueueItems] SET IdItemStatus = @IdItemStatus, ModifiedStatusDate = @DateTimeUtcNow WHERE IdDomain = @IdDomain"; diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs index 54e5d3ea..6eed95b7 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Retry.SqlServer.Repositories +namespace KafkaFlow.Retry.SqlServer.Repositories { using System; using System.Collections.Generic; @@ -15,7 +15,7 @@ public async Task AddAsync(IDbConnection dbConnection, RetryQueueDbo retry using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"INSERT INTO [RetryQueues] + command.CommandText = $@"INSERT INTO {dbConnection.Schema}.[RetryQueues] (IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution) VALUES (@idDomain, @idStatus, @searchGroupKey, @queueGroupKey, @creationDate, @lastExecution); @@ -38,9 +38,9 @@ public async Task DeleteQueuesAsync(IDbConnection dbConnection, string sear using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"DELETE FROM [RetryQueues] WHERE Id IN + command.CommandText = $@"DELETE FROM [{dbConnection.Schema}].[RetryQueues] WHERE Id IN ( - SELECT Id FROM [RetryQueues] rq + SELECT Id FROM [{dbConnection.Schema}].[RetryQueues] rq WHERE rq.SearchGroupKey = @SearchGroupKey AND rq.LastExecution < @MaxLastExecutionDateToBeKept AND rq.IdStatus = @IdStatus @@ -63,8 +63,8 @@ public async Task ExistsActiveAsync(IDbConnection dbConnection, string que using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"SELECT COUNT(1) - FROM [RetryQueues] + command.CommandText = $@"SELECT COUNT(1) + FROM [{dbConnection.Schema}].[RetryQueues] WHERE QueueGroupKey = @QueueGroupKey AND IdStatus <> @IdStatus"; command.Parameters.AddWithValue("QueueGroupKey", queueGroupKey); @@ -79,8 +79,8 @@ public async Task GetQueueAsync(IDbConnection dbConnection, strin using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"SELECT Id, IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution - FROM [RetryQueues] + command.CommandText = $@"SELECT Id, IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution + FROM [{dbConnection.Schema}].[RetryQueues] WHERE QueueGroupKey = @QueueGroupKey ORDER BY Id"; @@ -97,7 +97,7 @@ public async Task> GetTopSortedQueuesOrderedAsync(IDbConnec command.CommandType = System.Data.CommandType.Text; var innerQuery = $@" SELECT TOP({top}) Id, IdDomain, IdStatus, SearchGroupKey, QueueGroupKey, CreationDate, LastExecution - FROM [RetryQueues] + FROM [{dbConnection.Schema}].[RetryQueues] WHERE IdStatus = @IdStatus"; if (searchGroupKey is object) @@ -122,7 +122,7 @@ public async Task UpdateAsync(IDbConnection dbConnection, Guid idDomain, Re using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"UPDATE [RetryQueues] + command.CommandText = $@"UPDATE [{dbConnection.Schema}].[RetryQueues] SET IdStatus = @IdStatus, LastExecution = @LastExecution WHERE IdDomain = @IdDomain"; @@ -140,7 +140,7 @@ public async Task UpdateLastExecutionAsync(IDbConnection dbConnection, Guid using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"UPDATE [RetryQueues] + command.CommandText = $@"UPDATE [{dbConnection.Schema}].[RetryQueues] SET LastExecution = @LastExecution WHERE IdDomain = @IdDomain"; @@ -156,7 +156,7 @@ public async Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDoma using (var command = dbConnection.CreateCommand()) { command.CommandType = System.Data.CommandType.Text; - command.CommandText = @"UPDATE [RetryQueues] + command.CommandText = $@"UPDATE [{dbConnection.Schema}].[RetryQueues] SET IdStatus = @IdStatus WHERE IdDomain = @IdDomain"; diff --git a/src/KafkaFlow.Retry.SqlServer/RetryDurableDefinitionBuilderExtension.cs b/src/KafkaFlow.Retry.SqlServer/RetryDurableDefinitionBuilderExtension.cs index e3ee1897..231bd8dc 100644 --- a/src/KafkaFlow.Retry.SqlServer/RetryDurableDefinitionBuilderExtension.cs +++ b/src/KafkaFlow.Retry.SqlServer/RetryDurableDefinitionBuilderExtension.cs @@ -2,17 +2,39 @@ { public static class RetryDurableDefinitionBuilderExtension { + private const string schemaDefault = "dbo"; + public static RetryDurableDefinitionBuilder WithSqlServerDataProvider( this RetryDurableDefinitionBuilder retryDurableDefinitionBuilder, string connectionString, - string databaseName) + string databaseName, + string schema) + { + retryDurableDefinitionBuilder.WithRepositoryProvider( + new SqlServerDbDataProviderFactory() + .Create( + new SqlServerDbSettings( + connectionString, + databaseName, + schema) + ) + ); + + return retryDurableDefinitionBuilder; + } + + public static RetryDurableDefinitionBuilder WithSqlServerDataProvider( + this RetryDurableDefinitionBuilder retryDurableDefinitionBuilder, + string connectionString, + string databaseName) { retryDurableDefinitionBuilder.WithRepositoryProvider( new SqlServerDbDataProviderFactory() .Create( new SqlServerDbSettings( connectionString, - databaseName) + databaseName, + schemaDefault) ) ); diff --git a/src/KafkaFlow.Retry.SqlServer/SqlServerDbSettings.cs b/src/KafkaFlow.Retry.SqlServer/SqlServerDbSettings.cs index 00b13b4d..5b901861 100644 --- a/src/KafkaFlow.Retry.SqlServer/SqlServerDbSettings.cs +++ b/src/KafkaFlow.Retry.SqlServer/SqlServerDbSettings.cs @@ -1,22 +1,45 @@ namespace KafkaFlow.Retry.SqlServer { - using Dawn; using System.Diagnostics.CodeAnalysis; + using Dawn; + + /// + /// Defines the Sql Server database settings. + /// [ExcludeFromCodeCoverage] public class SqlServerDbSettings { - public SqlServerDbSettings(string connectionString, string databaseName) + /// + /// Creates a Sql Server database settings + /// + /// The connection string of the Sql Server. + /// The database name. + /// The schema name. + public SqlServerDbSettings(string connectionString, string databaseName, string schema) { Guard.Argument(connectionString).NotNull().NotEmpty(); Guard.Argument(databaseName).NotNull().NotEmpty(); + Guard.Argument(schema).NotNull().NotEmpty(); ConnectionString = connectionString; DatabaseName = databaseName; + Schema = schema; } + /// + /// Gets the connection string of the Sql Server database. + /// public string ConnectionString { get; } + /// + /// Gets the database name. + /// public string DatabaseName { get; } + + /// + /// Gets the schema name. + /// + public string Schema { get; } } -} +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/ConnectionProviderTests.cs b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/ConnectionProviderTests.cs index 8600338a..1f588846 100644 --- a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/ConnectionProviderTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/ConnectionProviderTests.cs @@ -13,7 +13,7 @@ public class ConnectionProviderTests public void ConnectionProvider_Create_Success() { // Act - var result = provider.Create(new SqlServerDbSettings("connectionString", "databaseName")); + var result = provider.Create(new SqlServerDbSettings("connectionString", "databaseName", "schema")); // Arrange result.Should().NotBeNull(); @@ -34,7 +34,7 @@ public void ConnectionProvider_Create_WithoutSqlServerDbSettings_ThrowsException public void ConnectionProvider_CreateWithinTransaction_Success() { // Act - var result = provider.CreateWithinTransaction(new SqlServerDbSettings("connectionString", "databaseName")); + var result = provider.CreateWithinTransaction(new SqlServerDbSettings("connectionString", "databaseName", "schema")); // Arrange result.Should().NotBeNull(); diff --git a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/RetryDurableDefinitionBuilderExtensionTests.cs b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/RetryDurableDefinitionBuilderExtensionTests.cs index b7599af1..945ec784 100644 --- a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/RetryDurableDefinitionBuilderExtensionTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/RetryDurableDefinitionBuilderExtensionTests.cs @@ -18,5 +18,18 @@ public void RetryDurableDefinitionBuilderExtension_WithSqlServerDataProvider_Suc // Arrange result.Should().NotBeNull(); } + + [Fact] + public void RetryDurableDefinitionBuilderExtension_WithSqlServerDataProviderAndSchema_Success() + { + // Arrange + var builder = new RetryDurableDefinitionBuilder(); + + // Act + var result = builder.WithSqlServerDataProvider("connectionString", "databaseName", "schema"); + + // Arrange + result.Should().NotBeNull(); + } } } \ No newline at end of file