Skip to content

Commit

Permalink
Merge pull request #3402 from AElfProject/feature/new-stable-stream
Browse files Browse the repository at this point in the history
improve grpc stream stability and performance
  • Loading branch information
jason-aelf authored Jun 14, 2023
2 parents 841eedc + 5b3fb96 commit c0f24b9
Show file tree
Hide file tree
Showing 18 changed files with 471 additions and 137 deletions.
6 changes: 3 additions & 3 deletions src/AElf.OS.Core/AElf.OS.Core.csproj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\common.props"/>
<Import Project="..\..\common.props" />
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>AElf.OS</RootNamespace>
Expand All @@ -8,8 +8,8 @@
<Description>Core module for the OS layer.</Description>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\AElf.Kernel.Node\AElf.Kernel.Node.csproj"/>
<ProjectReference Include="..\AElf.Kernel.Token\AElf.Kernel.Token.csproj"/>
<ProjectReference Include="..\AElf.Kernel.Node\AElf.Kernel.Node.csproj" />
<ProjectReference Include="..\AElf.Kernel.Token\AElf.Kernel.Token.csproj" />
</ItemGroup>
<ItemGroup>
<CommonMessage Include="..\..\protobuf\network_types.proto">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ namespace AElf.OS.Network.Events;

/// <summary>
/// Event carrying one message received over a peer stream: the serialized message bytes,
/// the public key of the peer the stream belongs to, and the request id of the message.
/// </summary>
public class StreamMessageReceivedEvent
{
// NOTE(review): this span comes from a diff view. The two-parameter signature directly
// below looks like the pre-change line and the three-parameter one its replacement
// (the body assigns RequestId) - confirm against the actual file.
public StreamMessageReceivedEvent(ByteString message, string clientPubkey)
public StreamMessageReceivedEvent(ByteString message, string clientPubkey, string requestId)
{
Message = message;
ClientPubkey = clientPubkey;
RequestId = requestId;
}

// Serialized stream message, exactly as passed to the constructor.
public ByteString Message { get; }

// Public key string supplied by the publisher of the event.
public string ClientPubkey { get; }

// Request id supplied by the publisher of the event.
public string RequestId { get; }
}
16 changes: 16 additions & 0 deletions src/AElf.OS.Core/Network/Events/StreamPeerExceptionEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using AElf.OS.Network.Application;
using AElf.OS.Network.Infrastructure;

namespace AElf.OS.Network.Events;

/// <summary>
/// Event that pairs a <see cref="NetworkException"/> with the <see cref="IPeer"/>
/// it was raised for.
/// </summary>
public class StreamPeerExceptionEvent
{
    /// <summary>
    /// Creates the event from the exception and the peer involved.
    /// </summary>
    /// <param name="exception">The network exception to carry.</param>
    /// <param name="peer">The peer to carry alongside the exception.</param>
    public StreamPeerExceptionEvent(NetworkException exception, IPeer peer) =>
        (Exception, Peer) = (exception, peer);

    /// <summary>The network exception carried by this event.</summary>
    public NetworkException Exception { get; }

    /// <summary>The peer carried by this event.</summary>
    public IPeer Peer { get; }
}
47 changes: 33 additions & 14 deletions src/AElf.OS.Network.Grpc/Connection/PeerDialer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public class PeerDialer : IPeerDialer
{
private readonly IAccountService _accountService;
private readonly IHandshakeProvider _handshakeProvider;
private KeyCertificatePair _clientKeyCertificatePair;
private IStreamTaskResourcePool _streamTaskResourcePool;
private readonly IStreamTaskResourcePool _streamTaskResourcePool;
public ILocalEventBus EventBus { get; set; }

public PeerDialer(IAccountService accountService,
Expand All @@ -46,8 +45,6 @@ public PeerDialer(IAccountService accountService,
EventBus = NullLocalEventBus.Instance;

Logger = NullLogger<PeerDialer>.Instance;

CreateClientKeyCertificatePair();
}

private NetworkOptions NetworkOptions => NetworkOptionsSnapshot.Value;
Expand Down Expand Up @@ -142,7 +139,17 @@ public async Task<GrpcPeer> DialBackPeerByStreamAsync(DnsEndPoint remoteEndpoint
};
Logger.LogWarning("DialBackPeerByStreamAsync meta={meta}", meta);
var peer = new GrpcStreamBackPeer(remoteEndpoint, info, responseStream, _streamTaskResourcePool, meta);

peer.SetStreamSendCallBack(async (ex, streamMessage, callTimes) =>
{
if (ex == null)
Logger.LogDebug("streamRequest write success {times}-{requestId}-{messageType}-{this}-{latency}", callTimes, streamMessage.RequestId, streamMessage.MessageType, peer,
CommonHelper.GetRequestLatency(streamMessage.RequestId));
else
{
Logger.LogError(ex, "streamRequest write fail, {requestId}-{messageType}-{this}", streamMessage.RequestId, streamMessage.MessageType, peer);
await EventBus.PublishAsync(new StreamPeerExceptionEvent(ex, peer), false);
}
});
peer.UpdateLastReceivedHandshake(handshake);

return peer;
Expand Down Expand Up @@ -191,11 +198,6 @@ public async Task<GrpcPeer> DialBackPeerAsync(DnsEndPoint remoteEndpoint, Handsh
return peer;
}

/// <summary>
/// Generates a key/certificate pair through <c>TlsHelper</c> and stores it in
/// <c>_clientKeyCertificatePair</c>.
/// </summary>
private void CreateClientKeyCertificatePair() =>
    _clientKeyCertificatePair = TlsHelper.GenerateKeyCertificatePair();

/// <summary>
/// Calls the server side DoHandshake RPC method, in order to establish a 2-way connection.
/// </summary>
Expand Down Expand Up @@ -245,6 +247,17 @@ private async Task<GrpcStreamPeer> DailStreamPeerAsync(GrpcClient client, DnsEnd
{ GrpcConstants.PubkeyMetadataKey, nodePubkey },
{ GrpcConstants.PeerInfoMetadataKey, connectionInfo.ToString() }
});
streamPeer.SetStreamSendCallBack(async (ex, streamMessage, callTimes) =>
{
if (ex == null)
Logger.LogDebug("streamRequest write success {times}-{requestId}-{messageType}-{this}-{latency}", callTimes, streamMessage.RequestId, streamMessage.MessageType, streamPeer,
CommonHelper.GetRequestLatency(streamMessage.RequestId));
else
{
Logger.LogError(ex, "streamRequest write fail, {requestId}-{messageType}-{this}", streamMessage.RequestId, streamMessage.MessageType, streamPeer);
await EventBus.PublishAsync(new StreamPeerExceptionEvent(ex, streamPeer), false);
}
});
var success = await BuildStreamForPeerAsync(streamPeer, call);
return success ? streamPeer : null;
}
Expand All @@ -266,12 +279,17 @@ public async Task<bool> BuildStreamForPeerAsync(GrpcStreamPeer streamPeer, Async
{
try
{
await call.ResponseStream.ForEachAsync(async req => await
EventBus.PublishAsync(new StreamMessageReceivedEvent(req.ToByteString(), streamPeer.Info.Pubkey), false));
await call.ResponseStream.ForEachAsync(async req =>
{
Logger.LogDebug("listenReceive request={requestId} {streamType}-{messageType} latency={latency}", req.RequestId, req.StreamType, req.MessageType, CommonHelper.GetRequestLatency(req.RequestId));
await EventBus.PublishAsync(new StreamMessageReceivedEvent(req.ToByteString(), streamPeer.Info.Pubkey, req.RequestId), false);
});
Logger.LogWarning("listen end and complete {remoteEndPoint}", streamPeer.RemoteEndpoint.ToString());
}
catch (Exception e)
{
if (e is RpcException exception)
await EventBus.PublishAsync(new StreamPeerExceptionEvent(streamPeer.HandleRpcException(exception, "listen err {remoteEndPoint}"), streamPeer));
Logger.LogError(e, "listen err {remoteEndPoint}", streamPeer.RemoteEndpoint.ToString());
}
}, tokenSource.Token);
Expand Down Expand Up @@ -329,8 +347,9 @@ private async Task<GrpcClient> CreateClientAsync(DnsEndPoint remoteEndpoint)
return null;

Logger.LogDebug($"Upgrading connection to TLS: {certificate}.");
var clientKeyCertificatePair = TlsHelper.GenerateKeyCertificatePair();
ChannelCredentials credentials =
new SslCredentials(TlsHelper.ObjectToPem(certificate), _clientKeyCertificatePair);
new SslCredentials(TlsHelper.ObjectToPem(certificate), clientKeyCertificatePair);

var channel = new Channel(remoteEndpoint.ToString(), credentials, new List<ChannelOption>
{
Expand All @@ -340,7 +359,7 @@ private async Task<GrpcClient> CreateClientAsync(DnsEndPoint remoteEndpoint)
new(GrpcConstants.GrpcArgKeepalivePermitWithoutCalls, GrpcConstants.GrpcArgKeepalivePermitWithoutCallsOpen),
new(GrpcConstants.GrpcArgHttp2MaxPingsWithoutData, GrpcConstants.GrpcArgHttp2MaxPingsWithoutDataVal),
new(GrpcConstants.GrpcArgKeepaliveTimeoutMs, GrpcConstants.GrpcArgKeepaliveTimeoutMsVal),
new(GrpcConstants.GrpcArgKeepaliveTimeMs, GrpcConstants.GrpcArgKeepaliveTimeMsVal)
new(GrpcConstants.GrpcArgKeepaliveTimeMs, GrpcConstants.GrpcArgKeepaliveTimeMsVal),
});

var nodePubkey = AsyncHelper.RunSync(() => _accountService.GetPublicKeyAsync()).ToHex();
Expand Down
2 changes: 2 additions & 0 deletions src/AElf.OS.Network.Grpc/GrpcConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ public static class GrpcConstants
// Names of gRPC channel arguments; PeerDialer passes them as ChannelOption names
// when it creates a Channel.
public const string GrpcArgHttp2MaxPingsWithoutData = "grpc.http2_max_pings_without_data";
public const string GrpcArgKeepaliveTimeoutMs = "grpc.keepalive_timeout_ms";
public const string GrpcArgKeepaliveTimeMs = "grpc.keepalive_time_ms";
// Disabled: the write-buffer-size argument is currently not declared.
// public const string GrpcArgHttp2WriteBufferSize = "grpc.http2.write_buffer_size";

// Values supplied for the channel arguments above.
public const int GrpcArgKeepalivePermitWithoutCallsOpen = 1;
public const int GrpcArgHttp2MaxPingsWithoutDataVal = 0;
// 60 * 1000 ms = 60 seconds.
public const int GrpcArgKeepaliveTimeoutMsVal = 60 * 1000;
// 2 * 60 * 60 * 1000 ms = 2 hours.
public const int GrpcArgKeepaliveTimeMsVal = 2 * 60 * 60 * 1000;
// Disabled together with the write-buffer-size key (6 * 1024 = 6144).
// public const int GrpcArgHttp2WriteBufferSizeVal = 6 * 1024;

public const string GrpcGzipConst = "gzip";

Expand Down
2 changes: 1 addition & 1 deletion src/AElf.OS.Network.Grpc/GrpcNetworkModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public override void ConfigureServices(ServiceConfigurationContext context)
context.Services.AddSingleton<PeerService.PeerServiceBase, GrpcServerService>();

// Internal dependencies
context.Services.AddTransient<IPeerDialer, PeerDialer>();
context.Services.AddSingleton<IPeerDialer, PeerDialer>();
context.Services.AddSingleton<GrpcServerService>();

context.Services.AddSingleton<AuthInterceptor>();
Expand Down
7 changes: 7 additions & 0 deletions src/AElf.OS.Network.Grpc/Helpers/CommonHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ public static string GenerateRequestId()
return timeMs.ToString() + '_' + guid;
}

/// <summary>
/// Computes how many milliseconds have elapsed since a request id was generated.
/// </summary>
/// <param name="requestId">
/// A request id of the form <c>{unixTimeMs}_{suffix}</c>, as produced by
/// <c>GenerateRequestId</c>. May be null or empty.
/// </param>
/// <returns>
/// The current UTC Unix time in milliseconds minus the timestamp prefix of the id, or
/// <c>-1</c> when the id is null, empty, does not consist of exactly two '_'-separated
/// parts, or its prefix is not an integer.
/// </returns>
public static long GetRequestLatency(string requestId)
{
    // This helper is evaluated inside log statements, so a missing id must not throw.
    if (string.IsNullOrEmpty(requestId)) return -1;

    var parts = requestId.Split('_');
    if (parts.Length != 2) return -1;

    // The prefix is machine-generated, so parse it independently of the current culture.
    return long.TryParse(parts[0], System.Globalization.NumberStyles.Integer,
        System.Globalization.CultureInfo.InvariantCulture, out var start)
        ? DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - start
        : -1;
}

public static bool GreaterThanSupportStreamMinVersion(this string version, string minVersion)
{
return Version.Parse(version).CompareTo(Version.Parse(minVersion)) >= 0;
Expand Down
8 changes: 4 additions & 4 deletions src/AElf.OS.Network.Grpc/Peer/GrpcPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ namespace AElf.OS.Network.Grpc;
public class GrpcPeer : IPeer
{
private const int MaxMetricsPerMethod = 100;
protected const int BlockRequestTimeout = 700;
protected const int CheckHealthTimeout = 1000;
protected const int BlockRequestTimeout = 2000;
protected const int CheckHealthTimeout = 2000;
protected const int BlocksRequestTimeout = 5000;
protected const int GetNodesTimeout = 500;
protected const int GetNodesTimeout = 2000;
protected const int UpdateHandshakeTimeout = 3000;
protected const int StreamRecoveryWaitTime = 500;

Expand Down Expand Up @@ -394,7 +394,7 @@ protected virtual void RecordMetric(GrpcRequest grpcRequest, Timestamp requestSt
/// This method handles the case where the peer is potentially down. If the Rpc call
/// put the channel in TransientFailure or Connecting, we give the connection a certain time to recover.
/// </summary>
protected virtual NetworkException HandleRpcException(RpcException exception, string errorMessage)
public virtual NetworkException HandleRpcException(RpcException exception, string errorMessage)
{
var message = $"Failed request to {this}: {errorMessage}";
var type = NetworkExceptionType.Rpc;
Expand Down
33 changes: 29 additions & 4 deletions src/AElf.OS.Network.Grpc/Peer/GrpcStreamBackPeer.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using AElf.OS.Network.Application;
using AElf.OS.Network.Grpc.Helpers;
using AElf.OS.Network.Protocol.Types;
using AElf.Types;
using Grpc.Core;

namespace AElf.OS.Network.Grpc;
Expand All @@ -22,13 +25,35 @@ public GrpcStreamBackPeer(DnsEndPoint remoteEndpoint, PeerConnectionInfo peerCon

/// <summary>
/// Sends a <c>HealthCheck</c> message over the stream, passing <c>CheckHealthTimeout</c>
/// as the timeout metadata entry.
/// </summary>
// NOTE(review): this span comes from a diff view. Each adjacent pair of near-identical
// statements below looks like the pre-change line followed by its replacement, which
// adds a generated request id - confirm against the actual file.
public override async Task CheckHealthAsync()
{
var request = new GrpcRequest { ErrorMessage = "Check health failed." };
// The generated id is embedded in the error message and also passed to StreamRequestAsync.
var requestId = CommonHelper.GenerateRequestId();
var request = new GrpcRequest { ErrorMessage = $"Check health failed.requestId={requestId}" };

// The timeout travels as a metadata entry alongside the request.
var data = new Metadata
{
{ GrpcConstants.TimeoutMetadataKey, CheckHealthTimeout.ToString() },
};
await RequestAsync(() => StreamRequestAsync(MessageType.HealthCheck, new HealthCheckRequest(), data), request);
await RequestAsync(() => StreamRequestAsync(MessageType.HealthCheck, new HealthCheckRequest(), data, requestId), request);
}

/// <summary>
/// Sends a <c>RequestBlocks</c> stream message built from <paramref name="firstHash"/>
/// and <paramref name="count"/> and returns the blocks parsed from the reply.
/// </summary>
/// <param name="firstHash">Hash placed in the request's <c>PreviousBlockHash</c>.</param>
/// <param name="count">Value placed in the request's <c>Count</c>.</param>
/// <returns>
/// The blocks of the parsed <c>BlockList</c>, or an empty list when the request
/// produced no reply.
/// </returns>
public override async Task<List<BlockWithTransactions>> GetBlocksAsync(Hash firstHash, int count)
{
    var payload = new BlocksRequest { PreviousBlockHash = firstHash, Count = count };
    var description = $"{{ first: {firstHash}, count: {count} }}";

    // The same id goes into the error message and into the stream request itself.
    var requestId = CommonHelper.GenerateRequestId();
    var grpcRequest = new GrpcRequest
    {
        ErrorMessage = $"Get blocks for {description} failed.requestId={requestId}",
        MetricName = nameof(MetricNames.GetBlocks),
        MetricInfo = $"Get blocks for {description}"
    };

    // The timeout travels as a metadata entry alongside the request.
    var metadata = new Metadata();
    metadata.Add(GrpcConstants.TimeoutMetadataKey, BlocksRequestTimeout.ToString());

    var reply = await RequestAsync(
        () => StreamRequestAsync(MessageType.RequestBlocks, payload, metadata, requestId),
        grpcRequest);

    if (reply == null)
        return new List<BlockWithTransactions>();

    return BlockList.Parser.ParseFrom(reply.Message).Blocks.ToList();
}

public override async Task DisconnectAsync(bool gracefulDisconnect)
Expand Down Expand Up @@ -57,10 +82,10 @@ public override Task<bool> TryRecoverAsync()
}


protected override NetworkException HandleRpcException(RpcException exception, string errorMessage)
public override NetworkException HandleRpcException(RpcException exception, string errorMessage)
{
var message = $"Failed request to {this}: {errorMessage}";
var type = NetworkExceptionType.Rpc;
var type = NetworkExceptionType.Rpc;
if (exception.StatusCode ==
// there was an exception, not related to connectivity.
StatusCode.Cancelled)
Expand Down
Loading

0 comments on commit c0f24b9

Please sign in to comment.