Skip to content

Commit

Permalink
Change ConsumerConfig AckPolicy default to Explicit (#490)
Browse files Browse the repository at this point in the history
* Change ConsumerConfig constructor to always set default AckPolicy

* Update default AckPolicy for ConsumerConfig

The default constructor for the ConsumerConfig class no longer sets
the AckPolicy property to Explicit. Also, the values of the
ConsumerConfigAckPolicy enumeration have been rearranged, making
Explicit the default value (0).

* Removed redundant default ACK policy assignments

* Fixed consumer ack policy default

* fixed test

when ack policy is set to explicit, the default
ack wait seems to be set to 30s by the server.

---------

Co-authored-by: Ziya Suzen <ziya@suzen.net>
  • Loading branch information
hanlong-chen-1047 and mtmk authored May 21, 2024
1 parent f6c6419 commit 46a07b9
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 14 deletions.
2 changes: 1 addition & 1 deletion sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

var js = new NatsJSContext(nats);

var consumer = await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1", AckPolicy = ConsumerConfigAckPolicy.Explicit });
var consumer = await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1" });

var idle = TimeSpan.FromSeconds(5);
var expires = TimeSpan.FromSeconds(10);
Expand Down
3 changes: 1 addition & 2 deletions src/NATS.Client.JetStream/Models/ConsumerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public ConsumerConfig(string name)
{
Name = name;
DurableName = name;
AckPolicy = ConsumerConfigAckPolicy.Explicit;
}

[System.Text.Json.Serialization.JsonPropertyName("deliver_policy")]
Expand Down Expand Up @@ -91,7 +90,7 @@ public ConsumerConfig(string name)
#if NET6_0
[System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonStringEnumConverter<ConsumerConfigAckPolicy>))]
#endif
public ConsumerConfigAckPolicy AckPolicy { get; set; } = ConsumerConfigAckPolicy.None;
public ConsumerConfigAckPolicy AckPolicy { get; set; } = ConsumerConfigAckPolicy.Explicit;

/// <summary>
/// How long (in nanoseconds) to allow messages to remain un-acknowledged before attempting redelivery
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.JetStream/Models/ConsumerConfigAckPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace NATS.Client.JetStream.Models;

public enum ConsumerConfigAckPolicy
{
None = 0,
Explicit = 0,
All = 1,
Explicit = 2,
None = 2,
}
3 changes: 0 additions & 3 deletions tests/NATS.Client.CheckNativeAot/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ async Task JetStreamTests()
{
Name = "consumer1",
DurableName = "consumer1",

// Turn on ACK so we can test them below
AckPolicy = ConsumerConfigAckPolicy.Explicit,
},
cts1.Token);
AssertEqual("events", consumer.Info.StreamName);
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Core.MemoryTests/NatsConsumeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void Subscription_should_not_be_collected_when_in_consume_async_enumerato
{
await js.CreateStreamAsync(new StreamConfig { Name = "s1", Subjects = new[] { "s1.*" } });
var consumer = await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1", AckPolicy = ConsumerConfigAckPolicy.Explicit });
var consumer = await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1" });
var count = 0;
await foreach (var msg in consumer.ConsumeAsync<int>(opts: new NatsJSConsumeOpts { MaxMsgs = 100 }))
Expand Down
3 changes: 0 additions & 3 deletions tests/NATS.Client.JetStream.Tests/JetStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ public async Task Create_stream_test()
{
Name = "consumer1",
DurableName = "consumer1",

// Turn on ACK so we can test them below
AckPolicy = ConsumerConfigAckPolicy.Explicit,
},
cts1.Token);
Assert.Equal("events", consumer.Info.StreamName);
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public async Task Consumer_create_update_action()
// Update consumer
{
var c1 = await js.GetConsumerAsync("s1", "c1");
Assert.Equal(default, c1.Info.Config.AckWait);
Assert.Equal(TimeSpan.FromSeconds(30), c1.Info.Config.AckWait);

var changedConsumerConfig = new ConsumerConfig { Name = "c1", AckWait = TimeSpan.FromSeconds(10) };
await js.UpdateConsumerAsync("s1", changedConsumerConfig);
Expand Down
12 changes: 12 additions & 0 deletions tests/NATS.Client.JetStream.Tests/ParseJsonTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,16 @@ public void Placement_properties_should_be_optional()
Assert.Null(result.Cluster);
Assert.Null(result.Tags);
}

[Fact]
public void Default_consumer_ack_policy_should_be_explicit()
{
var serializer = NatsJSJsonSerializer<ConsumerConfig>.Default;

var bw = new NatsBufferWriter<byte>();
serializer.Serialize(bw, new ConsumerConfig());

var json = Encoding.UTF8.GetString(bw.WrittenSpan);
Assert.Matches("\"ack_policy\":\"explicit\"", json);
}
}
1 change: 0 additions & 1 deletion tests/NATS.Net.DocsExamples/JetStream/ManagingPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public async Task Run()
{
Name = "durable_processor",
DurableName = "durable_processor",
AckPolicy = ConsumerConfigAckPolicy.Explicit,
};

var consumer = await js.CreateOrUpdateConsumerAsync(stream: "orders", durableConfig);
Expand Down

0 comments on commit 46a07b9

Please sign in to comment.