Skip to content

Commit

Permalink
Stream CreateConsumer renamed CreateOrUpdateConsumer (#251)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk authored Nov 27, 2023
1 parent ae5d852 commit 7f78a1c
Show file tree
Hide file tree
Showing 18 changed files with 25 additions and 32 deletions.
2 changes: 1 addition & 1 deletion sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

var js = new NatsJSContext(nats);

var consumer = await js.CreateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1", AckPolicy = ConsumerConfigAckPolicy.Explicit });
var consumer = await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1", AckPolicy = ConsumerConfigAckPolicy.Explicit });

var idle = TimeSpan.FromSeconds(5);
var expires = TimeSpan.FromSeconds(10);
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.JetStream/INatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ ValueTask<INatsJSConsumer> CreateOrderedConsumerAsync(
CancellationToken cancellationToken = default);

/// <summary>
/// Creates new consumer if it doesn't exists or returns an existing one with the same name.
/// Creates new consumer if it doesn't exists or updates an existing one with the same name.
/// </summary>
/// <param name="stream">Name of the stream to create consumer under.</param>
/// <param name="config">Consumer configuration.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">Ack policy is set to <c>none</c> or there was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<INatsJSConsumer> CreateConsumerAsync(
ValueTask<INatsJSConsumer> CreateOrUpdateConsumerAsync(
string stream,
ConsumerConfig config,
CancellationToken cancellationToken = default);
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.JetStream/INatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ ValueTask UpdateAsync(
CancellationToken cancellationToken = default);

/// <summary>
/// Creates new consumer for this stream if it doesn't exists or returns an existing one with the same name.
/// Creates new consumer if it doesn't exists or updates an existing one with the same name.
/// </summary>
/// <param name="config">Consumer configuration.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">Ack policy is set to <c>none</c> or there is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<INatsJSConsumer> CreateConsumerAsync(ConsumerConfig config, CancellationToken cancellationToken = default);
ValueTask<INatsJSConsumer> CreateOrUpdateConsumerAsync(ConsumerConfig config, CancellationToken cancellationToken = default);

ValueTask<INatsJSConsumer> CreateOrderedConsumerAsync(NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ private async ValueTask CreatePushConsumer(string origin)
config.OptStartSeq = sequence + 1;
}

await _context.CreateConsumerAsync(
await _context.CreateOrUpdateConsumerAsync(
_stream,
config,
cancellationToken: _cancellationToken);
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.JetStream/NatsJSContext.Consumers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public ValueTask<INatsJSConsumer> CreateOrderedConsumerAsync(
}

/// <inheritdoc />>
public async ValueTask<INatsJSConsumer> CreateConsumerAsync(
public async ValueTask<INatsJSConsumer> CreateOrUpdateConsumerAsync(
string stream,
ConsumerConfig config,
CancellationToken cancellationToken = default)
Expand Down
13 changes: 3 additions & 10 deletions src/NATS.Client.JetStream/NatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,11 @@ public ValueTask<INatsJSConsumer> CreateOrderedConsumerAsync(NatsJSOrderedConsum
return _context.CreateOrderedConsumerAsync(_name, opts, cancellationToken);
}

/// <summary>
/// Creates new consumer for this stream if it doesn't exists or returns an existing one with the same name.
/// </summary>
/// <param name="config">Consumer configuration.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">Ack policy is set to <c>none</c> or there is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
public ValueTask<INatsJSConsumer> CreateConsumerAsync(ConsumerConfig config, CancellationToken cancellationToken = default)
/// <inheritdoc />
public ValueTask<INatsJSConsumer> CreateOrUpdateConsumerAsync(ConsumerConfig config, CancellationToken cancellationToken = default)
{
ThrowIfDeleted();
return _context.CreateConsumerAsync(_name, config, cancellationToken);
return _context.CreateOrUpdateConsumerAsync(_name, config, cancellationToken);
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ private async ValueTask CreatePushConsumer(string origin)
config.OptStartSeq = sequence + 1;
}

await _context.CreateConsumerAsync(
await _context.CreateOrUpdateConsumerAsync(
_stream,
config,
cancellationToken: _cancellationToken);
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.CheckNativeAot/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async Task JetStreamTests()
AssertEqual("events", stream.Info.Config.Name);

// Create consumer
var consumer = await js.CreateConsumerAsync(
var consumer = await js.CreateOrUpdateConsumerAsync(
"events",
new ConsumerConfig
{
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Core.MemoryTests/NatsConsumeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void Subscription_should_not_be_collected_when_in_consume_async_enumerato
{
await js.CreateStreamAsync(new StreamConfig { Name = "s1", Subjects = new[] { "s1.*" } });
var consumer = await js.CreateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1", AckPolicy = ConsumerConfigAckPolicy.Explicit });
var consumer = await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig { Name = "c1", DurableName = "c1", AckPolicy = ConsumerConfigAckPolicy.Explicit });
var count = 0;
await foreach (var msg in consumer.ConsumeAsync<int>(opts: new NatsJSConsumeOpts { MaxMsgs = 100 }))
Expand Down
8 changes: 4 additions & 4 deletions tests/NATS.Client.JetStream.Tests/ErrorHandlerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public async Task Consumer_fetch_error_handling()
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token);
var consumer = await stream.CreateConsumerAsync(new ConsumerConfig("c1"), cts.Token);
var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("c1"), cts.Token);

(await js.PublishAsync("s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess();

Expand Down Expand Up @@ -58,7 +58,7 @@ public async Task Consumer_fetch_error_handling()

// Create an empty stream to potentially reduce the chance of having a message.
var stream2 = await js.CreateStreamAsync(new StreamConfig("s2", new[] { "s2.*" }), cts.Token);
var consumer2 = await stream2.CreateConsumerAsync(new ConsumerConfig("c2"), cts.Token);
var consumer2 = await stream2.CreateOrUpdateConsumerAsync(new ConsumerConfig("c2"), cts.Token);

var next2 = await consumer2.NextAsync<int>(opts: opts, cancellationToken: cts.Token);
Assert.Null(next2);
Expand All @@ -76,7 +76,7 @@ public async Task Consumer_consume_handling()
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var stream = await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token);
var consumer = await stream.CreateConsumerAsync(new ConsumerConfig("c1"), cts.Token);
var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("c1"), cts.Token);

(await js.PublishAsync("s1.1", 1, cancellationToken: cts.Token)).EnsureSuccess();

Expand Down Expand Up @@ -276,7 +276,7 @@ public async Task Exception_propagation_handling()

try
{
var consumer = await stream.CreateConsumerAsync(new ConsumerConfig("c1"), cts.Token);
var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("c1"), cts.Token);
await foreach (var unused in consumer.ConsumeAsync<int>(opts: opts, cancellationToken: cts.Token))
{
}
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/JetStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public async Task Create_stream_test()
Assert.Equal("events", stream.Info.Config.Name);

// Create consumer
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync(
var consumer = (NatsJSConsumer)await js.CreateOrUpdateConsumerAsync(
"events",
new ConsumerConfig
{
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/ListTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public async Task List_consumers()

for (var i = 0; i < total; i++)
{
await js.CreateConsumerAsync("s1", new ConsumerConfig($"c{i:D5}"), cts.Token);
await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig($"c{i:D5}"), cts.Token);
}

// List names
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public async Task Create_get_consumer()

// Create
{
var consumer = await js.CreateConsumerAsync(
var consumer = await js.CreateOrUpdateConsumerAsync(
"s1",
new ConsumerConfig("c1"),
cts.Token);
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace NATS.Client.JetStream.Tests;
public static class Utils
{
public static ValueTask<INatsJSConsumer> CreateConsumerAsync(this NatsJSContext context, string stream, string consumer, CancellationToken cancellationToken = default)
=> context.CreateConsumerAsync(stream, new ConsumerConfig(consumer), cancellationToken);
=> context.CreateOrUpdateConsumerAsync(stream, new ConsumerConfig(consumer), cancellationToken);

public static ValueTask<INatsJSStream> CreateStreamAsync(this NatsJSContext context, string stream, string[] subjects, CancellationToken cancellationToken = default)
=> context.CreateStreamAsync(new StreamConfig { Name = stream, Subjects = subjects }, cancellationToken);
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Net.DocsExamples/IntroPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public async Task Run()
}

// Create a consumer to receive the messages
var consumer = await js.CreateConsumerAsync("orders", new ConsumerConfig("order_processor"));
var consumer = await js.CreateOrUpdateConsumerAsync("orders", new ConsumerConfig("order_processor"));

await foreach (var jsMsg in consumer.ConsumeAsync<string>())
{
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Net.DocsExamples/JetStream/ConsumePage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public async Task Run()

await js.CreateStreamAsync(new StreamConfig(name: "orders", subjects: new[] { "orders.>" }));

var consumer = await js.CreateConsumerAsync(stream: "orders", new ConsumerConfig("order_processor"));
var consumer = await js.CreateOrUpdateConsumerAsync(stream: "orders", new ConsumerConfig("order_processor"));

// Use generated JSON serializer
var orderSerializer = new NatsJsonContextSerializer<Order>(OrderJsonSerializerContext.Default);
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Net.DocsExamples/JetStream/IntroPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public async Task Run()
#endregion

#region js-consumer
var consumer = await js.CreateConsumerAsync(stream: "shop_orders", new ConsumerConfig("order_processor"));
var consumer = await js.CreateOrUpdateConsumerAsync(stream: "shop_orders", new ConsumerConfig("order_processor"));
#endregion

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Net.DocsExamples/JetStream/ManagingPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public async Task Run()
{
#region consumer-create
// Create or get a consumer
var consumer = await js.CreateConsumerAsync(stream: "orders", new ConsumerConfig("order_processor"));
var consumer = await js.CreateOrUpdateConsumerAsync(stream: "orders", new ConsumerConfig("order_processor"));
#endregion
}

Expand Down

0 comments on commit 7f78a1c

Please sign in to comment.