From 9d715c8b10217a7c7a9618afa1adf79c0bf8cfe9 Mon Sep 17 00:00:00 2001 From: W192547975 <138874985+W192547975@users.noreply.github.com> Date: Sat, 18 Nov 2023 11:41:29 +0800 Subject: [PATCH 1/2] Update p2pnetwork.go(restart) --- core/p2pnetwork.go | 210 ++++++++++++++++++++++----------------------- 1 file changed, 104 insertions(+), 106 deletions(-) diff --git a/core/p2pnetwork.go b/core/p2pnetwork.go index 58c32dc..b679b75 100644 --- a/core/p2pnetwork.go +++ b/core/p2pnetwork.go @@ -78,7 +78,7 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { if config != nil { instance.config = *config } - instance.init() + instance.doInit() go instance.run() go func() { for { @@ -92,6 +92,14 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { return instance } +func (pn *P2PNetwork) doInit() { + if pn.init() != nil { + // init failed, retry + pn.restartCh <- true + gLog.Println(LvERROR, "P2PNetwork init error:", err) + } +} + func (pn *P2PNetwork) run() { heartbeatTimer := time.NewTicker(NetworkHeartbeatTime) pn.t1 = time.Now().UnixNano() @@ -103,12 +111,13 @@ func (pn *P2PNetwork) run() { pn.write(MsgHeartbeat, 0, "") case <-pn.restartCh: pn.online = false + if pn.conn != nil { + pn.conn.Close() + pn.conn = nil + } pn.wgReconnect.Wait() // wait read/autorunapp goroutine end time.Sleep(ClientAPITimeout) - err := pn.init() - if err != nil { - gLog.Println(LvERROR, "P2PNetwork init error:", err) - } + pn.doInit() } } } @@ -471,107 +480,97 @@ func (pn *P2PNetwork) init() error { pn.wgReconnect.Add(1) defer pn.wgReconnect.Done() var err error - for { - // detect nat type - pn.config.publicIP, pn.config.natType, pn.config.hasIPv4, pn.config.hasUPNPorNATPMP, err = getNATType(pn.config.ServerHost, pn.config.UDPPort1, pn.config.UDPPort2) - // for testcase - if strings.Contains(pn.config.Node, "openp2pS2STest") { - pn.config.natType = NATSymmetric - pn.config.hasIPv4 = 0 - pn.config.hasUPNPorNATPMP = 0 - gLog.Println(LvINFO, "openp2pS2STest debug") - - } - if strings.Contains(pn.config.Node, "openp2pC2CTest") { - pn.config.natType = NATCone - pn.config.hasIPv4 = 0 - pn.config.hasUPNPorNATPMP = 0 - gLog.Println(LvINFO, "openp2pC2CTest debug") - } - if err != nil { - gLog.Println(LvDEBUG, "detect NAT type error:", err) - break - } - if pn.config.hasIPv4 == 1 || pn.config.hasUPNPorNATPMP == 1 { - onceV4Listener.Do(func() { - v4l = &v4Listener{port: gConf.Network.TCPPort} - go v4l.start() - }) - } - gLog.Printf(LvINFO, "hasIPv4:%d, UPNP:%d, NAT type:%d, publicIP:%s", pn.config.hasIPv4, pn.config.hasUPNPorNATPMP, pn.config.natType, pn.config.publicIP) - gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort) - uri := "/api/v1/login" - caCertPool, err := x509.SystemCertPool() - if err != nil { - gLog.Println(LvERROR, "Failed to load system root CAs:", err) - } else { - caCertPool = x509.NewCertPool() - } - caCertPool.AppendCertsFromPEM([]byte(rootCA)) - caCertPool.AppendCertsFromPEM([]byte(ISRGRootX1)) - config := tls.Config{ - RootCAs: caCertPool, - InsecureSkipVerify: false} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert - websocket.DefaultDialer.TLSClientConfig = &config - websocket.DefaultDialer.HandshakeTimeout = ClientAPITimeout - u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri} - q := u.Query() - q.Add("node", pn.config.Node) - q.Add("token", fmt.Sprintf("%d", pn.config.Token)) - q.Add("version", OpenP2PVersion) - q.Add("nattype", fmt.Sprintf("%d", pn.config.natType)) - q.Add("sharebandwidth", fmt.Sprintf("%d", pn.config.ShareBandwidth)) - u.RawQuery = q.Encode() - var ws *websocket.Conn - ws, _, err = websocket.DefaultDialer.Dial(u.String(), nil) - if err != nil { - gLog.Println(LvERROR, "Dial error:", err) - break - } - pn.online = true - pn.conn = ws - localAddr := strings.Split(ws.LocalAddr().String(), ":") - if len(localAddr) == 2 { - pn.config.localIP = localAddr[0] - } else { - err = errors.New("get local ip failed") - break - } - go pn.readLoop() - pn.config.mac = getmac(pn.config.localIP) - pn.config.os = getOsName() - go func() { - req := ReportBasic{ - Mac: pn.config.mac, - LanIP: pn.config.localIP, - OS: pn.config.os, - HasIPv4: pn.config.hasIPv4, - HasUPNPorNATPMP: pn.config.hasUPNPorNATPMP, - Version: OpenP2PVersion, - } - rsp := netInfo() - gLog.Println(LvDEBUG, "netinfo:", rsp) - if rsp != nil && rsp.Country != "" { - if IsIPv6(rsp.IP.String()) { - gConf.setIPv6(rsp.IP.String()) - } - req.NetInfo = *rsp - } else { - pn.refreshIPv6(true) - } - req.IPv6 = gConf.IPv6() - pn.write(MsgReport, MsgReportBasic, &req) - }() - go pn.autorunApp() - gLog.Println(LvDEBUG, "P2PNetwork init ok") - break + // detect nat type + pn.config.publicIP, pn.config.natType, pn.config.hasIPv4, pn.config.hasUPNPorNATPMP, err = getNATType(pn.config.ServerHost, pn.config.UDPPort1, pn.config.UDPPort2) + // for testcase + if strings.Contains(pn.config.Node, "openp2pS2STest") { + pn.config.natType = NATSymmetric + pn.config.hasIPv4 = 0 + pn.config.hasUPNPorNATPMP = 0 + gLog.Println(LvINFO, "openp2pS2STest debug") + } + if strings.Contains(pn.config.Node, "openp2pC2CTest") { + pn.config.natType = NATCone + pn.config.hasIPv4 = 0 + pn.config.hasUPNPorNATPMP = 0 + gLog.Println(LvINFO, "openp2pC2CTest debug") } if err != nil { - // init failed, retry - pn.restartCh <- true - gLog.Println(LvERROR, "P2PNetwork init error:", err) + gLog.Println(LvDEBUG, "detect NAT type error:", err) + return err } - return err + if pn.config.hasIPv4 == 1 || pn.config.hasUPNPorNATPMP == 1 { + onceV4Listener.Do(func() { + v4l = &v4Listener{port: gConf.Network.TCPPort} + go v4l.start() + }) + } + gLog.Printf(LvINFO, "hasIPv4:%d, UPNP:%d, NAT type:%d, publicIP:%s", pn.config.hasIPv4, pn.config.hasUPNPorNATPMP, pn.config.natType, pn.config.publicIP) + gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort) + uri := "/api/v1/login" + caCertPool, err := x509.SystemCertPool() + if err != nil { + gLog.Println(LvERROR, "Failed to load system root CAs:", err) + } else { + caCertPool = x509.NewCertPool() + } + caCertPool.AppendCertsFromPEM([]byte(rootCA)) + caCertPool.AppendCertsFromPEM([]byte(ISRGRootX1)) + config := tls.Config{ + RootCAs: caCertPool, + InsecureSkipVerify: false} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert + websocket.DefaultDialer.TLSClientConfig = &config + websocket.DefaultDialer.HandshakeTimeout = ClientAPITimeout + u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri} + q := u.Query() + q.Add("node", pn.config.Node) + q.Add("token", fmt.Sprintf("%d", pn.config.Token)) + q.Add("version", OpenP2PVersion) + q.Add("nattype", fmt.Sprintf("%d", pn.config.natType)) + q.Add("sharebandwidth", fmt.Sprintf("%d", pn.config.ShareBandwidth)) + u.RawQuery = q.Encode() + var ws *websocket.Conn + ws, _, err = websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + gLog.Println(LvERROR, "Dial error:", err) + return err + } + pn.online = true + pn.conn = ws + localAddr := strings.Split(ws.LocalAddr().String(), ":") + if len(localAddr) == 2 { + pn.config.localIP = localAddr[0] + } else { + return errors.New("get local ip failed") + } + go pn.readLoop() + pn.config.mac = getmac(pn.config.localIP) + pn.config.os = getOsName() + go func() { + req := ReportBasic{ + Mac: pn.config.mac, + LanIP: pn.config.localIP, + OS: pn.config.os, + HasIPv4: pn.config.hasIPv4, + HasUPNPorNATPMP: pn.config.hasUPNPorNATPMP, + Version: OpenP2PVersion, + } + rsp := netInfo() + gLog.Println(LvDEBUG, "netinfo:", rsp) + if rsp != nil && rsp.Country != "" { + if IsIPv6(rsp.IP.String()) { + gConf.setIPv6(rsp.IP.String()) + } + req.NetInfo = *rsp + } else { + pn.refreshIPv6(true) + } + req.IPv6 = gConf.IPv6() + pn.write(MsgReport, MsgReportBasic, &req) + }() + go pn.autorunApp() + gLog.Println(LvDEBUG, "P2PNetwork init ok") + return nil } func (pn *P2PNetwork) handleMessage(t int, msg []byte) { @@ -645,7 +644,6 @@ func (pn *P2PNetwork) readLoop() { t, msg, err := pn.conn.ReadMessage() if err != nil { gLog.Printf(LvERROR, "P2PNetwork read error:%s", err) - pn.conn.Close() pn.restartCh <- true break } @@ -666,7 +664,7 @@ func (pn *P2PNetwork) write(mainType uint16, subType uint16, packet interface{}) defer pn.writeMtx.Unlock() if err = pn.conn.WriteMessage(websocket.BinaryMessage, msg); err != nil { gLog.Printf(LvERROR, "write msgType %d,%d error:%s", mainType, subType, err) - pn.conn.Close() + pn.restartCh <- true } return err } @@ -712,7 +710,7 @@ func (pn *P2PNetwork) push(to string, subType uint16, packet interface{}) error defer pn.writeMtx.Unlock() if err = pn.conn.WriteMessage(websocket.BinaryMessage, pushMsg); err != nil { gLog.Printf(LvERROR, "push to %s error:%s", to, err) - pn.conn.Close() + pn.restartCh <- true } return err } From 20811049abaf4c9f56200883877d517978b487aa Mon Sep 17 00:00:00 2001 From: W192547975 <138874985+W192547975@users.noreply.github.com> Date: Sat, 18 Nov 2023 15:21:27 +0800 Subject: [PATCH 2/2] Update p2pnetwork.go --- core/p2pnetwork.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/p2pnetwork.go b/core/p2pnetwork.go index b679b75..76c46b8 100644 --- a/core/p2pnetwork.go +++ b/core/p2pnetwork.go @@ -78,7 +78,7 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { if config != nil { instance.config = *config } - instance.doInit() + instance.tryInit() go instance.run() go func() { for { @@ -92,8 +92,9 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { return instance } -func (pn *P2PNetwork) doInit() { - if pn.init() != nil { +func (pn *P2PNetwork) tryInit() { + err := pn.init() + if err != nil { // init failed, retry pn.restartCh <- true gLog.Println(LvERROR, "P2PNetwork init error:", err) @@ -111,13 +112,13 @@ func (pn *P2PNetwork) run() { pn.write(MsgHeartbeat, 0, "") case <-pn.restartCh: pn.online = false + pn.wgReconnect.Wait() // wait read/autorunapp goroutine end + time.Sleep(ClientAPITimeout) if pn.conn != nil { pn.conn.Close() pn.conn = nil } - pn.wgReconnect.Wait() // wait read/autorunapp goroutine end - time.Sleep(ClientAPITimeout) - pn.doInit() + pn.tryInit() } } }