forked from confluentinc/confluent-kafka-dotnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
OauthBearerToken_Delegate.cs
63 lines (57 loc) · 2.17 KB
/
OauthBearerToken_Delegate.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
using System;
using Xunit;
namespace Confluent.Kafka.IntegrationTests
{
public partial class Tests
{
/// <summary>
/// Tests that token refresh callback is called when <see cref="ClientConfig.SaslMechanism"/> is set to <see cref="SaslMechanism.OAuthBearer"/>.
/// </summary>
[Theory, MemberData(nameof(OAuthBearerKafkaParameters))]
public void OAuthBearerToken_Delegate(string bootstrapServers)
{
LogToFileStartTest();
if (string.IsNullOrEmpty(bootstrapServers))
{
// skip test if oauth enabled broker is not specified.
return;
}
var config = new ClientConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.OAuthBearer,
SaslOauthbearerConfig = $"{Guid.NewGuid()}"
};
// test Consumer
var consumerConfig = new ConsumerConfig(config)
{
GroupId = $"{Guid.NewGuid()}"
};
var consumerCallsCount = 0;
var consumer = new ConsumerBuilder<string, string>(consumerConfig)
.SetOAuthBearerTokenRefreshHandler((client, cfg) =>
{
Assert.Equal(config.SaslOauthbearerConfig, cfg);
consumerCallsCount++;
})
.Build();
consumer.Subscribe(singlePartitionTopic);
consumer.Consume(0);
Assert.True(consumerCallsCount > 0);
// test Producer
var producerConfig = new ProducerConfig(config);
var producerCallsCount = 0;
var producer = new ProducerBuilder<string, string>(producerConfig)
.SetOAuthBearerTokenRefreshHandler((client, cfg) =>
{
Assert.Equal(config.SaslOauthbearerConfig, cfg);
producerCallsCount++;
})
.Build();
producer.Flush();
Assert.True(producerCallsCount > 0);
LogToFileEndTest();
}
}
}