diff --git a/src/Nethermind/ConsoleApp1/ConsoleApp1.csproj b/src/Nethermind/ConsoleApp1/ConsoleApp1.csproj new file mode 100644 index 000000000000..d2ec8287f8c1 --- /dev/null +++ b/src/Nethermind/ConsoleApp1/ConsoleApp1.csproj @@ -0,0 +1,15 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + diff --git a/src/Nethermind/ConsoleApp1/Program.cs b/src/Nethermind/ConsoleApp1/Program.cs new file mode 100644 index 000000000000..698e28d14f5a --- /dev/null +++ b/src/Nethermind/ConsoleApp1/Program.cs @@ -0,0 +1,167 @@ +using System.Net.Sockets; +using System.Net; +using Nethermind.Sockets; +using Nethermind.JsonRpc.WebSockets; +using Nethermind.JsonRpc; +using Nethermind.JsonRpc.Modules; +using Nethermind.Serialization.Json; +using Nethermind.Evm.Tracing.GethStyle; +using System.Diagnostics; + +static async Task CountNumberOfMessages(Socket socket, CancellationToken token) +{ + using IpcSocketMessageStream stream = new(socket); + + + + int messages = 0; + try + { + byte[] buffer = new byte[10]; + //int i = 0; + while (true) + { + ReceiveResult? result = await stream.ReceiveAsync(buffer).ConfigureAwait(false); + + if (Stopwatch.GetTimestamp() % 1001 == 0) + { + await Task.Delay(1, token); + } + + + if (result is not null && IsEndOfIpcMessage(result)) + { + //var data = buffer.Take(result.Read).ToArray(); + //var str = System.Text.Encoding.UTF8.GetString(data); + + //Console.WriteLine(i++); + + //Console.WriteLine($"{Convert.ToHexString(data)}"); + //Console.WriteLine($"{str}: {data.Length}"); + + //if (data.Length != 0) + messages++; + } + + if (result is null || result.Closed) + { + break; + } + } + } + catch (OperationCanceledException) { } + + return messages; +} + +var messageCount = 50; + +CancellationTokenSource cts = new(); +IPEndPoint ipEndPoint = IPEndPoint.Parse("127.0.0.1:1337"); + +Task receiveMessages = OneShotServer( + ipEndPoint, + async socket => await CountNumberOfMessages(socket, cts.Token) +); + +Task sendMessages = Task.Run(async () => +{ + using Socket socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + await socket.ConnectAsync(ipEndPoint).ConfigureAwait(false); + + using IpcSocketMessageStream stream = new(socket); + using JsonRpcSocketsClient client = new( + clientName: "TestClient", + stream: stream, + endpointType: RpcEndpoint.Ws, + jsonRpcProcessor: null!, + jsonRpcLocalStats: new NullJsonRpcLocalStats(), + jsonSerializer: new EthereumJsonSerializer() + ); + int disposeCount = 0; + + for (int i = 0; i < messageCount; i++) + { + using JsonRpcResult result = JsonRpcResult.Single(RandomSuccessResponse(1000, () => disposeCount++), default); + await client.SendJsonRpcResult(result).ConfigureAwait(false); + await Task.Delay(1).ConfigureAwait(false); + } + + //disposeCount.Should().Be(messageCount); + await cts.CancelAsync().ConfigureAwait(false); + + return messageCount; +}); + +await Task.WhenAll(sendMessages, receiveMessages).ConfigureAwait(false); +int sent = sendMessages.Result; +int received = receiveMessages.Result; + +Console.WriteLine($"Sent: {sent}, Received: {received}"); + +static async Task OneShotServer(IPEndPoint ipEndPoint, Func> func) +{ + using Socket socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + socket.Bind(ipEndPoint); + socket.Listen(); + + Socket handler = await socket.AcceptAsync(); + + return await func(handler); +} + + static JsonRpcSuccessResponse RandomSuccessResponse(int size, Action? disposeAction = null) +{ + return new JsonRpcSuccessResponse(disposeAction) + { + MethodName = "mock", + Id = "42", + Result = RandomObject(size) + }; +} + + + + static object RandomObject(int size) +{ + string[] strings = RandomStringArray(size / 2); + object obj = new GethLikeTxTrace() + { + Entries = + { + new GethTxTraceEntry + { + Stack = strings, Memory = strings, + } + } + }; + return obj; +} + + static string[] RandomStringArray(int length, bool runGc = true) +{ + string[] array = new string[length]; + for (int i = 0; i < length; i++) + { + array[i] = RandomString(length); + if (runGc && i % 100 == 0) + { + GC.Collect(); + } + } + return array; +} + + static string RandomString(int length) +{ + const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + char[] stringChars = new char[length]; + Random random = new(); + + for (int i = 0; i < stringChars.Length; i++) + { + stringChars[i] = chars[random.Next(chars.Length)]; + } + return new string(stringChars); +} + static bool IsEndOfIpcMessage(ReceiveResult result) => result.EndOfMessage && (!result.Closed || result.Read != 0); diff --git a/src/Nethermind/ConsoleApp1/Properties/launchSettings.json b/src/Nethermind/ConsoleApp1/Properties/launchSettings.json new file mode 100644 index 000000000000..253020fe52f5 --- /dev/null +++ b/src/Nethermind/ConsoleApp1/Properties/launchSettings.json @@ -0,0 +1,11 @@ +{ + "profiles": { + "ConsoleApp1": { + "commandName": "Project" + }, + "WSL": { + "commandName": "WSL2", + "distributionName": "" + } + } +} \ No newline at end of file diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs index 08e984da27d2..e9640899c9b8 100644 --- a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs +++ b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs @@ -83,7 +83,7 @@ static async Task CountNumberOfMessages(Socket socket, CancellationToken to ReceiveResult? result = await stream.ReceiveAsync(buffer); // Imitate random delays - if (Stopwatch.GetTimestamp() % 1001 == 0) + if (Stopwatch.GetTimestamp() % 101 == 0) await Task.Delay(1); if (result is not null && IsEndOfIpcMessage(result)) @@ -128,7 +128,7 @@ static async Task CountNumberOfMessages(Socket socket, CancellationToken to for (int i = 0; i < messageCount; i++) { - using JsonRpcResult result = JsonRpcResult.Single(RandomSuccessResponse(1000, () => disposeCount++), default); + using JsonRpcResult result = JsonRpcResult.Single(RandomSuccessResponse(100, () => disposeCount++), default); await client.SendJsonRpcResult(result); await Task.Delay(1); } diff --git a/src/Nethermind/Nethermind.Runner/JsonRpc/JsonRpcIpcRunner.cs b/src/Nethermind/Nethermind.Runner/JsonRpc/JsonRpcIpcRunner.cs index a27b741a0e96..8995ed799b06 100644 --- a/src/Nethermind/Nethermind.Runner/JsonRpc/JsonRpcIpcRunner.cs +++ b/src/Nethermind/Nethermind.Runner/JsonRpc/JsonRpcIpcRunner.cs @@ -55,11 +55,11 @@ public void Start(CancellationToken cancellationToken) { if (_logger.IsInfo) _logger.Info($"Starting IPC JSON RPC service over '{_path}'"); - Task.Factory.StartNew(_ => StartServer(_path), cancellationToken, TaskCreationOptions.LongRunning); + Task.Factory.StartNew(_ => StartServer(_path, cancellationToken), cancellationToken, TaskCreationOptions.LongRunning); } } - private void StartServer(string path) + private async Task StartServer(string path, CancellationToken cancellationToken) { try { @@ -74,29 +74,40 @@ private void StartServer(string path) while (true) { - _resetEvent.Reset(); - if (_logger.IsInfo) _logger.Info("Waiting for a IPC connection..."); - _server.BeginAccept(AcceptCallback, null); - _resetEvent.WaitOne(); + Socket socket = await _server.AcceptAsync(cancellationToken); + + socket.ReceiveTimeout = _jsonRpcConfig.Timeout; + socket.SendTimeout = _jsonRpcConfig.Timeout; + + using JsonRpcSocketsClient? socketsClient = new( + string.Empty, + new IpcSocketMessageStream(socket), + RpcEndpoint.IPC, + _jsonRpcProcessor, + _jsonRpcLocalStats, + _jsonSerializer, + maxBatchResponseBodySize: _jsonRpcConfig.MaxBatchResponseBodySize); + + await socketsClient.ReceiveLoopAsync(); } } - catch (IOException exc) when (exc.InnerException is not null && exc.InnerException is SocketException se && se.SocketErrorCode == SocketError.ConnectionReset) + catch (IOException ex) when (ex.InnerException is SocketException { SocketErrorCode: SocketError.ConnectionReset }) { LogDebug("Client disconnected."); } - catch (SocketException exc) when (exc.SocketErrorCode == SocketError.ConnectionReset) + catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset || ex.ErrorCode == OperationCancelledError) { LogDebug("Client disconnected."); } - catch (SocketException exc) + catch (SocketException ex) { - if (_logger.IsError) _logger.Error($"Error ({exc.ErrorCode}) when starting IPC server over '{_path}' path.", exc); + if (_logger.IsError) _logger.Error($"Error {ex.ErrorCode} starting IPC server over '{_path}'.", ex); } - catch (Exception exc) + catch (Exception ex) { - if (_logger.IsError) _logger.Error($"Error when starting IPC server over '{_path}' path.", exc); + if (_logger.IsError) _logger.Error($"Error starting IPC server over '{_path}'.", ex); } finally { @@ -104,44 +115,44 @@ private void StartServer(string path) } } - private async void AcceptCallback(IAsyncResult ar) - { - try - { - Socket socket = _server.EndAccept(ar); - socket.ReceiveTimeout = _jsonRpcConfig.Timeout; - socket.SendTimeout = _jsonRpcConfig.Timeout; - - _resetEvent.Set(); - - using JsonRpcSocketsClient? socketsClient = new( - string.Empty, - new IpcSocketMessageStream(socket), - RpcEndpoint.IPC, - _jsonRpcProcessor, - _jsonRpcLocalStats, - _jsonSerializer, - maxBatchResponseBodySize: _jsonRpcConfig.MaxBatchResponseBodySize); - - await socketsClient.ReceiveLoopAsync(); - } - catch (IOException exc) when (exc.InnerException is SocketException { SocketErrorCode: SocketError.ConnectionReset }) - { - LogDebug("Client disconnected."); - } - catch (SocketException exc) when (exc.SocketErrorCode == SocketError.ConnectionReset || exc.ErrorCode == OperationCancelledError) - { - LogDebug("Client disconnected."); - } - catch (SocketException exc) - { - if (_logger.IsWarn) _logger.Warn($"Error {exc.ErrorCode}:{exc.Message}"); - } - catch (Exception exc) - { - if (_logger.IsError) _logger.Error("Error when handling IPC communication with a client.", exc); - } - } + //private async void AcceptCallback(IAsyncResult ar) + //{ + // try + // { + // Socket socket = _server.EndAccept(ar); + // socket.ReceiveTimeout = _jsonRpcConfig.Timeout; + // socket.SendTimeout = _jsonRpcConfig.Timeout; + + // _resetEvent.Set(); + + // using JsonRpcSocketsClient? socketsClient = new( + // string.Empty, + // new IpcSocketMessageStream(socket), + // RpcEndpoint.IPC, + // _jsonRpcProcessor, + // _jsonRpcLocalStats, + // _jsonSerializer, + // maxBatchResponseBodySize: _jsonRpcConfig.MaxBatchResponseBodySize); + + // await socketsClient.ReceiveLoopAsync(); + // } + // catch (IOException exc) when (exc.InnerException is SocketException { SocketErrorCode: SocketError.ConnectionReset }) + // { + // LogDebug("Client disconnected."); + // } + // catch (SocketException exc) when (exc.SocketErrorCode == SocketError.ConnectionReset || exc.ErrorCode == OperationCancelledError) + // { + // LogDebug("Client disconnected."); + // } + // catch (SocketException exc) + // { + // if (_logger.IsWarn) _logger.Warn($"Error {exc.ErrorCode}:{exc.Message}"); + // } + // catch (Exception exc) + // { + // if (_logger.IsError) _logger.Error("Error when handling IPC communication with a client.", exc); + // } + //} private void DeleteSocketFileIfExists(string path) { diff --git a/src/Nethermind/Nethermind.Runner/Properties/launchSettings.json b/src/Nethermind/Nethermind.Runner/Properties/launchSettings.json index b576593a76c8..18b967d58bf0 100644 --- a/src/Nethermind/Nethermind.Runner/Properties/launchSettings.json +++ b/src/Nethermind/Nethermind.Runner/Properties/launchSettings.json @@ -125,7 +125,7 @@ }, "WSL": { "commandName": "WSL", - "commandLineArgs": "\"{OutDir}/nethermind.dll\" -c holesky --data-dir .data", + "commandLineArgs": "\"{OutDir}/nethermind.dll\" -c holesky --data-dir .data --JsonRpc.IpcUnixDomainSocketPath /home/rubo/ipc.f", "environmentVariables": { "ASPNETCORE_ENVIRONMENT": "Development" },