-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* ChangeRow renamed to AllChangeRow to seperate netchanges and all changes * moves AllChangeRow creation to its own factory * adds NetChangeFactory * adds get required fields for factories * simplified factories
- Loading branch information
1 parent
051c267
commit 7329e1e
Showing
11 changed files
with
890 additions
and
391 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Numerics; | ||
using System.Text; | ||
|
||
namespace MsSqlCdc; | ||
|
||
internal static class AllChangeRowFactory | ||
{ | ||
/// <summary> | ||
/// Converts a a collection of columns represented as Dictionary<string, object> to ChangeData representation. | ||
/// </summary> | ||
/// <param name="fields">Dictionary of field name and field value.</param> | ||
/// <param name="captureInstance">The capture instance.</param> | ||
/// <returns>Returns the CDC column as a ChangeData record.</returns> | ||
/// <exception cref="Exception"></exception> | ||
public static AllChangeRow Create(IReadOnlyDictionary<string, object> fields, string captureInstance) | ||
{ | ||
if (GetRequiredFields(fields).Count() < 4) | ||
throw new ArgumentException($"The column fields does not contain all the default CDC column fields."); | ||
|
||
return new AllChangeRow( | ||
GetStartLsn(fields), | ||
GetSeqVal(fields), | ||
GetOperation(fields), | ||
GetUpdateMask(fields), | ||
captureInstance, | ||
GetAdditionalFields(fields)); | ||
} | ||
|
||
private static string GetUpdateMask(IReadOnlyDictionary<string, object> fields) => | ||
Encoding.UTF8.GetString((byte[])fields[CdcFieldName.UpdateMask]); | ||
|
||
private static BigInteger GetSeqVal(IReadOnlyDictionary<string, object> fields) => | ||
DataConvert.ConvertBinaryLsn((byte[])fields[CdcFieldName.SeqVal]); | ||
|
||
private static BigInteger GetStartLsn(IReadOnlyDictionary<string, object> fields) => | ||
DataConvert.ConvertBinaryLsn((byte[])fields[CdcFieldName.StartLsn]); | ||
|
||
private static bool IsRequiredField(string fieldName) => | ||
fieldName == CdcFieldName.StartLsn || | ||
fieldName == CdcFieldName.SeqVal || | ||
fieldName == CdcFieldName.Operation || | ||
fieldName == CdcFieldName.UpdateMask; | ||
|
||
private static IEnumerable<KeyValuePair<string, object>> GetRequiredFields( | ||
IReadOnlyDictionary<string, object> fields) => fields.Where(x => IsRequiredField(x.Key)); | ||
|
||
private static Dictionary<string, object> GetAdditionalFields(IReadOnlyDictionary<string, object> fields) | ||
=> fields.Where(x => !IsRequiredField(x.Key)).ToDictionary(x => x.Key, x => x.Value); | ||
|
||
private static AllChangeOperation GetOperation(IReadOnlyDictionary<string, object> fields) => | ||
ConvertOperation((int)fields[CdcFieldName.Operation]); | ||
|
||
private static AllChangeOperation ConvertOperation(int representation) | ||
=> representation switch | ||
{ | ||
1 => AllChangeOperation.Delete, | ||
2 => AllChangeOperation.Insert, | ||
3 => AllChangeOperation.BeforeUpdate, | ||
4 => AllChangeOperation.AfterUpdate, | ||
_ => throw new ArgumentException($"Not valid representation value '{representation}'") | ||
}; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Numerics; | ||
|
||
namespace MsSqlCdc; | ||
|
||
public enum NetChangeOperation | ||
{ | ||
Delete = 1, | ||
Insert = 2, | ||
Update = 4, | ||
InsertOrUpdate = 5 | ||
} | ||
|
||
public record NetChangeRow | ||
{ | ||
/// <summary> | ||
/// All changes committed in the same transaction share the same commit LSN. | ||
/// For example, if an update operation on the source table modifies two columns in two rows, | ||
/// the change table will contain four rows, each with the same __$start_lsnvalue. | ||
/// </summary> | ||
public BigInteger StartLineSequenceNumber { get; init; } | ||
|
||
/// <summary> | ||
///Identifies the data manipulation language (DML) operation needed to apply the row of | ||
/// change data to the target data source. | ||
/// If the value of the row_filter_option parameter is all or all with mask, | ||
/// the value in this column can be one of the following values: | ||
/// 1 = Delete | ||
/// 2 = Insert | ||
/// 4 = Update | ||
/// If the value of the row_filter_option parameter is all with merge, | ||
/// the value in this column can be one of the following values: | ||
/// 1 = Delete | ||
/// 5 = Insert or update | ||
/// </summary> | ||
public NetChangeOperation Operation { get; init; } | ||
|
||
/// <summary> | ||
/// A bit mask with a bit corresponding to each captured column identified for the capture instance. | ||
/// This value has all defined bits set to 1 when __$operation = 1 or 2. When __$operation = 3 or 4, | ||
/// only those bits corresponding to columns that changed are set to 1. | ||
/// </summary> | ||
public string? UpdateMask { get; init; } | ||
|
||
/// <summary> | ||
/// The name of the capture instance associated with the change. | ||
/// </summary> | ||
public string CaptureInstance { get; set; } | ||
|
||
/// <summary> | ||
/// The row fields. | ||
/// </summary> | ||
public IReadOnlyDictionary<string, object> Fields { get; init; } | ||
|
||
public NetChangeRow( | ||
BigInteger startLineSequenceNumber, | ||
NetChangeOperation operation, | ||
string? updateMask, | ||
string captureInstance, | ||
IReadOnlyDictionary<string, object> fields) | ||
{ | ||
if (fields is null) | ||
throw new ArgumentNullException($"{nameof(fields)} cannot be null."); | ||
if (string.IsNullOrWhiteSpace(captureInstance)) | ||
throw new ArgumentNullException($"{nameof(captureInstance)} cannot be null, empty or whitespace."); | ||
|
||
StartLineSequenceNumber = startLineSequenceNumber; | ||
Operation = operation; | ||
UpdateMask = updateMask; | ||
CaptureInstance = captureInstance; | ||
Fields = fields; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Numerics; | ||
using System.Text; | ||
|
||
namespace MsSqlCdc; | ||
|
||
internal static class NetChangeRowFactory | ||
{ | ||
/// <summary> | ||
/// Converts a a collection of columns represented as Dictionary<string, object> to ChangeData representation. | ||
/// </summary> | ||
/// <param name="fields">Dictionary of field name and field value.</param> | ||
/// <param name="captureInstance">The capture instance.</param> | ||
/// <returns>Returns the CDC column as a ChangeData record.</returns> | ||
/// <exception cref="Exception"></exception> | ||
public static NetChangeRow Create( | ||
IReadOnlyDictionary<string, object> fields, | ||
string captureInstance) | ||
{ | ||
if (GetRequiredFields(fields).Count() < 3) | ||
throw new ArgumentException($"The column fields does not contain all the default CDC column fields."); | ||
|
||
return new NetChangeRow( | ||
GetStartLsn(fields), | ||
GetOperation(fields), | ||
GetUpdateMask(fields), | ||
captureInstance, | ||
GetAdditionalFields(fields)); | ||
} | ||
|
||
private static BigInteger GetStartLsn(IReadOnlyDictionary<string, object> fields) => | ||
DataConvert.ConvertBinaryLsn((byte[])fields[CdcFieldName.StartLsn]); | ||
|
||
private static string? GetUpdateMask(IReadOnlyDictionary<string, object> fields) => | ||
fields[CdcFieldName.UpdateMask] != DBNull.Value | ||
? Encoding.UTF8.GetString((byte[])fields[CdcFieldName.UpdateMask]) | ||
: null; | ||
|
||
private static bool IsRequiredField(string fieldName) => | ||
fieldName == CdcFieldName.StartLsn || | ||
fieldName == CdcFieldName.Operation || | ||
fieldName == CdcFieldName.UpdateMask; | ||
|
||
private static IEnumerable<KeyValuePair<string, object>> GetRequiredFields( | ||
IReadOnlyDictionary<string, object> fields) => fields.Where(x => IsRequiredField(x.Key)); | ||
|
||
private static Dictionary<string, object> GetAdditionalFields(IReadOnlyDictionary<string, object> fields) | ||
=> fields.Where(x => !IsRequiredField(x.Key)).ToDictionary(x => x.Key, x => x.Value); | ||
|
||
private static NetChangeOperation GetOperation(IReadOnlyDictionary<string, object> fields) => | ||
ConvertOperation((int)fields[CdcFieldName.Operation]); | ||
|
||
private static NetChangeOperation ConvertOperation(int representation) | ||
=> representation switch | ||
{ | ||
1 => NetChangeOperation.Delete, | ||
2 => NetChangeOperation.Insert, | ||
4 => NetChangeOperation.Update, | ||
5 => NetChangeOperation.InsertOrUpdate, | ||
_ => throw new ArgumentException($"Not valid representation value '{representation}'") | ||
}; | ||
} |
Oops, something went wrong.