Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix IPC data handling #7714

Merged
merged 6 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static async Task<int> CountNumberOfMessages(Socket socket, CancellationToken to
while (true)
{
ReceiveResult? result = await stream.ReceiveAsync(buffer).ConfigureAwait(false);
if (result?.EndOfMessage == true)
if (result is not null && result.EndOfMessage && (!result.Closed || result.Read != 0))
rubo marked this conversation as resolved.
Show resolved Hide resolved
{
messages++;
}
Expand Down Expand Up @@ -162,13 +162,13 @@ async Task<int> ReadMessages(Socket socket, IList<byte[]> receivedMessages, Canc
if (result is not null)
{
msg.AddRange(buffer.Take(result.Read));
}

if (result?.EndOfMessage == true)
{
messages++;
receivedMessages.Add(msg.ToArray());
msg = [];
if (result.EndOfMessage == true && (!result.Closed || result.Read != 0))
{
messages++;
receivedMessages.Add(msg.ToArray());
msg = [];
}
}

if (result is null || result.Closed)
Expand Down Expand Up @@ -216,8 +216,8 @@ async Task<int> ReadMessages(Socket socket, IList<byte[]> receivedMessages, Canc
messageCount++;
var msg = Enumerable.Range(11, i).Select(x => (byte)x).ToArray();
sentMessages.Add(msg);
await stream.WriteAsync(msg).ConfigureAwait(false);
await stream.WriteEndOfMessageAsync().ConfigureAwait(false);
await stream.WriteAsync(msg.Append((byte)'\n').ToArray()).ConfigureAwait(false);

if (i % 10 == 0)
{
await Task.Delay(1).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@
"Docker": {
"commandName": "Docker",
"commandLineArgs": "-c holesky --data-dir .data /data --jsonrpc-enginehost 0.0.0.0 --jsonrpc-engineport 8551 --jsonrpc-host 0.0.0.0"
},
"WSL": {
"commandName": "WSL",
"commandLineArgs": "\"{OutDir}/nethermind.dll\" -c holesky --data-dir .data",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"distributionName": ""
}
}
}
93 changes: 43 additions & 50 deletions src/Nethermind/Nethermind.Sockets/IpcSocketMessageStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,76 +11,69 @@ namespace Nethermind.Sockets;

public class IpcSocketMessageStream(Socket socket) : NetworkStream(socket), IMessageBorderPreservingStream
{
private static readonly byte Delimiter = Convert.ToByte('\n');
private const byte Delimiter = (byte)'\n';

public byte[] bufferedData = [];
public int bufferedDataLength = 0;
private byte[] _bufferedData = [];
private int _bufferedDataLength = 0;

public async Task<ReceiveResult?> ReceiveAsync(ArraySegment<byte> buffer)
{
ReceiveResult? result = null;
if (Socket.Connected)
if (!Socket.Connected)
return null;

if (_bufferedDataLength > 0)
{
if (bufferedDataLength > 0)
if (_bufferedDataLength > buffer.Count)
throw new NotSupportedException($"Passed {nameof(buffer)} should be larger than internal one");

try
{
if (bufferedDataLength > buffer.Count)
{
throw new NotSupportedException($"Passed {nameof(buffer)} should be larger than internal one");
}
try
{
Buffer.BlockCopy(bufferedData, 0, buffer.Array!, buffer.Offset, bufferedDataLength);
}
catch (Exception)
{

}
Buffer.BlockCopy(_bufferedData, 0, buffer.Array!, buffer.Offset, _bufferedDataLength);
}
catch { }
}

int read = bufferedDataLength + await Socket.ReceiveAsync(buffer[bufferedDataLength..], SocketFlags.None);
int read = _bufferedDataLength + await Socket.ReceiveAsync(buffer[_bufferedDataLength..], SocketFlags.None);

int delimiter = ((IList<byte>)buffer[..read]).IndexOf(Delimiter);
int delimiter = ((IList<byte>)buffer[..read]).IndexOf(Delimiter);
bool endOfMessage;

bool endOfMessage;
if (delimiter != -1 && (delimiter + 1) < read)
{
bufferedDataLength = read - delimiter - 1;

if (bufferedData.Length < buffer.Count)
{
if (bufferedData.Length != 0)
{
ArrayPool<byte>.Shared.Return(bufferedData);
}
bufferedData = ArrayPool<byte>.Shared.Rent(buffer.Count);
}
endOfMessage = true;
buffer[(delimiter + 1)..read].CopyTo(bufferedData);
read = delimiter + 1;
}
else
if (delimiter != -1 && (delimiter + 1) < read)
{
_bufferedDataLength = read - delimiter - 1;

if (_bufferedData.Length < buffer.Count)
{
endOfMessage = delimiter != -1;
bufferedDataLength = 0;
if (_bufferedData.Length != 0)
ArrayPool<byte>.Shared.Return(_bufferedData);

_bufferedData = ArrayPool<byte>.Shared.Rent(buffer.Count);
}

result = new ReceiveResult()
{
Closed = read == 0,
Read = read > 0 && buffer[read - 1] == Delimiter ? read - 1 : read,
EndOfMessage = endOfMessage,
CloseStatusDescription = null
};
endOfMessage = true;
buffer[(delimiter + 1)..read].CopyTo(_bufferedData);
read = delimiter + 1;
}
else
{
endOfMessage = delimiter != -1 || Socket.Available == 0;
rubo marked this conversation as resolved.
Show resolved Hide resolved
_bufferedDataLength = 0;
}

return result;
return new()
{
Closed = read == 0,
Read = read > 0 && buffer[read - 1] == Delimiter ? read - 1 : read,
EndOfMessage = endOfMessage,
CloseStatusDescription = null
};
}

protected override void Dispose(bool disposing)
{
if (disposing && bufferedData.Length != 0)
if (disposing && _bufferedData.Length != 0)
{
ArrayPool<byte>.Shared.Return(bufferedData);
ArrayPool<byte>.Shared.Return(_bufferedData);
}
base.Dispose(disposing);
}
Expand Down
Loading