Skip to content

Commit

Permalink
Merge pull request #3430 from AElfProject/release/1.4.1
Browse files Browse the repository at this point in the history
Release v1.4.1
  • Loading branch information
jason-aelf authored Jun 28, 2023
2 parents ff547f2 + 2077337 commit a5cc8dd
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 8 deletions.
3 changes: 2 additions & 1 deletion src/AElf.OS.Network.Grpc/Connection/PeerDialer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ private async Task<GrpcStreamPeer> DailStreamPeerAsync(GrpcClient client, DnsEnd
try
{
var nodePubkey = (await _accountService.GetPublicKeyAsync()).ToHex();
var call = client.Client.RequestByStream(new CallOptions().WithDeadline(DateTime.MaxValue));
var headers = new Metadata { new(GrpcConstants.GrpcRequestCompressKey, GrpcConstants.GrpcGzipConst) };
var call = client.Client.RequestByStream(new CallOptions().WithHeaders(headers).WithDeadline(DateTime.MaxValue));
var streamPeer = new GrpcStreamPeer(client, remoteEndpoint, connectionInfo, call, null, _streamTaskResourcePool,
new Dictionary<string, string>()
{
Expand Down
10 changes: 8 additions & 2 deletions src/AElf.OS.Network.Grpc/Peer/GrpcStreamBackPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public override async Task DisconnectAsync(bool gracefulDisconnect)
{
if (!IsConnected) return;
IsConnected = false;
IsClosed = true;
_sendStreamJobs.Complete();
// send disconnect message if the peer is still connected and the connection
// is stable.
Expand All @@ -78,14 +79,14 @@ await RequestAsync(() => StreamRequestAsync(MessageType.Disconnect,

public override Task<bool> TryRecoverAsync()
{
return Task.FromResult(true);
return Task.FromResult(false);
}


public override NetworkException HandleRpcException(RpcException exception, string errorMessage)
{
var message = $"Failed request to {this}: {errorMessage}";
var type = NetworkExceptionType.Rpc;
var type = NetworkExceptionType.Rpc;
if (exception.StatusCode ==
// there was an exception, not related to connectivity.
StatusCode.Cancelled)
Expand All @@ -101,4 +102,9 @@ public override NetworkException HandleRpcException(RpcException exception, stri

return new NetworkException(message, exception, type);
}

public override string ToString()
{
return $"{{ streamBackPeer listening-port: {RemoteEndpoint}, key: {Info.Pubkey.Substring(0, 45)}... }}";
}
}
25 changes: 21 additions & 4 deletions src/AElf.OS.Network.Grpc/Peer/GrpcStreamPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class GrpcStreamPeer : GrpcPeer
private StreamSendCallBack _streamSendCallBack;

protected readonly ActionBlock<StreamJob> _sendStreamJobs;
protected bool IsClosed = false;
public ILogger<GrpcStreamPeer> Logger { get; set; }

public GrpcStreamPeer(GrpcClient client, DnsEndPoint remoteEndpoint, PeerConnectionInfo peerConnectionInfo,
Expand All @@ -54,8 +55,11 @@ public GrpcStreamPeer(GrpcClient client, DnsEndPoint remoteEndpoint, PeerConnect

public AsyncDuplexStreamingCall<StreamMessage, StreamMessage> BuildCall()
{
_duplexStreamingCall = _client.RequestByStream(new CallOptions().WithDeadline(DateTime.MaxValue));
if (_client == null) return null;
var headers = new Metadata { new(GrpcConstants.GrpcRequestCompressKey, GrpcConstants.GrpcGzipConst) };
_duplexStreamingCall = _client.RequestByStream(new CallOptions().WithHeaders(headers).WithDeadline(DateTime.MaxValue));
_clientStreamWriter = _duplexStreamingCall.RequestStream;
IsClosed = false;
return _duplexStreamingCall;
}

Expand All @@ -75,6 +79,7 @@ public async Task DisposeAsync()
await _duplexStreamingCall?.RequestStream?.CompleteAsync();
_duplexStreamingCall?.Dispose();
_streamListenTaskTokenSource?.Cancel();
IsClosed = true;
}

public override async Task DisconnectAsync(bool gracefulDisconnect)
Expand Down Expand Up @@ -104,7 +109,7 @@ public async Task<HandshakeReply> HandShakeAsync(HandshakeRequest request)
var requestId = CommonHelper.GenerateRequestId();
var grpcRequest = new GrpcRequest { ErrorMessage = $"handshake failed.requestId={requestId}" };
var reply = await RequestAsync(() => StreamRequestAsync(MessageType.HandShake, request, metadata, requestId, true), grpcRequest);
return reply != null ? HandshakeReply.Parser.ParseFrom(reply.Message) : new HandshakeReply();
return reply != null ? HandshakeReply.Parser.ParseFrom(reply.Message) : new HandshakeReply() { Error = HandshakeError.InvalidConnection };
}

public override async Task ConfirmHandshakeAsync()
Expand Down Expand Up @@ -205,7 +210,8 @@ private async Task WriteStreamJobAsync(StreamJob job)
{
if (job.StreamMessage == null) return;
Logger.LogDebug("write request={requestId} {streamType}-{messageType}", job.StreamMessage.RequestId, job.StreamMessage.StreamType, job.StreamMessage.MessageType);

if (!(job.StreamMessage.StreamType == StreamType.Reply && job.StreamMessage.MessageType == MessageType.RequestBlocks))
_clientStreamWriter.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
await _clientStreamWriter.WriteAsync(job.StreamMessage);
}
catch (RpcException ex)
Expand All @@ -215,7 +221,12 @@ private async Task WriteStreamJobAsync(StreamJob job)
}
catch (Exception e)
{
var type = e is InvalidOperationException or TimeoutException ? NetworkExceptionType.PeerUnstable : NetworkExceptionType.HandlerException;
var type = e switch
{
InvalidOperationException => NetworkExceptionType.Unrecoverable,
TimeoutException => NetworkExceptionType.PeerUnstable,
_ => NetworkExceptionType.HandlerException
};
job.SendCallback?.Invoke(
new NetworkException($"{job.StreamMessage?.RequestId}{job.StreamMessage?.StreamType}-{job.StreamMessage?.MessageType} size={job.StreamMessage.ToByteArray().Length} queueCount={_sendStreamJobs.InputCount}", e, type));
await Task.Delay(StreamRecoveryWaitTime);
Expand All @@ -237,6 +248,7 @@ protected async Task<TResp> RequestAsync<TResp>(Func<Task<TResp>> func, GrpcRequ

protected async Task<StreamMessage> StreamRequestAsync(MessageType messageType, IMessage message, Metadata header = null, string requestId = null, bool graceful = false)
{
if (IsClosed) return null;
for (var i = 0; i < TimeOutRetryTimes; i++)
{
requestId = requestId == null || i > 0 ? CommonHelper.GenerateRequestId() : requestId;
Expand Down Expand Up @@ -352,4 +364,9 @@ public override NetworkException HandleRpcException(RpcException exception, stri

return new NetworkException(message, exception, type);
}

public override string ToString()
{
return $"{{ streamPeer listening-port: {RemoteEndpoint}, key: {Info.Pubkey.Substring(0, 45)}... }}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public async Task GrpcStreamPeerTest()
new StreamTaskResourcePool(), new Dictionary<string, string>() { { GrpcConstants.PubkeyMetadataKey, nodePubkey } });
peerback.ConnectionStatus.ShouldBe("Stream Closed");
var res = await peerback.TryRecoverAsync();
res.ShouldBe(true);
res.ShouldBe(false);
var re = peerback.HandleRpcException(new RpcException(new Status(StatusCode.Cancelled, "")), "");
re.ExceptionType.ShouldBe(NetworkExceptionType.Unrecoverable);
re = peerback.HandleRpcException(new RpcException(new Status(StatusCode.Unknown, "")), "");
Expand Down

0 comments on commit a5cc8dd

Please sign in to comment.