Skip to content

Commit

Permalink
Incorrect batch unwrapping in JSON-RPC replay tool (#7279)
Browse files Browse the repository at this point in the history
  • Loading branch information
emlautarom1 authored Aug 2, 2024
1 parent f95fe4f commit 3b36366
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 55 deletions.
18 changes: 11 additions & 7 deletions tools/Nethermind.Tools.Kute/Application.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
using Nethermind.Tools.Kute.ResponseTracer;
using System.Collections.Concurrent;
using Nethermind.Tools.Kute.FlowManager;
using System.Threading.Tasks;

namespace Nethermind.Tools.Kute;

Expand Down Expand Up @@ -54,7 +53,7 @@ public async Task Run()
{
_progressReporter.ReportStart();

BlockingCollection<(Task<HttpResponseMessage>, JsonRpc)> responseTasks = new BlockingCollection<(Task<HttpResponseMessage>, JsonRpc)>();
BlockingCollection<(Task<HttpResponseMessage?>, JsonRpc)> responseTasks = new BlockingCollection<(Task<HttpResponseMessage?>, JsonRpc)>();
var responseHandlingTask = Task.Run(async () =>
{
await Parallel.ForEachAsync(responseTasks.GetConsumingEnumerable(), async (task, ct) =>
Expand Down Expand Up @@ -121,7 +120,7 @@ await Parallel.ForEachAsync(responseTasks.GetConsumingEnumerable(), async (task,
await _metricsConsumer.ConsumeMetrics(_metrics);
}

public async Task AnalyzeRequest((Task<HttpResponseMessage>, JsonRpc) task)
public async Task AnalyzeRequest((Task<HttpResponseMessage?>, JsonRpc) task)
{
switch (task.Item2)
{
Expand All @@ -133,12 +132,14 @@ public async Task AnalyzeRequest((Task<HttpResponseMessage>, JsonRpc) task)
}
case JsonRpc.BatchJsonRpc batch:
{
HttpResponseMessage content;
HttpResponseMessage? content;
using (_metrics.TimeBatch())
{
content = await task.Item1;
}
var deserialized = JsonSerializer.Deserialize<JsonDocument>(content.Content.ReadAsStream());
var deserialized = content is not null
? JsonSerializer.Deserialize<JsonDocument>(await content.Content.ReadAsStreamAsync())
: null;

if (_validator.IsInvalid(batch, deserialized))
{
Expand All @@ -155,12 +156,15 @@ public async Task AnalyzeRequest((Task<HttpResponseMessage>, JsonRpc) task)
}
case JsonRpc.SingleJsonRpc single:
{
HttpResponseMessage content;
HttpResponseMessage? content;
using (_metrics.TimeMethod(single.MethodName))
{
content = await task.Item1;
}
var deserialized = JsonSerializer.Deserialize<JsonDocument>(content.Content.ReadAsStream());

var deserialized = content is not null
? JsonSerializer.Deserialize<JsonDocument>(await content.Content.ReadAsStreamAsync())
: null;

if (single.MethodName is null)
{
Expand Down
19 changes: 14 additions & 5 deletions tools/Nethermind.Tools.Kute/JsonRpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,29 @@ namespace Nethermind.Tools.Kute;

public abstract record JsonRpc
{
public JsonDocument Document;
private readonly JsonDocument _document;

private JsonRpc(JsonDocument document)
{
Document = document;
_document = document;
}

public string ToJsonString() => Document.RootElement.ToString();
public string ToJsonString() => _document.RootElement.ToString();

public record BatchJsonRpc : JsonRpc
{
public BatchJsonRpc(JsonDocument document) : base(document) { }

public override string ToString() => $"{nameof(BatchJsonRpc)} {ToJsonString()}";

public IEnumerable<SingleJsonRpc?> Items()
{
foreach (var element in _document.RootElement.EnumerateArray())
{
var document = JsonSerializer.Deserialize<JsonDocument>(element);
yield return document is null ? null : new SingleJsonRpc(document);
}
}
}

public record SingleJsonRpc : JsonRpc
Expand All @@ -31,11 +40,11 @@ public record SingleJsonRpc : JsonRpc
public SingleJsonRpc(JsonDocument document) : base(document)
{
_isResponse = new(() =>
Document.RootElement.TryGetProperty("response", out _)
_document.RootElement.TryGetProperty("response", out _)
);
_methodName = new(() =>
{
if (Document.RootElement.TryGetProperty("method", out var jsonMethodField))
if (_document.RootElement.TryGetProperty("method", out var jsonMethodField))
{
return jsonMethodField.GetString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@

namespace Nethermind.Tools.Kute.JsonRpcMethodFilter;

class ComposedJsonRpcMethodFilter(IEnumerable<IJsonRpcMethodFilter> filters) : IJsonRpcMethodFilter
class ComposedJsonRpcMethodFilter : IJsonRpcMethodFilter
{
private readonly IEnumerable<IJsonRpcMethodFilter> _filters = filters;
private readonly List<IJsonRpcMethodFilter> _filters;
private readonly bool _hasNoFilters;

private bool HasFilters => _filters?.Any() ?? false;
public ComposedJsonRpcMethodFilter(List<IJsonRpcMethodFilter> filters)
{
_filters = filters;
_hasNoFilters = filters.Count == 0;
}

public bool ShouldSubmit(string methodName) => !HasFilters || _filters.Any(f => f.ShouldSubmit(methodName));
public bool ShouldSubmit(string methodName) => _hasNoFilters || _filters.Any(f => f.ShouldSubmit(methodName));
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public HttpJsonRpcSubmitter(HttpClient httpClient, IAuth auth, string hostAddres
_uri = new Uri(hostAddress);
}

public async Task<HttpResponseMessage> Submit(JsonRpc rpc)
public async Task<HttpResponseMessage?> Submit(JsonRpc rpc)
{
var request = new HttpRequestMessage(HttpMethod.Post, _uri)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ namespace Nethermind.Tools.Kute.JsonRpcSubmitter;

interface IJsonRpcSubmitter
{
Task<HttpResponseMessage> Submit(JsonRpc rpc);
Task<HttpResponseMessage?> Submit(JsonRpc rpc);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,5 @@ namespace Nethermind.Tools.Kute.JsonRpcSubmitter;

class NullJsonRpcSubmitter : IJsonRpcSubmitter
{

public Task<HttpResponseMessage> Submit(JsonRpc rpc) => Task.FromResult<HttpResponseMessage>(null);

public Task<HttpResponseMessage?> Submit(JsonRpc rpc) => Task.FromResult<HttpResponseMessage?>(null);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@ namespace Nethermind.Tools.Kute.MessageProvider;
public class JsonRpcMessageProvider : IMessageProvider<JsonRpc?>
{
private readonly IMessageProvider<string> _provider;
private readonly bool _unwrapBatches;


public JsonRpcMessageProvider(IMessageProvider<string> provider, bool unwrapBatches)
public JsonRpcMessageProvider(IMessageProvider<string> provider)
{
_provider = provider;
_unwrapBatches = unwrapBatches;
}


public IAsyncEnumerable<JsonRpc?> Messages { get => MessagesImpl(); }

private async IAsyncEnumerable<JsonRpc?> MessagesImpl()
Expand All @@ -26,26 +22,12 @@ public JsonRpcMessageProvider(IMessageProvider<string> provider, bool unwrapBatc
{
var jsonDoc = JsonSerializer.Deserialize<JsonDocument>(msg);

switch (jsonDoc?.RootElement.ValueKind)
yield return jsonDoc?.RootElement.ValueKind switch
{
case JsonValueKind.Object:
yield return new JsonRpc.SingleJsonRpc(jsonDoc);
break;
case JsonValueKind.Array:
if (_unwrapBatches)
{
foreach (JsonElement single in jsonDoc.RootElement.EnumerateArray())
{
yield return new JsonRpc.SingleJsonRpc(JsonDocument.Parse(single.ToString()));
}
}
yield return new JsonRpc.BatchJsonRpc(jsonDoc);
break;
default:
yield return null;
break;

}
JsonValueKind.Object => new JsonRpc.SingleJsonRpc(jsonDoc),
JsonValueKind.Array => new JsonRpc.BatchJsonRpc(jsonDoc),
_ => null
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

namespace Nethermind.Tools.Kute.MessageProvider;

public class UnwrapBatchJsonRpcMessageProvider : IMessageProvider<JsonRpc?>
{
private readonly IMessageProvider<JsonRpc?> _provider;

public UnwrapBatchJsonRpcMessageProvider(IMessageProvider<JsonRpc?> provider)
{
_provider = provider;
}

public IAsyncEnumerable<JsonRpc?> Messages { get => MessagesImpl(); }

private async IAsyncEnumerable<JsonRpc?> MessagesImpl()
{
await foreach (var jsonRpc in _provider.Messages)
{
switch (jsonRpc)
{
case JsonRpc.SingleJsonRpc:
yield return jsonRpc;
break;
case JsonRpc.BatchJsonRpc batch:
foreach (JsonRpc.SingleJsonRpc? single in batch.Items())
{
yield return single;
}
break;
}
}
}
}
23 changes: 13 additions & 10 deletions tools/Nethermind.Tools.Kute/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ static IServiceProvider BuildServiceProvider(Config config)
collection.AddSingleton<IMessageProvider<JsonRpc?>>(serviceProvider =>
{
var messageProvider = serviceProvider.GetRequiredService<IMessageProvider<string>>();
bool unwrapBatches = config.UnwrapBatch;
return new JsonRpcMessageProvider(messageProvider, unwrapBatches);
});

var jsonMessageProvider = new JsonRpcMessageProvider(messageProvider);
return config.UnwrapBatch
? new UnwrapBatchJsonRpcMessageProvider(jsonMessageProvider)
: jsonMessageProvider;
});
collection.AddSingleton<IJsonRpcValidator>(
config.DryRun
? new NullJsonRpcValidator()
Expand All @@ -69,7 +70,7 @@ static IServiceProvider BuildServiceProvider(Config config)
collection.AddSingleton<IJsonRpcMethodFilter>(
new ComposedJsonRpcMethodFilter(
config.MethodFilters
.Select(pattern => new PatternJsonRpcMethodFilter(pattern))
.Select(pattern => new PatternJsonRpcMethodFilter(pattern) as IJsonRpcMethodFilter)
.ToList()
)
);
Expand All @@ -87,7 +88,7 @@ static IServiceProvider BuildServiceProvider(Config config)
// For dry runs we still want to trigger the generation of an AuthToken
// This is to ensure that all parameters required for the generation are correct,
// and not require a real run to verify that this is the case.
string _ = provider.GetRequiredService<IAuth>().AuthToken;
string _ = provider.GetRequiredService<IAuth>().AuthToken;
return new NullJsonRpcSubmitter();
});
collection.AddSingleton<IResponseTracer>(
Expand All @@ -99,13 +100,16 @@ static IServiceProvider BuildServiceProvider(Config config)
{
if (config.ShowProgress)
{
// TODO:
// NOTE:
// Terrible, terrible hack since it forces a double enumeration:
// - A first one to count the number of messages.
// - A second one to actually process each message.
// We can reduce the cost by not parsing each message on the first enumeration
// At the same time, this optimization relies on implementation details.
var messagesProvider = provider.GetRequiredService<IMessageProvider<JsonRpc?>>();
// only when we're not unwrapping batches. If we are, we need to parse.
// This optimization relies on implementation details.
IMessageProvider<object?> messagesProvider = config.UnwrapBatch
? provider.GetRequiredService<IMessageProvider<JsonRpc?>>()
: provider.GetRequiredService<IMessageProvider<string>>();
var totalMessages = messagesProvider.Messages.ToEnumerable().Count();
return new ConsoleProgressReporter(totalMessages);
}
Expand All @@ -121,7 +125,6 @@ static IServiceProvider BuildServiceProvider(Config config)
_ => throw new ArgumentOutOfRangeException(),
}
);

collection.AddSingleton<IJsonRpcFlowManager>(new JsonRpcFlowManager(config.RequestsPerSecond, config.UnwrapBatch));

return collection.BuildServiceProvider();
Expand Down

0 comments on commit 3b36366

Please sign in to comment.