From 5d807420d8401171790156d90dd2e57366e891a1 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 14 Aug 2023 15:39:40 +0100 Subject: [PATCH] Auth test output and proxy tidy-up (#113) Fixed auth test output to return just the authorization method name rather than a dump of all variables. Moved test proxy to its own file and suppressed unnecessary socket exception during connect and disconnect. --- .../NatsConnectionTest.Auth.cs | 120 +++---- tests/NATS.Client.TestUtilities/NatsProxy.cs | 305 ++++++++++++++++++ tests/NATS.Client.TestUtilities/NatsServer.cs | 292 ----------------- 3 files changed, 371 insertions(+), 346 deletions(-) create mode 100644 tests/NATS.Client.TestUtilities/NatsProxy.cs diff --git a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs index a16a51741..0ef420657 100644 --- a/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs +++ b/tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs @@ -6,91 +6,85 @@ public static IEnumerable GetAuthConfigs() { yield return new object[] { - "TOKEN", - "resources/configs/auth/token.conf", - NatsOptions.Default with - { - AuthOptions = NatsAuthOptions.Default with - { - Token = "s3cr3t", - }, - }, + new Auth( + "TOKEN", + "resources/configs/auth/token.conf", + NatsOptions.Default with { AuthOptions = NatsAuthOptions.Default with { Token = "s3cr3t", }, }), }; yield return new object[] { - "USER-PASSWORD", - "resources/configs/auth/password.conf", - NatsOptions.Default with - { - AuthOptions = NatsAuthOptions.Default with + new Auth( + "USER-PASSWORD", + "resources/configs/auth/password.conf", + NatsOptions.Default with { - Username = "a", - Password = "b", - }, - }, + AuthOptions = NatsAuthOptions.Default with { Username = "a", Password = "b", }, + }), }; yield return new object[] { - "NKEY", - "resources/configs/auth/nkey.conf", - NatsOptions.Default with - { - AuthOptions = NatsAuthOptions.Default with + new Auth( + "NKEY", + "resources/configs/auth/nkey.conf", + NatsOptions.Default with { - Nkey = "UALQSMXRSAA7ZXIGDDJBJ2JOYJVQIWM3LQVDM5KYIPG4EP3FAGJ47BOJ", - Seed = "SUAAVWRZG6M5FA5VRRGWSCIHKTOJC7EWNIT4JV3FTOIPO4OBFR5WA7X5TE", - }, - }, + AuthOptions = NatsAuthOptions.Default with + { + Nkey = "UALQSMXRSAA7ZXIGDDJBJ2JOYJVQIWM3LQVDM5KYIPG4EP3FAGJ47BOJ", + Seed = "SUAAVWRZG6M5FA5VRRGWSCIHKTOJC7EWNIT4JV3FTOIPO4OBFR5WA7X5TE", + }, + }), }; yield return new object[] { - "NKEY (FROM FILE)", - "resources/configs/auth/nkey.conf", - NatsOptions.Default with - { - AuthOptions = NatsAuthOptions.Default with + new Auth( + "NKEY (FROM FILE)", + "resources/configs/auth/nkey.conf", + NatsOptions.Default with { - NKeyFile = "resources/configs/auth/user.nk", - }, - }, + AuthOptions = NatsAuthOptions.Default with { NKeyFile = "resources/configs/auth/user.nk", }, + }), }; yield return new object[] { - "USER-CREDS", - "resources/configs/auth/operator.conf", - NatsOptions.Default with - { - AuthOptions = NatsAuthOptions.Default with + new Auth( + "USER-CREDS", + "resources/configs/auth/operator.conf", + NatsOptions.Default with { - Jwt = - "eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJOVDJTRkVIN0pNSUpUTzZIQ09GNUpYRFNDUU1WRlFNV0MyWjI1TFk3QVNPTklYTjZFVlhBIiwiaWF0IjoxNjc5MTQ0MDkwLCJpc3MiOiJBREpOSlpZNUNXQlI0M0NOSzJBMjJBMkxPSkVBSzJSS1RaTk9aVE1HUEVCRk9QVE5FVFBZTUlLNSIsIm5hbWUiOiJteS11c2VyIiwic3ViIjoiVUJPWjVMUVJPTEpRRFBBQUNYSk1VRkJaS0Q0R0JaSERUTFo3TjVQS1dSWFc1S1dKM0VBMlc0UloiLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e30sInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsInR5cGUiOiJ1c2VyIiwidmVyc2lvbiI6Mn19.ElYEknDixe9pZdl55S9PjduQhhqR1OQLglI1JO7YK7ECYb1mLUjGd8ntcR7ISS04-_yhygSDzX8OS8buBIxMDA", - Seed = "SUAJR32IC6D45J3URHJ5AOQZWBBO6QTID27NZQKXE3GC5U3SPFEYDJK6RQ", - }, - }, + AuthOptions = NatsAuthOptions.Default with + { + Jwt = + "eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJOVDJTRkVIN0pNSUpUTzZIQ09GNUpYRFNDUU1WRlFNV0MyWjI1TFk3QVNPTklYTjZFVlhBIiwiaWF0IjoxNjc5MTQ0MDkwLCJpc3MiOiJBREpOSlpZNUNXQlI0M0NOSzJBMjJBMkxPSkVBSzJSS1RaTk9aVE1HUEVCRk9QVE5FVFBZTUlLNSIsIm5hbWUiOiJteS11c2VyIiwic3ViIjoiVUJPWjVMUVJPTEpRRFBBQUNYSk1VRkJaS0Q0R0JaSERUTFo3TjVQS1dSWFc1S1dKM0VBMlc0UloiLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e30sInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsInR5cGUiOiJ1c2VyIiwidmVyc2lvbiI6Mn19.ElYEknDixe9pZdl55S9PjduQhhqR1OQLglI1JO7YK7ECYb1mLUjGd8ntcR7ISS04-_yhygSDzX8OS8buBIxMDA", + Seed = "SUAJR32IC6D45J3URHJ5AOQZWBBO6QTID27NZQKXE3GC5U3SPFEYDJK6RQ", + }, + }), }; yield return new object[] { - "USER-CREDS (FROM FILE)", - "resources/configs/auth/operator.conf", - NatsOptions.Default with - { - AuthOptions = NatsAuthOptions.Default with + new Auth( + "USER-CREDS (FROM FILE)", + "resources/configs/auth/operator.conf", + NatsOptions.Default with { - CredsFile = "resources/configs/auth/user.creds", - }, - }, + AuthOptions = NatsAuthOptions.Default with { CredsFile = "resources/configs/auth/user.creds", }, + }), }; } [Theory] [MemberData(nameof(GetAuthConfigs))] - public async Task UserCredentialAuthTest(string name, string serverConfig, NatsOptions clientOptions) + public async Task UserCredentialAuthTest(Auth auth) { + var name = auth.Name; + var serverConfig = auth.ServerConfig; + var clientOptions = auth.ClientOptions; + _output.WriteLine($"AUTH TEST {name}"); var serverOptions = new NatsServerOptionsBuilder() @@ -165,4 +159,22 @@ await Retry.Until( await natsSub.DisposeAsync(); await register; } + + public class Auth + { + public Auth(string name, string serverConfig, NatsOptions clientOptions) + { + Name = name; + ServerConfig = serverConfig; + ClientOptions = clientOptions; + } + + public string Name { get; } + + public string ServerConfig { get; } + + public NatsOptions ClientOptions { get; } + + public override string ToString() => Name; + } } diff --git a/tests/NATS.Client.TestUtilities/NatsProxy.cs b/tests/NATS.Client.TestUtilities/NatsProxy.cs new file mode 100644 index 000000000..03a7c91c3 --- /dev/null +++ b/tests/NATS.Client.TestUtilities/NatsProxy.cs @@ -0,0 +1,305 @@ +using System.Diagnostics; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Text.RegularExpressions; + +namespace NATS.Client.Core.Tests; + +public class NatsProxy : IDisposable +{ + private readonly ITestOutputHelper _outputHelper; + private readonly bool _trace; + private readonly TcpListener _tcpListener; + private readonly List _clients = new(); + private readonly List _frames = new(); + private readonly Stopwatch _watch = new(); + private int _syncCount; + + public NatsProxy(int port, ITestOutputHelper outputHelper, bool trace) + { + _outputHelper = outputHelper; + _trace = trace; + _tcpListener = new TcpListener(IPAddress.Loopback, 0); + _tcpListener.Start(); + _watch.Restart(); + + Task.Run(() => + { + var client = 0; + while (true) + { + var tcpClient1 = _tcpListener.AcceptTcpClient(); + TcpClient tcpClient2; + lock (_clients) + { + tcpClient1.NoDelay = true; + tcpClient1.ReceiveBufferSize = 0; + tcpClient1.SendBufferSize = 0; + _clients.Add(tcpClient1); + + tcpClient2 = new TcpClient("127.0.0.1", port); + tcpClient2.NoDelay = true; + tcpClient2.ReceiveBufferSize = 0; + tcpClient2.SendBufferSize = 0; + _clients.Add(tcpClient2); + } + + var n = client++; + +#pragma warning disable CS4014 + Task.Run(() => + { + var stream1 = tcpClient1.GetStream(); + var sr1 = new StreamReader(stream1, Encoding.ASCII); + var sw1 = new StreamWriter(stream1, Encoding.ASCII); + + var stream2 = tcpClient2.GetStream(); + var sr2 = new StreamReader(stream2, Encoding.ASCII); + var sw2 = new StreamWriter(stream2, Encoding.ASCII); + + Task.Run(() => + { + while (NatsProtoDump(n, "C", sr1, sw2, ClientInterceptor)) + { + } + }); + + while (NatsProtoDump(n, $"S", sr2, sw1, ServerInterceptor)) + { + } + }); + } + }); + + var stopwatch = Stopwatch.StartNew(); + while (stopwatch.Elapsed < TimeSpan.FromSeconds(10)) + { + try + { + using var tcpClient = new TcpClient(); + tcpClient.Connect(IPAddress.Loopback, Port); + Log($"Server started on localhost:{Port}"); + return; + } + catch (SocketException) + { + } + } + + throw new TimeoutException("Proxy server didn't start"); + } + + public List> ClientInterceptors { get; } = new(); + + public List> ServerInterceptors { get; } = new(); + + public int Port => ((IPEndPoint)_tcpListener.Server.LocalEndPoint!).Port; + + public IReadOnlyList AllFrames + { + get + { + lock (_frames) + { + return _frames.ToList(); + } + } + } + + public IReadOnlyList Frames + { + get + { + lock (_frames) + { + return _frames + .Where(f => !Regex.IsMatch(f.Message, @"^(INFO|CONNECT|PING|PONG|\+OK)")) + .ToList(); + } + } + } + + public IReadOnlyList ClientFrames => Frames.Where(f => f.Origin == "C").ToList(); + + public IReadOnlyList ServerFrames => Frames.Where(f => f.Origin == "S").ToList(); + + public void Reset() + { + lock (_clients) + { + foreach (var tcpClient in _clients) + { + try + { + tcpClient.Close(); + } + catch + { + // ignore + } + } + + lock (_frames) + _frames.Clear(); + + _watch.Restart(); + } + } + + public async Task FlushFramesAsync(NatsConnection nats) + { + var subject = $"_SIGNAL_SYNC_{Interlocked.Increment(ref _syncCount)}"; + + await nats.PublishAsync(subject); + + await Retry.Until( + "flush sync frame", + () => AllFrames.Any(f => f.Message == $"PUB {subject} 0␍␊")); + + lock (_frames) + _frames.Clear(); + } + + public void Dispose() => _tcpListener.Server.Dispose(); + + public string Dump(ReadOnlySpan buffer) + { + var sb = new StringBuilder(); + foreach (var c in buffer) + { + switch (c) + { + case >= ' ' and <= '~': + sb.Append(c); + break; + case '\n': + sb.Append('␊'); + break; + case '\r': + sb.Append('␍'); + break; + default: + sb.Append('.'); + break; + } + } + + return sb.ToString(); + } + + private string? ClientInterceptor(string? message) + { + foreach (var interceptor in ClientInterceptors) + { + message = interceptor(message); + } + + return message; + } + + private string? ServerInterceptor(string? message) + { + foreach (var interceptor in ServerInterceptors) + { + message = interceptor(message); + } + + return message; + } + + private bool NatsProtoDump(int client, string origin, TextReader sr, TextWriter sw, Func? interceptor) + { + void Write(string? rawFrame) + { + if (interceptor != null) + rawFrame = interceptor(rawFrame); + + if (rawFrame == null) + return; + + sw.Write(rawFrame); + sw.Flush(); + } + + string? message; + try + { + message = sr.ReadLine(); + } + catch + { + return false; + } + + if (message == null) + return false; + + if (Regex.IsMatch(message, @"^(INFO|CONNECT|PING|PONG|UNSUB|SUB|\+OK|-ERR)")) + { + if (client > 0) + AddFrame(new Frame(_watch.Elapsed, client, origin, message)); + + try + { + Write($"{message}\r\n"); + } + catch + { + return false; + } + + return true; + } + + var match = Regex.Match(message, @"^(?:PUB|HPUB|MSG|HMSG).*?(\d+)\s*$"); + if (match.Success) + { + var size = int.Parse(match.Groups[1].Value); + var buffer = new char[size + 2]; + var span = buffer.AsSpan(); + while (true) + { + var read = sr.Read(span); + if (read == 0) + break; + if (read == -1) + return false; + span = span[read..]; + } + + var bufferDump = Dump(buffer.AsSpan()[..size]); + + try + { + Write($"{message}\r\n{new string(buffer)}"); + } + catch + { + return false; + } + + if (client > 0) + AddFrame(new Frame(_watch.Elapsed, client, origin, Message: $"{message}␍␊{bufferDump}")); + + return true; + } + + if (client > 0) + AddFrame(new Frame(_watch.Elapsed, client, Origin: "ERROR", Message: $"Unknown protocol: {message}")); + + return false; + } + + private void AddFrame(Frame frame) + { + if (_trace) + Log($"TRACE {frame}"); + lock (_frames) + _frames.Add(frame); + } + + private void Log(string text) => _outputHelper.WriteLine($"[PROXY] {DateTime.Now:HH:mm:ss.fff} {text}"); + + public record Frame(TimeSpan Timestamp, int Client, string Origin, string Message); +} diff --git a/tests/NATS.Client.TestUtilities/NatsServer.cs b/tests/NATS.Client.TestUtilities/NatsServer.cs index b96130459..4e5ddb27f 100644 --- a/tests/NATS.Client.TestUtilities/NatsServer.cs +++ b/tests/NATS.Client.TestUtilities/NatsServer.cs @@ -1,8 +1,6 @@ using System.Diagnostics; -using System.Net; using System.Net.Sockets; using System.Runtime.InteropServices; -using System.Text; using System.Text.RegularExpressions; using Cysharp.Diagnostics; @@ -376,297 +374,7 @@ public async ValueTask DisposeAsync() } } -public class NatsProxy : IDisposable -{ - private readonly ITestOutputHelper _outputHelper; - private readonly bool _trace; - private readonly TcpListener _tcpListener; - private readonly List _clients = new(); - private readonly List _frames = new(); - private readonly Stopwatch _watch = new(); - private int _syncCount; - - public NatsProxy(int port, ITestOutputHelper outputHelper, bool trace) - { - _outputHelper = outputHelper; - _trace = trace; - _tcpListener = new TcpListener(IPAddress.Loopback, 0); - _tcpListener.Start(); - _watch.Restart(); - - Task.Run(() => - { - var client = 0; - while (true) - { - var tcpClient1 = _tcpListener.AcceptTcpClient(); - TcpClient tcpClient2; - lock (_clients) - { - tcpClient1.NoDelay = true; - tcpClient1.ReceiveBufferSize = 0; - tcpClient1.SendBufferSize = 0; - _clients.Add(tcpClient1); - - tcpClient2 = new TcpClient("127.0.0.1", port); - tcpClient2.NoDelay = true; - tcpClient2.ReceiveBufferSize = 0; - tcpClient2.SendBufferSize = 0; - _clients.Add(tcpClient2); - } - - var n = client++; - #pragma warning disable CS4014 - Task.Run(() => - { - var stream1 = tcpClient1.GetStream(); - var sr1 = new StreamReader(stream1, Encoding.ASCII); - var sw1 = new StreamWriter(stream1, Encoding.ASCII); - - var stream2 = tcpClient2.GetStream(); - var sr2 = new StreamReader(stream2, Encoding.ASCII); - var sw2 = new StreamWriter(stream2, Encoding.ASCII); - - Task.Run(() => - { - while (NatsProtoDump(n, "C", sr1, sw2, ClientInterceptor)) - { - } - }); - - while (NatsProtoDump(n, $"S", sr2, sw1, ServerInterceptor)) - { - } - }); - } - }); - - var stopwatch = Stopwatch.StartNew(); - while (stopwatch.Elapsed < TimeSpan.FromSeconds(10)) - { - try - { - using var tcpClient = new TcpClient(); - tcpClient.Connect(IPAddress.Loopback, Port); - Log($"Server started on localhost:{Port}"); - return; - } - catch (SocketException) - { - } - } - - throw new TimeoutException("Proxy server didn't start"); - } - - public List> ClientInterceptors { get; } = new(); - - public List> ServerInterceptors { get; } = new(); - - public int Port => ((IPEndPoint)_tcpListener.Server.LocalEndPoint!).Port; - - public IReadOnlyList AllFrames - { - get - { - lock (_frames) - { - return _frames.ToList(); - } - } - } - - public IReadOnlyList Frames - { - get - { - lock (_frames) - { - return _frames - .Where(f => !Regex.IsMatch(f.Message, @"^(INFO|CONNECT|PING|PONG|\+OK)")) - .ToList(); - } - } - } - - public IReadOnlyList ClientFrames => Frames.Where(f => f.Origin == "C").ToList(); - - public IReadOnlyList ServerFrames => Frames.Where(f => f.Origin == "S").ToList(); - - public void Reset() - { - lock (_clients) - { - foreach (var tcpClient in _clients) - { - try - { - tcpClient.Close(); - } - catch - { - // ignore - } - } - - lock (_frames) - _frames.Clear(); - - _watch.Restart(); - } - } - - public async Task FlushFramesAsync(NatsConnection nats) - { - var subject = $"_SIGNAL_SYNC_{Interlocked.Increment(ref _syncCount)}"; - - await nats.PublishAsync(subject); - - await Retry.Until( - "flush sync frame", - () => AllFrames.Any(f => f.Message == $"PUB {subject} 0␍␊")); - - lock (_frames) - _frames.Clear(); - } - - public void Dispose() => _tcpListener.Server.Dispose(); - - public string Dump(ReadOnlySpan buffer) - { - var sb = new StringBuilder(); - foreach (var c in buffer) - { - switch (c) - { - case >= ' ' and <= '~': - sb.Append(c); - break; - case '\n': - sb.Append('␊'); - break; - case '\r': - sb.Append('␍'); - break; - default: - sb.Append('.'); - break; - } - } - - return sb.ToString(); - } - - private string? ClientInterceptor(string? message) - { - foreach (var interceptor in ClientInterceptors) - { - message = interceptor(message); - } - - return message; - } - - private string? ServerInterceptor(string? message) - { - foreach (var interceptor in ServerInterceptors) - { - message = interceptor(message); - } - - return message; - } - - private bool NatsProtoDump(int client, string origin, TextReader sr, TextWriter sw, Func? interceptor) - { - void Write(string? rawFrame) - { - if (interceptor != null) - rawFrame = interceptor(rawFrame); - - if (rawFrame == null) - return; - - sw.Write(rawFrame); - sw.Flush(); - } - - string? message; - try - { - message = sr.ReadLine(); - } - catch - { - return false; - } - - if (message == null) - return false; - - if (Regex.IsMatch(message, @"^(INFO|CONNECT|PING|PONG|UNSUB|SUB|\+OK|-ERR)")) - { - if (client > 0) - AddFrame(new Frame(_watch.Elapsed, client, origin, message)); - - try - { - Write($"{message}\r\n"); - } - catch (Exception e) - { - Console.WriteLine(e); - return false; - } - - return true; - } - - var match = Regex.Match(message, @"^(?:PUB|HPUB|MSG|HMSG).*?(\d+)\s*$"); - if (match.Success) - { - var size = int.Parse(match.Groups[1].Value); - var buffer = new char[size + 2]; - var span = buffer.AsSpan(); - while (true) - { - var read = sr.Read(span); - if (read == 0) - break; - if (read == -1) - return false; - span = span[read..]; - } - - var bufferDump = Dump(buffer.AsSpan()[..size]); - - Write($"{message}\r\n{new string(buffer)}"); - - if (client > 0) - AddFrame(new Frame(_watch.Elapsed, client, origin, Message: $"{message}␍␊{bufferDump}")); - - return true; - } - - if (client > 0) - AddFrame(new Frame(_watch.Elapsed, client, Origin: "ERROR", Message: $"Unknown protocol: {message}")); - - return false; - } - - private void AddFrame(Frame frame) - { - if (_trace) - Log($"TRACE {frame}"); - lock (_frames) - _frames.Add(frame); - } - - private void Log(string text) => _outputHelper.WriteLine($"[PROXY] {DateTime.Now:HH:mm:ss.fff} {text}"); - - public record Frame(TimeSpan Timestamp, int Client, string Origin, string Message); -} public class NullOutputHelper : ITestOutputHelper {