diff --git a/examples/Example/Program.cs b/examples/Example/Program.cs index 415f052..45e3ab4 100644 --- a/examples/Example/Program.cs +++ b/examples/Example/Program.cs @@ -22,7 +22,7 @@ public static void Main(string[] args) var cdcCancellation = new CancellationTokenSource(); var cdcCancellationToken = cdcCancellation.Token; - var changeDataChannel = Channel.CreateUnbounded>(); + var changeDataChannel = Channel.CreateUnbounded>(); _ = Task.Factory.StartNew(async () => { var lowBoundLsn = await GetStartLsn(connectionString); @@ -47,7 +47,7 @@ public static void Main(string[] args) { Console.WriteLine($"Polling with from '{lowBoundLsn}' to '{highBoundLsn}"); - var changes = new List(); + var changes = new List(); foreach (var table in tables) { var changeSets = await Cdc.GetAllChanges( diff --git a/src/MsSqlCdc/ChangeRow.cs b/src/MsSqlCdc/AllChangeRow.cs similarity index 92% rename from src/MsSqlCdc/ChangeRow.cs rename to src/MsSqlCdc/AllChangeRow.cs index 58fb102..91e64c9 100644 --- a/src/MsSqlCdc/ChangeRow.cs +++ b/src/MsSqlCdc/AllChangeRow.cs @@ -4,7 +4,7 @@ namespace MsSqlCdc; -public enum Operation +public enum AllChangeOperation { Delete = 1, Insert = 2, @@ -12,7 +12,7 @@ public enum Operation AfterUpdate = 4 } -public record ChangeRow +public record AllChangeRow { /// /// Commit LSN associated with the change that preserves the commit order of the change. @@ -29,7 +29,7 @@ public record ChangeRow /// Identifies the data manipulation language (DML) operation needed /// to apply the row of change data to the target data source. /// - public Operation Operation { get; init; } + public AllChangeOperation Operation { get; init; } /// /// A bit mask with a bit corresponding to each captured column identified for the capture instance. @@ -48,10 +48,10 @@ public record ChangeRow /// public IReadOnlyDictionary Fields { get; init; } - public ChangeRow( + public AllChangeRow( BigInteger startLineSequenceNumber, BigInteger sequenceValue, - Operation operation, + AllChangeOperation operation, string updateMask, string captureInstance, IReadOnlyDictionary fields) diff --git a/src/MsSqlCdc/AllChangeRowFactory.cs b/src/MsSqlCdc/AllChangeRowFactory.cs new file mode 100644 index 0000000..0bd12df --- /dev/null +++ b/src/MsSqlCdc/AllChangeRowFactory.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Numerics; +using System.Text; + +namespace MsSqlCdc; + +internal static class AllChangeRowFactory +{ + /// + /// Converts a a collection of columns represented as Dictionary to ChangeData representation. + /// + /// Dictionary of field name and field value. + /// The capture instance. + /// Returns the CDC column as a ChangeData record. + /// + public static AllChangeRow Create(IReadOnlyDictionary fields, string captureInstance) + { + if (GetRequiredFields(fields).Count() < 4) + throw new ArgumentException($"The column fields does not contain all the default CDC column fields."); + + return new AllChangeRow( + GetStartLsn(fields), + GetSeqVal(fields), + GetOperation(fields), + GetUpdateMask(fields), + captureInstance, + GetAdditionalFields(fields)); + } + + private static string GetUpdateMask(IReadOnlyDictionary fields) => + Encoding.UTF8.GetString((byte[])fields[CdcFieldName.UpdateMask]); + + private static BigInteger GetSeqVal(IReadOnlyDictionary fields) => + DataConvert.ConvertBinaryLsn((byte[])fields[CdcFieldName.SeqVal]); + + private static BigInteger GetStartLsn(IReadOnlyDictionary fields) => + DataConvert.ConvertBinaryLsn((byte[])fields[CdcFieldName.StartLsn]); + + private static bool IsRequiredField(string fieldName) => + fieldName == CdcFieldName.StartLsn || + fieldName == CdcFieldName.SeqVal || + fieldName == CdcFieldName.Operation || + fieldName == CdcFieldName.UpdateMask; + + private static IEnumerable> GetRequiredFields( + IReadOnlyDictionary fields) => fields.Where(x => IsRequiredField(x.Key)); + + private static Dictionary GetAdditionalFields(IReadOnlyDictionary fields) + => fields.Where(x => !IsRequiredField(x.Key)).ToDictionary(x => x.Key, x => x.Value); + + private static AllChangeOperation GetOperation(IReadOnlyDictionary fields) => + ConvertOperation((int)fields[CdcFieldName.Operation]); + + private static AllChangeOperation ConvertOperation(int representation) + => representation switch + { + 1 => AllChangeOperation.Delete, + 2 => AllChangeOperation.Insert, + 3 => AllChangeOperation.BeforeUpdate, + 4 => AllChangeOperation.AfterUpdate, + _ => throw new ArgumentException($"Not valid representation value '{representation}'") + }; +} diff --git a/src/MsSqlCdc/Cdc.cs b/src/MsSqlCdc/Cdc.cs index 34d24cf..6af4853 100644 --- a/src/MsSqlCdc/Cdc.cs +++ b/src/MsSqlCdc/Cdc.cs @@ -267,7 +267,7 @@ public static async Task GetNextLsn(SqlConnection connection, BigInt /// /// Returns one net change row for each source row changed within the specified Log Sequence Numbers (LSN) range. /// - public static async Task> GetNetChanges( + public static async Task> GetNetChanges( SqlConnection connection, string captureInstance, BigInteger fromLsn, @@ -279,7 +279,7 @@ public static async Task> GetNetChanges( var filterOption = DataConvert.ConvertNetChangesRowFilterOption(netChangesRowFilterOption); var cdcColumns = await CdcDatabase.GetNetChanges( connection, captureInstance, beginLsnBinary, endLsnBinary, filterOption); - return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, captureInstance)).ToList(); + return cdcColumns.Select(x => NetChangeRowFactory.Create(x, captureInstance)).ToList(); } /// @@ -294,7 +294,7 @@ public static async Task> GetNetChanges( /// Returns one row for each change applied to the source table within the specified log sequence number (LSN) range. /// If a source row had multiple changes during the interval, each change is represented in the returned result set. /// - public static async Task> GetAllChanges( + public static async Task> GetAllChanges( SqlConnection connection, string captureInstance, BigInteger beginLsn, @@ -306,6 +306,6 @@ public static async Task> GetAllChanges( var filterOption = DataConvert.ConvertAllChangesRowFilterOption(allChangesRowFilterOption); var cdcColumns = await CdcDatabase.GetAllChanges( connection, captureInstance, beginLsnBinary, endLsnBinary, filterOption); - return cdcColumns.Select(x => DataConvert.ConvertCdcColumn(x, captureInstance)).ToList(); + return cdcColumns.Select(x => AllChangeRowFactory.Create(x, captureInstance)).ToList(); } } diff --git a/src/MsSqlCdc/DataConvert.cs b/src/MsSqlCdc/DataConvert.cs index db6d002..0b8629f 100644 --- a/src/MsSqlCdc/DataConvert.cs +++ b/src/MsSqlCdc/DataConvert.cs @@ -1,68 +1,11 @@ using System; -using System.Collections.Generic; using System.Linq; using System.Numerics; -using System.Text; namespace MsSqlCdc; internal static class DataConvert { - /// - /// Converts a a collection of columns represented as Tuple to ChangeData representation. - /// - /// List of tuples with Item1 being the name column and Item2 being the column value - /// The tablename of the column. - /// Returns the CDC column as a ChangeData record. - /// - public static ChangeRow ConvertCdcColumn( - IReadOnlyDictionary columnFields, - string captureInstance) - { - bool isDefaultCdcField(string fieldName) => - fieldName == CdcFieldName.StartLsn || - fieldName == CdcFieldName.SeqVal || - fieldName == CdcFieldName.Operation || - fieldName == CdcFieldName.UpdateMask; - - if (columnFields.Where(x => isDefaultCdcField(x.Key)).Count() < 4) - throw new ArgumentException( - $"The column fields does not contain all the default CDC column fields."); - - var nonDefaultCdcFields = columnFields - .Where(x => !isDefaultCdcField(x.Key)) - .ToDictionary(x => x.Key, x => x.Value); - - var startLsn = ConvertBinaryLsn((byte[])columnFields[CdcFieldName.StartLsn]); - var seqVal = ConvertBinaryLsn((byte[])columnFields[CdcFieldName.SeqVal]); - var operation = ConvertOperation((int)columnFields[CdcFieldName.Operation]); - var updateMask = Encoding.UTF8.GetString((byte[])columnFields[CdcFieldName.UpdateMask]); - - return new ChangeRow( - startLsn, - seqVal, - operation, - updateMask, - captureInstance, - nonDefaultCdcFields); - } - - /// - /// Converts the number representation to an Enum representation of the value. - /// - /// The number representation of the Operation. - /// Enum representation of the number representation. - /// - public static Operation ConvertOperation(int representation) - => representation switch - { - 1 => Operation.Delete, - 2 => Operation.Insert, - 3 => Operation.BeforeUpdate, - 4 => Operation.AfterUpdate, - _ => throw new ArgumentException($"Not valid representation value '{representation}'") - }; - /// /// Converts RelationOperator enum to a string representation to be used in MS-SQL. /// diff --git a/src/MsSqlCdc/NetChangeRow.cs b/src/MsSqlCdc/NetChangeRow.cs new file mode 100644 index 0000000..ec784b6 --- /dev/null +++ b/src/MsSqlCdc/NetChangeRow.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Generic; +using System.Numerics; + +namespace MsSqlCdc; + +public enum NetChangeOperation +{ + Delete = 1, + Insert = 2, + Update = 4, + InsertOrUpdate = 5 +} + +public record NetChangeRow +{ + /// + /// All changes committed in the same transaction share the same commit LSN. + /// For example, if an update operation on the source table modifies two columns in two rows, + /// the change table will contain four rows, each with the same __$start_lsnvalue. + /// + public BigInteger StartLineSequenceNumber { get; init; } + + /// + ///Identifies the data manipulation language (DML) operation needed to apply the row of + /// change data to the target data source. + /// If the value of the row_filter_option parameter is all or all with mask, + /// the value in this column can be one of the following values: + /// 1 = Delete + /// 2 = Insert + /// 4 = Update + /// If the value of the row_filter_option parameter is all with merge, + /// the value in this column can be one of the following values: + /// 1 = Delete + /// 5 = Insert or update + /// + public NetChangeOperation Operation { get; init; } + + /// + /// A bit mask with a bit corresponding to each captured column identified for the capture instance. + /// This value has all defined bits set to 1 when __$operation = 1 or 2. When __$operation = 3 or 4, + /// only those bits corresponding to columns that changed are set to 1. + /// + public string? UpdateMask { get; init; } + + /// + /// The name of the capture instance associated with the change. + /// + public string CaptureInstance { get; set; } + + /// + /// The row fields. + /// + public IReadOnlyDictionary Fields { get; init; } + + public NetChangeRow( + BigInteger startLineSequenceNumber, + NetChangeOperation operation, + string? updateMask, + string captureInstance, + IReadOnlyDictionary fields) + { + if (fields is null) + throw new ArgumentNullException($"{nameof(fields)} cannot be null."); + if (string.IsNullOrWhiteSpace(captureInstance)) + throw new ArgumentNullException($"{nameof(captureInstance)} cannot be null, empty or whitespace."); + + StartLineSequenceNumber = startLineSequenceNumber; + Operation = operation; + UpdateMask = updateMask; + CaptureInstance = captureInstance; + Fields = fields; + } +} diff --git a/src/MsSqlCdc/NetChangeRowFactory.cs b/src/MsSqlCdc/NetChangeRowFactory.cs new file mode 100644 index 0000000..3bbeb91 --- /dev/null +++ b/src/MsSqlCdc/NetChangeRowFactory.cs @@ -0,0 +1,64 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Numerics; +using System.Text; + +namespace MsSqlCdc; + +internal static class NetChangeRowFactory +{ + /// + /// Converts a a collection of columns represented as Dictionary to ChangeData representation. + /// + /// Dictionary of field name and field value. + /// The capture instance. + /// Returns the CDC column as a ChangeData record. + /// + public static NetChangeRow Create( + IReadOnlyDictionary fields, + string captureInstance) + { + if (GetRequiredFields(fields).Count() < 3) + throw new ArgumentException($"The column fields does not contain all the default CDC column fields."); + + return new NetChangeRow( + GetStartLsn(fields), + GetOperation(fields), + GetUpdateMask(fields), + captureInstance, + GetAdditionalFields(fields)); + } + + private static BigInteger GetStartLsn(IReadOnlyDictionary fields) => + DataConvert.ConvertBinaryLsn((byte[])fields[CdcFieldName.StartLsn]); + + private static string? GetUpdateMask(IReadOnlyDictionary fields) => + fields[CdcFieldName.UpdateMask] != DBNull.Value + ? Encoding.UTF8.GetString((byte[])fields[CdcFieldName.UpdateMask]) + : null; + + private static bool IsRequiredField(string fieldName) => + fieldName == CdcFieldName.StartLsn || + fieldName == CdcFieldName.Operation || + fieldName == CdcFieldName.UpdateMask; + + private static IEnumerable> GetRequiredFields( + IReadOnlyDictionary fields) => fields.Where(x => IsRequiredField(x.Key)); + + private static Dictionary GetAdditionalFields(IReadOnlyDictionary fields) + => fields.Where(x => !IsRequiredField(x.Key)).ToDictionary(x => x.Key, x => x.Value); + + private static NetChangeOperation GetOperation(IReadOnlyDictionary fields) => + ConvertOperation((int)fields[CdcFieldName.Operation]); + + private static NetChangeOperation ConvertOperation(int representation) + => representation switch + { + 1 => NetChangeOperation.Delete, + 2 => NetChangeOperation.Insert, + 4 => NetChangeOperation.Update, + 5 => NetChangeOperation.InsertOrUpdate, + _ => throw new ArgumentException($"Not valid representation value '{representation}'") + }; +} diff --git a/test/MsSqlCdc.Tests/AllChangeRowFactoryTests.cs b/test/MsSqlCdc.Tests/AllChangeRowFactoryTests.cs new file mode 100644 index 0000000..ff14783 --- /dev/null +++ b/test/MsSqlCdc.Tests/AllChangeRowFactoryTests.cs @@ -0,0 +1,284 @@ +using System; +using System.Collections.Generic; +using static FluentAssertions.FluentActions; +using Xunit; +using FluentAssertions; +using System.Text; +using System.Linq; + +namespace MsSqlCdc.Tests; + +public class AllChangeRowFactoryTests +{ + public static IEnumerable AllChangesFieldsData() + { + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$operation", (int)AllChangeOperation.AfterUpdate}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 10}, + {"Name", "Rune"}, + {"Salary", 20000.00}, + }, + "dbo_Employee", + new AllChangeRow( + 25000L, + 25002L, + AllChangeOperation.AfterUpdate, + "MASK", + "dbo_Employee", + new Dictionary { + {"Id", 10}, + {"Name", "Rune"}, + {"Salary", 20000.00} + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$operation", (int)AllChangeOperation.BeforeUpdate}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 1}, + {"Name", "Simon"}, + }, + "dbo_Employee", + new AllChangeRow( + 25000L, + 25002L, + AllChangeOperation.BeforeUpdate, + "MASK", + "dbo_Employee", + new Dictionary { + {"Id", 1}, + {"Name", "Simon"}, + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$operation", (int)AllChangeOperation.Delete}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 0}, + {"Name", "Jesper"}, + }, + "dbo_Employee", + new AllChangeRow( + 25000L, + 25002L, + AllChangeOperation.Delete, + "MASK", + "dbo_Employee", + new Dictionary{ + {"Id", 0}, + {"Name", "Jesper"}, + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$operation", (int)AllChangeOperation.Insert}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 10}, + }, + "dbo_Animal", + new AllChangeRow( + 25000L, + 25002L, + AllChangeOperation.Insert, + "MASK", + "dbo_Animal", + new Dictionary{ + {"Id", 10}, + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$operation", (int)AllChangeOperation.Insert}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + }, + "dbo_Animal", + new AllChangeRow( + 25000L, + 25002L, + AllChangeOperation.Insert, + "MASK", + "dbo_Animal", + new Dictionary{ + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$operation", (int)AllChangeOperation.Insert}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + }, + "dbo_Animal", + new AllChangeRow( + 25000L, + 25002L, + AllChangeOperation.Insert, + "MASK", + "dbo_Animal", + new Dictionary{ + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$operation", (int)AllChangeOperation.Insert}, + }, + "dbo_Animal", + new AllChangeRow( + 25000L, + 25002L, + AllChangeOperation.Insert, + "MASK", + "dbo_Animal", + new Dictionary{ + }) + }; + + yield return new object[] + { + new Dictionary + { + {"Id", 0}, + {"__$operation", (int)AllChangeOperation.Delete}, + {"Name", "Jesper"}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + }, + "dbo_Employee", + new AllChangeRow( + 25000L, + 25002L, + AllChangeOperation.Delete, + "MASK", + "dbo_Employee", + new Dictionary{ + {"Id", 0}, + {"Name", "Jesper"}, + }) + }; + } + + public static IEnumerable CdcDefaultFieldsInvalidData() + { + yield return new object[] + { + new Dictionary(), + "dbo_Employee", + }; + + yield return new object[] + { + new Dictionary + { + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + }, + "dbo_Employee", + }; + + yield return new object[] + { + new Dictionary + { + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + }, + "dbo_Employee", + }; + + yield return new object[] + { + new Dictionary + { + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + }, + "dbo_Employee", + }; + + yield return new object[] + { + new Dictionary + { + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"Id", 0}, + {"Name", "Rune"} + }, + "dbo_Employee", + }; + + yield return new object[] + { + new Dictionary + { + {"Address", "Streetvalley 20"}, + {"Salary", 2000.00}, + {"Id", 0}, + {"Name", "Rune"} + }, + "dbo_Employee", + }; + } + + [Theory] + [Trait("Category", "Unit")] + [MemberData(nameof(AllChangesFieldsData))] + public void Conversion_cdc_column_to_change_row( + Dictionary columnFields, + string captureInstance, + AllChangeRow expected) + { + var result = AllChangeRowFactory.Create(columnFields, captureInstance); + result.Should().BeEquivalentTo(expected); + } + + [Theory] + [Trait("Category", "Unit")] + [MemberData(nameof(CdcDefaultFieldsInvalidData))] + public void Conversion_cdc_column_without_default_fields_is_invalid( + Dictionary columnFields, + string captureInstance) + { + Invoking(() => AllChangeRowFactory.Create(columnFields, captureInstance)) + .Should() + .Throw() + .WithMessage($"The column fields does not contain all the default CDC column fields."); + } +} diff --git a/test/MsSqlCdc.Tests/CdcTests.cs b/test/MsSqlCdc.Tests/CdcTests.cs index be6eb6e..8f09729 100644 --- a/test/MsSqlCdc.Tests/CdcTests.cs +++ b/test/MsSqlCdc.Tests/CdcTests.cs @@ -1,9 +1,7 @@ using System; -using System.Linq; using System.Numerics; using System.Threading.Tasks; using FluentAssertions; -using FluentAssertions.Execution; using Microsoft.Data.SqlClient; using Xunit; @@ -112,14 +110,14 @@ public async Task Get_all_changes_rowfilter_all() var minLsn = await Cdc.GetMinLsn(connection, captureInstance); var maxLsn = await Cdc.GetMaxLsn(connection); - var netChanges = await Cdc.GetAllChanges( + var allChanges = await Cdc.GetAllChanges( connection, captureInstance, minLsn, maxLsn, AllChangesRowFilterOption.All); - netChanges + allChanges .Should() .HaveCount(2).And .SatisfyRespectively( @@ -128,7 +126,7 @@ public async Task Get_all_changes_rowfilter_all() insert.CaptureInstance.Should().Be(captureInstance); insert.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); insert.SequenceValue.Should().BeGreaterThan(default(BigInteger)); - insert.Operation.Should().Be(Operation.Insert); + insert.Operation.Should().Be(AllChangeOperation.Insert); insert.Fields["id"].Should().NotBeNull(); insert.Fields["first_name"].Should().Be("Rune"); insert.Fields["last_name"].Should().Be("Nielsen"); @@ -138,7 +136,7 @@ public async Task Get_all_changes_rowfilter_all() afterUpdate.CaptureInstance.Should().Be(captureInstance); afterUpdate.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); afterUpdate.SequenceValue.Should().BeGreaterThan(default(BigInteger)); - afterUpdate.Operation.Should().Be(Operation.AfterUpdate); + afterUpdate.Operation.Should().Be(AllChangeOperation.AfterUpdate); afterUpdate.Fields["id"].Should().NotBeNull(); afterUpdate.Fields["first_name"].Should().Be("Rune"); afterUpdate.Fields["last_name"].Should().Be("Jensen"); @@ -155,14 +153,14 @@ public async Task Get_all_changes_rowfilter_all_update_old() var minLsn = await Cdc.GetMinLsn(connection, captureInstance); var maxLsn = await Cdc.GetMaxLsn(connection); - var netChanges = (await Cdc.GetAllChanges( + var allChanges = await Cdc.GetAllChanges( connection, captureInstance, minLsn, maxLsn, - AllChangesRowFilterOption.AllUpdateOld)).ToArray(); + AllChangesRowFilterOption.AllUpdateOld); - netChanges + allChanges .Should() .HaveCount(3).And .SatisfyRespectively( @@ -171,7 +169,7 @@ public async Task Get_all_changes_rowfilter_all_update_old() insert.CaptureInstance.Should().Be(captureInstance); insert.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); insert.SequenceValue.Should().BeGreaterThan(default(BigInteger)); - insert.Operation.Should().Be(Operation.Insert); + insert.Operation.Should().Be(AllChangeOperation.Insert); insert.Fields["id"].Should().NotBeNull(); insert.Fields["first_name"].Should().Be("Rune"); insert.Fields["last_name"].Should().Be("Nielsen"); @@ -181,7 +179,7 @@ public async Task Get_all_changes_rowfilter_all_update_old() beforeUpdate.CaptureInstance.Should().Be(captureInstance); beforeUpdate.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); beforeUpdate.SequenceValue.Should().BeGreaterThan(default(BigInteger)); - beforeUpdate.Operation.Should().Be(Operation.BeforeUpdate); + beforeUpdate.Operation.Should().Be(AllChangeOperation.BeforeUpdate); beforeUpdate.Fields["id"].Should().NotBeNull(); beforeUpdate.Fields["first_name"].Should().Be("Rune"); beforeUpdate.Fields["last_name"].Should().Be("Nielsen"); @@ -191,32 +189,110 @@ public async Task Get_all_changes_rowfilter_all_update_old() afterUpdate.CaptureInstance.Should().Be(captureInstance); afterUpdate.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); afterUpdate.SequenceValue.Should().BeGreaterThan(default(BigInteger)); - afterUpdate.Operation.Should().Be(Operation.AfterUpdate); + afterUpdate.Operation.Should().Be(AllChangeOperation.AfterUpdate); afterUpdate.Fields["id"].Should().NotBeNull(); afterUpdate.Fields["first_name"].Should().Be("Rune"); afterUpdate.Fields["last_name"].Should().Be("Jensen"); }); } - private async Task CreateOpenSqlConnection() + [Fact] + [Trait("Category", "Integration")] + public async Task Get_net_changes_all() { - var connection = new SqlConnection(_databaseFixture.ConnectionString); - await connection.OpenAsync(); - return connection; + var captureInstance = "dbo_Employee"; + using var connection = await CreateOpenSqlConnection(); + var minLsn = await Cdc.GetMinLsn(connection, captureInstance); + var maxLsn = await Cdc.GetMaxLsn(connection); + + var netChanges = await Cdc.GetNetChanges( + connection, + "dbo_Employee", + minLsn, + maxLsn, + NetChangesRowFilterOption.All); + + netChanges + .Should() + .HaveCount(1).And + .SatisfyRespectively( + netChange => + { + netChange.CaptureInstance.Should().Be(captureInstance); + netChange.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); + netChange.UpdateMask.Should().BeNull(); + netChange.Operation.Should().Be(NetChangeOperation.Insert); + netChange.Fields["first_name"].Should().Be("Rune"); + netChange.Fields["last_name"].Should().Be("Jensen"); + }); } - private async Task InsertEmployee(Guid id, string firstName, string lastName) + [Fact] + [Trait("Category", "Integration")] + public async Task Get_net_changes_all_with_mask() { + var captureInstance = "dbo_Employee"; using var connection = await CreateOpenSqlConnection(); - var sql = @" - INSERT INTO [dbo].[employee] ([id], [first_name], [last_name]) - VALUES(@id, @first_name, @last_name)"; + var minLsn = await Cdc.GetMinLsn(connection, captureInstance); + var maxLsn = await Cdc.GetMaxLsn(connection); - using var cmd = new SqlCommand(sql, connection); - cmd.Parameters.AddWithValue("@id", id); - cmd.Parameters.AddWithValue("@first_name", firstName); - cmd.Parameters.AddWithValue("@last_name", lastName); + var netChanges = await Cdc.GetNetChanges( + connection, + "dbo_Employee", + minLsn, + maxLsn, + NetChangesRowFilterOption.AllWithMask); - await cmd.ExecuteNonQueryAsync(); + netChanges + .Should() + .HaveCount(1).And + .SatisfyRespectively( + netChange => + { + netChange.CaptureInstance.Should().Be(captureInstance); + netChange.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); + netChange.UpdateMask.Should().NotBeEmpty(); + netChange.Operation.Should().Be(NetChangeOperation.Insert); + netChange.Fields["first_name"].Should().Be("Rune"); + netChange.Fields["last_name"].Should().Be("Jensen"); + }); + } + + [Fact] + [Trait("Category", "Integration")] + public async Task Get_net_changes_all_with_merge() + { + var captureInstance = "dbo_Employee"; + using var connection = await CreateOpenSqlConnection(); + var minLsn = await Cdc.GetMinLsn(connection, captureInstance); + var maxLsn = await Cdc.GetMaxLsn(connection); + + var netChanges = await Cdc.GetNetChanges( + connection, + "dbo_Employee", + minLsn, + maxLsn, + NetChangesRowFilterOption.AllWithMerge); + + netChanges + .Should() + .HaveCount(1).And + .SatisfyRespectively( + netChange => + { + netChange.CaptureInstance.Should().Be(captureInstance); + netChange.StartLineSequenceNumber.Should().BeGreaterThan(default(BigInteger)); + netChange.UpdateMask.Should().NotBeEmpty(); + netChange.Operation.Should().Be(NetChangeOperation.InsertOrUpdate); + netChange.Fields["first_name"].Should().Be("Rune"); + netChange.Fields["last_name"].Should().Be("Jensen"); + }); + } + + private async Task CreateOpenSqlConnection() + { + var connection = new SqlConnection(_databaseFixture.ConnectionString); + await connection.OpenAsync(); + return connection; } } diff --git a/test/MsSqlCdc.Tests/DataConvertTest.cs b/test/MsSqlCdc.Tests/DataConvertTest.cs index 01f720a..ce6ce67 100644 --- a/test/MsSqlCdc.Tests/DataConvertTest.cs +++ b/test/MsSqlCdc.Tests/DataConvertTest.cs @@ -1,308 +1,10 @@ using Xunit; using FluentAssertions; -using static FluentAssertions.FluentActions; -using System; -using System.Collections.Generic; -using System.Text; -using System.Linq; namespace MsSqlCdc.Tests; public class DataConverTest { - public static IEnumerable CdcColumnFieldsData() - { - yield return new object[] - { - new Dictionary - { - {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - {"__$operation", (int)Operation.AfterUpdate}, - {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, - {"Id", 10}, - {"Name", "Rune"}, - {"Salary", 20000.00}, - }, - "dbo_Employee", - new ChangeRow( - 25000L, - 25002L, - Operation.AfterUpdate, - "MASK", - "dbo_Employee", - new Dictionary { - {"Id", 10}, - {"Name", "Rune"}, - {"Salary", 20000.00} - }) - }; - - yield return new object[] - { - new Dictionary - { - {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - {"__$operation", (int)Operation.BeforeUpdate}, - {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, - {"Id", 1}, - {"Name", "Simon"}, - }, - "dbo_Employee", - new ChangeRow( - 25000L, - 25002L, - Operation.BeforeUpdate, - "MASK", - "dbo_Employee", - new Dictionary { - {"Id", 1}, - {"Name", "Simon"}, - }) - }; - - yield return new object[] - { - new Dictionary - { - {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - {"__$operation", (int)Operation.Delete}, - {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, - {"Id", 0}, - {"Name", "Jesper"}, - }, - "dbo_Employee", - new ChangeRow( - 25000L, - 25002L, - Operation.Delete, - "MASK", - "dbo_Employee", - new Dictionary{ - {"Id", 0}, - {"Name", "Jesper"}, - }) - }; - - yield return new object[] - { - new Dictionary - { - {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - {"__$operation", (int)Operation.Insert}, - {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, - {"Id", 10}, - }, - "dbo_Animal", - new ChangeRow( - 25000L, - 25002L, - Operation.Insert, - "MASK", - "dbo_Animal", - new Dictionary{ - {"Id", 10}, - }) - }; - - yield return new object[] - { - new Dictionary - { - {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - {"__$operation", (int)Operation.Insert}, - {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, - }, - "dbo_Animal", - new ChangeRow( - 25000L, - 25002L, - Operation.Insert, - "MASK", - "dbo_Animal", - new Dictionary{ - }) - }; - - yield return new object[] - { - new Dictionary - { - {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, - {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, - {"__$operation", (int)Operation.Insert}, - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - }, - "dbo_Animal", - new ChangeRow( - 25000L, - 25002L, - Operation.Insert, - "MASK", - "dbo_Animal", - new Dictionary{ - }) - }; - - yield return new object[] - { - new Dictionary - { - {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, - {"__$operation", (int)Operation.Insert}, - }, - "dbo_Animal", - new ChangeRow( - 25000L, - 25002L, - Operation.Insert, - "MASK", - "dbo_Animal", - new Dictionary{ - }) - }; - - yield return new object[] - { - new Dictionary - { - {"Id", 0}, - {"__$operation", (int)Operation.Delete}, - {"Name", "Jesper"}, - {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, - }, - "dbo_Employee", - new ChangeRow( - 25000L, - 25002L, - Operation.Delete, - "MASK", - "dbo_Employee", - new Dictionary{ - {"Id", 0}, - {"Name", "Jesper"}, - }) - }; - } - - public static IEnumerable CdcDefaultFieldsInvalidData() - { - yield return new object[] - { - new Dictionary(), - "dbo_Employee", - }; - - yield return new object[] - { - new Dictionary - { - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - }, - "dbo_Employee", - }; - - yield return new object[] - { - new Dictionary - { - {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - }, - "dbo_Employee", - }; - - yield return new object[] - { - new Dictionary - { - {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, - }, - "dbo_Employee", - }; - - yield return new object[] - { - new Dictionary - { - {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, - {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, - {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, - {"Id", 0}, - {"Name", "Rune"} - }, - "dbo_Employee", - }; - - yield return new object[] - { - new Dictionary - { - {"Address", "Streetvalley 20"}, - {"Salary", 2000.00}, - {"Id", 0}, - {"Name", "Rune"} - }, - "dbo_Employee", - }; - } - - [Theory] - [MemberData(nameof(CdcColumnFieldsData))] - public void Conversion_cdc_column_to_change_row( - Dictionary columnFields, - string captureInstance, - ChangeRow expected) - { - var result = DataConvert.ConvertCdcColumn(columnFields, captureInstance); - result.Should().BeEquivalentTo(expected); - } - - [Theory] - [MemberData(nameof(CdcDefaultFieldsInvalidData))] - public void Conversion_cdc_column_without_default_fields_is_invalid( - Dictionary columnFields, - string captureInstance) - { - Invoking(() => DataConvert.ConvertCdcColumn(columnFields, captureInstance)) - .Should() - .Throw() - .WithMessage($"The column fields does not contain all the default CDC column fields."); - } - - [Theory] - [InlineData(1, Operation.Delete)] - [InlineData(2, Operation.Insert)] - [InlineData(3, Operation.BeforeUpdate)] - [InlineData(4, Operation.AfterUpdate)] - public void Operation_valid_number_representation_should_be_converted(int input, Operation expected) - { - var operation = DataConvert.ConvertOperation(input); - operation.Should().Be(expected); - } - - [Theory] - [InlineData(-1)] - [InlineData(-10)] - [InlineData(0)] - [InlineData(100)] - [InlineData(int.MinValue)] - [InlineData(int.MaxValue)] - public void Operation_invalid_number_representation_should_not_be_converted(int input) - { - Invoking(() => DataConvert.ConvertOperation(input)).Should().Throw(); - } - [Theory] [InlineData(RelationalOperator.LargestLessThan, "largest less than")] [InlineData(RelationalOperator.LargestLessThanOrEqual, "largest less than or equal")] diff --git a/test/MsSqlCdc.Tests/NetChangeRowFactoryTests.cs b/test/MsSqlCdc.Tests/NetChangeRowFactoryTests.cs new file mode 100644 index 0000000..347fa1b --- /dev/null +++ b/test/MsSqlCdc.Tests/NetChangeRowFactoryTests.cs @@ -0,0 +1,291 @@ +using System; +using System.Collections.Generic; +using static FluentAssertions.FluentActions; +using Xunit; +using FluentAssertions; +using System.Text; +using System.Linq; + +namespace MsSqlCdc.Tests; + +public class NetChangeRowFactoryTests +{ + public static IEnumerable NetChangesFieldData() + { + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$operation", (int)NetChangeOperation.Update}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 10}, + {"Name", "Rune"}, + {"Salary", 20000.00}, + }, + "dbo_Employee", + new NetChangeRow( + 25000L, + NetChangeOperation.Update, + "MASK", + "dbo_Employee", + new Dictionary { + {"Id", 10}, + {"Name", "Rune"}, + {"Salary", 20000.00} + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$operation", (int)NetChangeOperation.Insert}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 1}, + {"Name", "Simon"}, + }, + "dbo_Employee", + new NetChangeRow( + 25000L, + NetChangeOperation.Insert, + "MASK", + "dbo_Employee", + new Dictionary { + {"Id", 1}, + {"Name", "Simon"}, + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$operation", (int)NetChangeOperation.Delete}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 0}, + {"Name", "Jesper"}, + }, + "dbo_Employee", + new NetChangeRow( + 25000L, + NetChangeOperation.Delete, + "MASK", + "dbo_Employee", + new Dictionary{ + {"Id", 0}, + {"Name", "Jesper"}, + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$operation", (int)NetChangeOperation.Insert}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"Id", 10}, + }, + "dbo_Animal", + new NetChangeRow( + 25000L, + NetChangeOperation.Insert, + "MASK", + "dbo_Animal", + new Dictionary{ + {"Id", 10}, + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$operation", (int)NetChangeOperation.InsertOrUpdate}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + }, + "dbo_Animal", + new NetChangeRow( + 25000L, + NetChangeOperation.InsertOrUpdate, + "MASK", + "dbo_Animal", + new Dictionary{ + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$operation", (int)NetChangeOperation.Insert}, + }, + "dbo_Animal", + new NetChangeRow( + 25000L, + NetChangeOperation.Insert, + "MASK", + "dbo_Animal", + new Dictionary{ + }) + }; + + yield return new object[] + { + new Dictionary + { + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"__$operation", (int)NetChangeOperation.InsertOrUpdate}, + }, + "dbo_Animal", + new NetChangeRow( + 25000L, + NetChangeOperation.InsertOrUpdate, + "MASK", + "dbo_Animal", + new Dictionary{ + }) + }; + + yield return new object[] + { + new Dictionary + { + {"Id", 0}, + {"__$operation", (int)NetChangeOperation.Delete}, + {"Name", "Jesper"}, + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + }, + "dbo_Employee", + new NetChangeRow( + 25000L, + NetChangeOperation.Delete, + "MASK", + "dbo_Employee", + new Dictionary{ + {"Id", 0}, + {"Name", "Jesper"}, + }) + }; + + // The important part with this one is that update_mask can be DBNull.Value + yield return new object[] + { + new Dictionary + { + {"__$operation", (int)NetChangeOperation.Delete}, + {"__$update_mask", DBNull.Value}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"Id", 0}, + {"Name", "Jesper"}, + }, + "dbo_Employee", + new NetChangeRow( + 25000L, + NetChangeOperation.Delete, + null, + "dbo_Employee", + new Dictionary{ + {"Id", 0}, + {"Name", "Jesper"}, + }) + }; + } + + public static IEnumerable InvalidNetChangesFieldData() + { + yield return new object[] + { + new Dictionary(), + "dbo_Employee", + }; + + yield return new object[] + { + new Dictionary + { + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + }, + "dbo_Employee", + }; + + yield return new object[] + { + new Dictionary + { + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + }, + "dbo_Employee", + }; + + yield return new object[] + { + new Dictionary + { + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + }, + "dbo_Employee", + }; + + yield return new object[] + { + new Dictionary + { + {"__$update_mask", Encoding.ASCII.GetBytes("MASK")}, + {"__$seqval", BitConverter.GetBytes(25002L).Reverse().ToArray()}, + {"__$start_lsn", BitConverter.GetBytes(25000L).Reverse().ToArray()}, + {"Id", 0}, + {"Name", "Rune"} + }, + "dbo_Employee", + }; + + yield return new object[] + { + new Dictionary + { + {"Address", "Streetvalley 20"}, + {"Salary", 2000.00}, + {"Id", 0}, + {"Name", "Rune"} + }, + "dbo_Employee", + }; + } + + [Theory] + [Trait("Category", "Unit")] + [MemberData(nameof(NetChangesFieldData))] + public void Conversion_cdc_column_to_change_row( + Dictionary columnFields, + string captureInstance, + NetChangeRow expected) + { + var result = NetChangeRowFactory.Create(columnFields, captureInstance); + result.Should().BeEquivalentTo(expected); + } + + [Theory] + [Trait("Category", "Unit")] + [MemberData(nameof(InvalidNetChangesFieldData))] + public void Conversion_cdc_column_without_default_fields_is_invalid( + Dictionary columnFields, + string captureInstance) + { + Invoking(() => NetChangeRowFactory.Create(columnFields, captureInstance)) + .Should() + .Throw() + .WithMessage($"The column fields does not contain all the default CDC column fields."); + } +}