Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change feed: Adds id and pk to ChangeFeedMetadata for delete operations #4922

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -57,21 +57,21 @@ namespace Microsoft.Azure.Cosmos
class ChangeFeedItem<T>
{
/// <summary>
/// The full fidelity change feed current item.
/// The current version of the item for all versions and deletes change feed mode.
/// </summary>
[JsonProperty(PropertyName = "current")]
[JsonPropertyName("current")]
public T Current { get; set; }

/// <summary>
/// The full fidelity change feed metadata.
/// The item metadata for all versions and deletes change feed mode.
/// </summary>
[JsonProperty(PropertyName = "metadata", NullValueHandling = NullValueHandling.Ignore)]
[JsonPropertyName("metadata")]
public ChangeFeedMetadata Metadata { get; set; }

/// <summary>
/// For delete operations, previous image is always going to be provided. The previous image on replace operations is not going to be exposed by default and requires account-level or container-level opt-in.
/// The previous version of the item for all versions and deletes change feed mode. The previous version on delete and replace operations is not exposed by default and requires container-level opt-in. Refer to https://aka.ms/cosmosdb-change-feed-deletes for more information.
/// </summary>
[JsonProperty(PropertyName = "previous", NullValueHandling = NullValueHandling.Ignore)]
[JsonPropertyName("previous")]
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Text.Json;
using Microsoft.Azure.Cosmos.Resource.FullFidelity;
using Microsoft.Azure.Cosmos.Resource.FullFidelity.Converters;
@@ -21,7 +22,7 @@ namespace Microsoft.Azure.Cosmos
#else
internal
#endif
class ChangeFeedMetadata
class ChangeFeedMetadata
{
/// <summary>
/// The change's conflict resolution timestamp.
@@ -50,9 +51,21 @@ class ChangeFeedMetadata
public long PreviousLsn { get; internal set; }

/// <summary>
/// Used to distinquish explicit deletes (e.g. via DeleteItem) from deletes caused by TTL expiration (a collection may define time-to-live policy for documents).
/// Used to distinguish explicit deletes (e.g. via DeleteItem) from deletes caused by TTL expiration (a collection may define time-to-live policy for documents).
/// </summary>
[JsonProperty(PropertyName = ChangeFeedMetadataFields.TimeToLiveExpired, NullValueHandling = NullValueHandling.Ignore)]
public bool IsTimeToLiveExpired { get; internal set; }

/// <summary>
/// The id of the previous item version. Used for delete operations only.
/// </summary>
[JsonProperty(PropertyName = ChangeFeedMetadataFields.Id, NullValueHandling = NullValueHandling.Ignore)]
public string Id { get; internal set; }

/// <summary>
/// The partition key of the previous item version. Dictionary Key is the partition key property name and Dictionary Value is the partition key property value. Used for delete operations only.
/// </summary>
[JsonProperty(PropertyName = ChangeFeedMetadataFields.PartitionKey, NullValueHandling = NullValueHandling.Ignore)]
public List<(string, object)> PartitionKey { get; internal set; }
}
}
Original file line number Diff line number Diff line change
@@ -11,5 +11,7 @@ internal class ChangeFeedMetadataFields
public const string OperationType = "operationType";
public const string PreviousImageLSN = "previousImageLSN";
public const string TimeToLiveExpired = "timeToLiveExpired";
public const string Id = "id";
public const string PartitionKey = "partitionKey";
}
}
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.Resource.FullFidelity.Converters
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text.Json;
using System.Text.Json.Serialization;
@@ -56,6 +57,19 @@ public override ChangeFeedMetadata Read(ref Utf8JsonReader reader, Type typeToCo
{
metadata.PreviousLsn = property.Value.GetInt64();
}
else if (property.NameEquals(ChangeFeedMetadataFields.Id))
{
metadata.Id = property.Value.GetString();
}
else if (property.NameEquals(ChangeFeedMetadataFields.PartitionKey))
{
List<(string, object)> partitionKey = new List<(string, object)>();
foreach (JsonProperty pk in property.Value.EnumerateObject())
{
partitionKey.Add((pk.Name, pk.Value));
}
metadata.PartitionKey = partitionKey;
}
}

return metadata;
@@ -76,6 +90,49 @@ public override void Write(Utf8JsonWriter writer, ChangeFeedMetadata value, Json
writer.WriteString(ChangeFeedMetadataFields.OperationType, value.OperationType.ToString());
writer.WriteNumber(ChangeFeedMetadataFields.PreviousImageLSN, value.PreviousLsn);

if (value.Id != null)
{
writer.WriteString(ChangeFeedMetadataFields.Id, value.Id);
}

if (value.PartitionKey != null)
{
writer.WriteStartObject(ChangeFeedMetadataFields.PartitionKey);

foreach ((string, object) pk in value.PartitionKey)
{
JsonElement pkValue = (JsonElement)pk.Item2;

switch (pkValue.ValueKind)
{
case JsonValueKind.String:
writer.WriteString(pk.Item1, pkValue.GetString());
break;

case JsonValueKind.Number:
writer.WriteNumber(pk.Item1, pkValue.GetDouble());
break;

case JsonValueKind.True:
case JsonValueKind.False:
writer.WriteBoolean(pk.Item1, pkValue.GetBoolean());
break;

case JsonValueKind.Null:
writer.WriteNull(pk.Item1);
break;

case JsonValueKind.Undefined:
break;

default:
throw new JsonException(string.Format(CultureInfo.CurrentCulture, RMResources.JsonUnexpectedToken));
}
}

writer.WriteEndObject();
}

writer.WriteEndObject();
}

Original file line number Diff line number Diff line change
@@ -84,6 +84,8 @@ public async Task WhenADocumentIsCreatedWithTtlSetThenTheDocumentIsDeletedTestsA
Assert.IsTrue(change.Metadata.IsTimeToLiveExpired);

// previous
Assert.AreEqual(expected: "1", actual: change.Metadata.Id.ToString());
Assert.AreEqual(expected: "1", actual: change.Metadata.PartitionKey.FirstOrDefault().Item2);
Assert.AreEqual(expected: "1", actual: change.Previous.id.ToString());
Assert.AreEqual(expected: "1", actual: change.Previous.pk.ToString());
Assert.AreEqual(expected: "Testing TTL on CFP.", actual: change.Previous.description.ToString());
@@ -155,6 +157,8 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
ChangeFeedProcessor processor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<dynamic>> docs, CancellationToken token) =>
{
string metadataId = default;
string metadataPk = default;
string id = default;
string pk = default;
string description = default;
@@ -171,6 +175,8 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()
}
else
{
metadataId = change.Metadata.Id.ToString();
metadataPk = change.Metadata.PartitionKey.FirstOrDefault().Item2.ToString();
id = change.Previous.id.ToString();
pk = change.Previous.pk.ToString();
description = change.Previous.description.ToString();
@@ -211,6 +217,8 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync()

ChangeFeedItem<dynamic> deleteChange = docs.ElementAt(2);
Assert.IsNull(deleteChange.Current.id);
Assert.AreEqual(expected: "1", actual: deleteChange.Metadata.Id.ToString());
Assert.AreEqual(expected: "1", actual: deleteChange.Metadata.PartitionKey.FirstOrDefault().Item2);
Assert.AreEqual(expected: deleteChange.Metadata.OperationType, actual: ChangeFeedOperationType.Delete);
Assert.AreEqual(expected: replaceChange.Metadata.Lsn, actual: deleteChange.Metadata.PreviousLsn);
Assert.IsNotNull(deleteChange.Previous);
Original file line number Diff line number Diff line change
@@ -39,7 +39,11 @@ public void ValidateNSJAndSTJSerializationOfChangeFeedItemDeleteTimeToLiveExpire
""crts"": 1722511591,
""operationType"": ""delete"",
""timeToLiveExpired"": true,
""previousImageLSN"": 16
""previousImageLSN"": 16,
""id"": ""1"",
""partitionKey"": {
""pk"": ""1""
}
},
""previous"": {
""id"": ""1"",
@@ -92,6 +96,8 @@ static void ValidateDeserialization(List<ChangeFeedItem<ToDoActivity>> activitie
Assert.IsTrue(deletedChange.Metadata.IsTimeToLiveExpired);
Assert.IsNotNull(deletedChange.Previous);
Assert.AreEqual(expected: "Testing TTL on CFP.", actual: deletedChange.Previous.description);
Assert.AreEqual(expected: "1", actual: deletedChange.Metadata.Id.ToString());
Assert.AreEqual(expected: "1", actual: deletedChange.Metadata.PartitionKey.FirstOrDefault().Item2.ToString());
Assert.AreEqual(expected: "1", actual: deletedChange.Previous.id);
Assert.AreEqual(expected: 5, actual: deletedChange.Previous.ttl);
}
@@ -216,7 +222,11 @@ public void ValidateNSJAndSTJSerializationOfChangeFeedItemTest(bool propertyName
""lsn"": 376,
""operationType"": ""delete"",
""previousImageLSN"": 375,
""timeToLiveExpired"": false
""timeToLiveExpired"": false,
""id"": ""1"",
""partitionKey"": {
""pk"": ""1""
}
},
""previous"": {
""id"": ""1"",
@@ -295,6 +305,8 @@ static void ValidateDeserialization(List<ChangeFeedItem<ToDoActivity>> activitie
Assert.IsFalse(deletedChange.Metadata.IsTimeToLiveExpired);
Assert.IsNotNull(deletedChange.Previous);
Assert.AreEqual(expected: "test after replace", actual: deletedChange.Previous.description);
Assert.AreEqual(expected: "1", actual: deletedChange.Metadata.Id.ToString());
Assert.AreEqual(expected: "1", actual: deletedChange.Metadata.PartitionKey.FirstOrDefault().Item2.ToString());
Assert.AreEqual(expected: "1", actual: deletedChange.Previous.id);
Assert.AreEqual(expected: 0, actual: deletedChange.Previous.ttl);
}
@@ -376,7 +388,7 @@ public async Task WhenADocumentIsCreatedWithTtlSetThenTheDocumentIsDeletedTestsA
Container leaseContainer = await database.CreateContainerIfNotExistsAsync(containerProperties: new ContainerProperties(id: "leases", partitionKeyPath: "/id"));
ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes, database);
Exception exception = default;
int ttlInSeconds = 5;
int ttlInSeconds = 1;
Stopwatch stopwatch = new();
ManualResetEvent allDocsProcessed = new ManualResetEvent(false);

@@ -414,6 +426,8 @@ public async Task WhenADocumentIsCreatedWithTtlSetThenTheDocumentIsDeletedTestsA
Assert.IsTrue(DateTime.TryParse(s: change.Metadata.ConflictResolutionTimestamp.ToString(), out _), message: "Invalid csrt must be a datetime value.");
Assert.IsTrue(change.Metadata.Lsn > 0, message: "Invalid lsn must be a long value.");
Assert.IsTrue(change.Metadata.IsTimeToLiveExpired);
Assert.AreEqual(expected: "1", actual: change.Metadata.Id.ToString());
Assert.AreEqual(expected: "1", actual: change.Metadata.PartitionKey.FirstOrDefault().Item2);

// previous
Assert.AreEqual(expected: "1", actual: change.Previous.id.ToString());
@@ -508,6 +522,8 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync(bool pr
{
Logger.LogLine($"@ {DateTime.Now}, {nameof(docs)} -> {System.Text.Json.JsonSerializer.Serialize(docs)}");

string metadataId = default;
string metadataPk = default;
string id = default;
string pk = default;
string description = default;
@@ -522,6 +538,8 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync(bool pr
}
else
{
metadataId = change.Metadata.Id.ToString();
metadataPk = change.Metadata.PartitionKey.FirstOrDefault().Item2.ToString();
id = change.Previous.id.ToString();
pk = change.Previous.pk.ToString();
description = change.Previous.description.ToString();
@@ -565,6 +583,8 @@ public async Task WhenADocumentIsCreatedThenUpdatedThenDeletedTestsAsync(bool pr
Assert.AreEqual(expected: deleteChange.Metadata.OperationType, actual: ChangeFeedOperationType.Delete);
Assert.AreEqual(expected: replaceChange.Metadata.Lsn, actual: deleteChange.Metadata.PreviousLsn);
Assert.IsNotNull(deleteChange.Previous);
Assert.AreEqual(expected: "1", actual: deleteChange.Metadata.Id.ToString());
Assert.AreEqual(expected: "1", actual: deleteChange.Metadata.PartitionKey.FirstOrDefault().Item2);
Assert.AreEqual(expected: "1", actual: deleteChange.Previous.id.ToString());
Assert.AreEqual(expected: "1", actual: deleteChange.Previous.pk.ToString());
Assert.AreEqual(expected: "test after replace", actual: deleteChange.Previous.description.ToString());
Original file line number Diff line number Diff line change
@@ -896,6 +896,7 @@ private async Task ValidateChangeFeedIteratorCore_WithQuery(

foreach (ChangeFeedItem<Document> item in feedResponse)
{
Assert.AreEqual(expected: "id3", actual: item.Metadata.Id.ToString());
Assert.AreEqual("id3", item.Previous.Id);
Assert.AreEqual(ChangeFeedOperationType.Delete, item.Metadata.OperationType);
}
@@ -1094,6 +1095,7 @@ public async Task ChangeFeedIteratorCore_FeedRange_VerifyingWireFormatTests()
Assert.AreNotEqual(notExpected: default, actual: deleteOperation.Metadata.Lsn);
Assert.AreNotEqual(notExpected: default, actual: deleteOperation.Metadata.PreviousLsn);
Assert.IsNotNull(deleteOperation.Previous);
Assert.AreEqual(expected: id, actual: deleteOperation.Metadata.Id.ToString());
Assert.AreEqual(expected: id, actual: deleteOperation.Previous.Id);
Assert.AreEqual(expected: "205 16th St NW", actual: deleteOperation.Previous.Line1);
Assert.AreEqual(expected: "Atlanta", actual: deleteOperation.Previous.City);
Original file line number Diff line number Diff line change
@@ -148,6 +148,20 @@
],
"MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedOperationType OperationType;CanRead:True;CanWrite:True;Microsoft.Azure.Cosmos.ChangeFeedOperationType get_OperationType();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"System.Collections.Generic.Dictionary`2[System.String,System.String] DeletedItemPartitionKey[Newtonsoft.Json.JsonPropertyAttribute(NullValueHandling = 1, PropertyName = \"partitionKey\")]": {
"Type": "Property",
"Attributes": [
"JsonPropertyAttribute"
],
"MethodInfo": "System.Collections.Generic.Dictionary`2[System.String,System.String] DeletedItemPartitionKey;CanRead:True;CanWrite:True;System.Collections.Generic.Dictionary`2[System.String,System.String] get_DeletedItemPartitionKey();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"System.Collections.Generic.Dictionary`2[System.String,System.String] get_DeletedItemPartitionKey()[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": {
"Type": "Method",
"Attributes": [
"CompilerGeneratedAttribute"
],
"MethodInfo": "System.Collections.Generic.Dictionary`2[System.String,System.String] get_DeletedItemPartitionKey();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"System.DateTime ConflictResolutionTimestamp[Newtonsoft.Json.JsonPropertyAttribute(NullValueHandling = 1, PropertyName = \"crts\")]-[Newtonsoft.Json.JsonConverterAttribute(typeof(Microsoft.Azure.Documents.UnixDateTimeConverter))]": {
"Type": "Property",
"Attributes": [
@@ -163,6 +177,20 @@
],
"MethodInfo": "System.DateTime get_ConflictResolutionTimestamp();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"System.String DeletedItemId[Newtonsoft.Json.JsonPropertyAttribute(NullValueHandling = 1, PropertyName = \"id\")]": {
"Type": "Property",
"Attributes": [
"JsonPropertyAttribute"
],
"MethodInfo": "System.String DeletedItemId;CanRead:True;CanWrite:True;System.String get_DeletedItemId();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"System.String get_DeletedItemId()[System.Runtime.CompilerServices.CompilerGeneratedAttribute()]": {
"Type": "Method",
"Attributes": [
"CompilerGeneratedAttribute"
],
"MethodInfo": "System.String get_DeletedItemId();IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"Void .ctor()": {
"Type": "Constructor",
"Attributes": [],
Loading