From 9a63b3c8869fb337cb38656985bcd8fdeb55ea9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Tue, 16 Jul 2024 23:04:19 +0900 Subject: [PATCH] fix(tcp): recv on new endpoint --- gold/link/listen.go | 9 +++++++-- gold/p2p/tcp/tcp.go | 30 +++++++++++++++++++++++++++++- gold/p2p/udp/udp.go | 3 +++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/gold/link/listen.go b/gold/link/listen.go index 7221974..cf389eb 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -106,8 +106,13 @@ func (m *Me) dispatch(packet *head.Packet, addr p2p.EndPoint, index int, finish return } if p.endpoint == nil || !p.endpoint.Euqal(addr) { - logrus.Infoln("[listen] @", index, "set endpoint of peer", p.peerip, "to", addr.String()) - p.endpoint = addr + if m.ep.Network() == "udp" { + logrus.Infoln("[listen] @", index, "set endpoint of peer", p.peerip, "to", addr.String()) + p.endpoint = addr + } else if !addr.Euqal(p.endpoint) && p.endpoint == nil { // tcp/ws, ep not registered + logrus.Infoln("[listen] @", index, "set endpoint of peer", p.peerip, "to", addr.String()) + p.endpoint = addr + } } switch { case p.IsToMe(packet.Dst): diff --git a/gold/p2p/tcp/tcp.go b/gold/p2p/tcp/tcp.go index 2f641df..c9cae00 100644 --- a/gold/p2p/tcp/tcp.go +++ b/gold/p2p/tcp/tcp.go @@ -31,6 +31,9 @@ func (ep *EndPoint) Network() string { } func (ep *EndPoint) Euqal(ep2 p2p.EndPoint) bool { + if ep == nil || ep2 == nil { + return ep == nil && ep2 == nil + } tcpep2, ok := ep2.(*EndPoint) if !ok { return false @@ -113,6 +116,10 @@ func (conn *Conn) accept() { } func (conn *Conn) receive(ep *EndPoint) { + dialtimeout := conn.addr.dialtimeout + if dialtimeout < time.Second { + dialtimeout = time.Second + } for { r := &connrecv{addr: ep} if conn.addr == nil || conn.lstn == nil || conn.peers == nil || conn.recv == nil { @@ -123,7 +130,27 @@ func (conn *Conn) receive(ep *EndPoint) { return } r.conn = tcpconn - _, err := io.Copy(&r.pckt, tcpconn) + + stopch := make(chan struct{}) + t := time.AfterFunc(dialtimeout, func() { + stopch <- struct{}{} + }) + + var err error + copych := make(chan struct{}) + go func() { + _, err = io.Copy(&r.pckt, tcpconn) + copych <- struct{}{} + }() + + select { + case <-stopch: + logrus.Debugln("[tcp] recv from", ep, "timeout") + return + case <-copych: + t.Stop() + } + if err != nil { logrus.Debugln("[tcp] recv from", ep, "err:", err) return @@ -211,6 +238,7 @@ func (conn *Conn) WriteToPeer(b []byte, ep p2p.EndPoint) (n int, err error) { }) logrus.Debugln("[tcp] dial to", tcpep.addr, "success, local:", tcpconn.LocalAddr()) conn.peers.Set(tcpep.String(), tcpconn) + go conn.receive(tcpep) } else { logrus.Debugln("[tcp] reuse tcpconn from", tcpconn.LocalAddr(), "to", tcpconn.RemoteAddr()) } diff --git a/gold/p2p/udp/udp.go b/gold/p2p/udp/udp.go index 7694bb7..bc23e58 100644 --- a/gold/p2p/udp/udp.go +++ b/gold/p2p/udp/udp.go @@ -17,6 +17,9 @@ func (ep *EndPoint) Network() string { } func (ep *EndPoint) Euqal(ep2 p2p.EndPoint) bool { + if ep == nil || ep2 == nil { + return ep == nil && ep2 == nil + } udpep2, ok := ep2.(*EndPoint) if !ok { return false