diff --git a/gold/p2p/tcp/tcp.go b/gold/p2p/tcp/tcp.go index 61118e4..883601a 100644 --- a/gold/p2p/tcp/tcp.go +++ b/gold/p2p/tcp/tcp.go @@ -3,9 +3,9 @@ package tcp import ( "errors" "io" - "math/rand" "net" "reflect" + "runtime" "strconv" "time" @@ -49,45 +49,22 @@ func (ep *EndPoint) Listen() (p2p.Conn, error) { ep.addr = lstn.Addr().(*net.TCPAddr) peerstimeout := ep.peerstimeout if peerstimeout < time.Second { - peerstimeout = time.Second + peerstimeout = time.Second * 5 } chansz := ep.recvchansize if chansz < 32 { chansz = 32 } conn := &Conn{ - addr: ep, - lstn: lstn, - peers: ttl.NewCacheOn(peerstimeout, [4]func(string, *net.TCPConn){ - nil, - nil, - func(s string, t *net.TCPConn) { - err := t.Close() - if err != nil { - logrus.Debugln("[tcp] close conn from", ep, "to", s, "err:", err) - } else { - logrus.Debugln("[tcp] close conn from", ep, "to", s) - } - }, - ep.keepAlive, - }), - recv: make(chan *connrecv, chansz), + addr: ep, + lstn: lstn, + peers: ttl.NewCache[string, *net.TCPConn](peerstimeout), + recv: make(chan *connrecv, chansz), } go conn.accept() return conn, nil } -func (ep *EndPoint) keepAlive(_ string, t *net.TCPConn) { - _, err := io.Copy(t, &packet{ - typ: packetTypeKeepAlive, - len: 1, - dat: []byte{byte(rand.Intn(256))}, - }) - if err != nil { - logrus.Debugln("[tcp] write keepalive from", ep, "to conn", t.RemoteAddr(), "err:", err) - } -} - type connrecv struct { addr *EndPoint // cast from tcpconn.RemoteAddr() pckt packet @@ -210,7 +187,7 @@ func (conn *Conn) WriteToPeer(b []byte, ep p2p.EndPoint) (n int, err error) { if dialtimeout < time.Second { dialtimeout = time.Second } - logrus.Infoln("[tcp] dial to", tcpep.addr, "timeout", dialtimeout) + logrus.Debugln("[tcp] dial to", tcpep.addr, "timeout", dialtimeout) var cn net.Conn // must use another port to send because there's no exsiting conn cn, err = net.DialTimeout(tcpep.Network(), tcpep.addr.String(), dialtimeout) @@ -221,8 +198,18 @@ func (conn *Conn) WriteToPeer(b []byte, ep p2p.EndPoint) (n int, err error) { if !ok { return 0, errors.New("expect *net.TCPConn but got " + reflect.ValueOf(cn).Type().String()) } - logrus.Infoln("[tcp] dial to", tcpep.addr, "success, local:", tcpconn.LocalAddr()) + runtime.SetFinalizer(tcpconn, func(t *net.TCPConn) { + err := t.CloseWrite() + if err != nil { + logrus.Debugln("[tcp] close write from", t.LocalAddr(), "to", t.RemoteAddr(), "err:", err) + } else { + logrus.Debugln("[tcp] close write from", t.LocalAddr(), "to", t.RemoteAddr()) + } + }) + logrus.Debugln("[tcp] dial to", tcpep.addr, "success, local:", tcpconn.LocalAddr()) conn.peers.Set(tcpep.String(), tcpconn) + } else { + logrus.Debugln("[tcp] reuse tcpconn from", tcpconn.LocalAddr(), "to", tcpconn.RemoteAddr()) } cnt, err := io.Copy(tcpconn, &packet{ typ: packetTypeNormal,