Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 104 additions & 105 deletions core/p2pnetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
if config != nil {
instance.config = *config
}
instance.init()
instance.tryInit()
go instance.run()
go func() {
for {
Expand All @@ -92,6 +92,15 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork {
return instance
}

func (pn *P2PNetwork) tryInit() {
err := pn.init()
if err != nil {
// init failed, retry
pn.restartCh <- true
gLog.Println(LvERROR, "P2PNetwork init error:", err)
}
}

func (pn *P2PNetwork) run() {
heartbeatTimer := time.NewTicker(NetworkHeartbeatTime)
pn.t1 = time.Now().UnixNano()
Expand All @@ -105,10 +114,11 @@ func (pn *P2PNetwork) run() {
pn.online = false
pn.wgReconnect.Wait() // wait read/autorunapp goroutine end
time.Sleep(ClientAPITimeout)
err := pn.init()
if err != nil {
gLog.Println(LvERROR, "P2PNetwork init error:", err)
if pn.conn != nil {
pn.conn.Close()
pn.conn = nil
}
pn.tryInit()
}
}
}
Expand Down Expand Up @@ -471,107 +481,97 @@ func (pn *P2PNetwork) init() error {
pn.wgReconnect.Add(1)
defer pn.wgReconnect.Done()
var err error
for {
// detect nat type
pn.config.publicIP, pn.config.natType, pn.config.hasIPv4, pn.config.hasUPNPorNATPMP, err = getNATType(pn.config.ServerHost, pn.config.UDPPort1, pn.config.UDPPort2)
// for testcase
if strings.Contains(pn.config.Node, "openp2pS2STest") {
pn.config.natType = NATSymmetric
pn.config.hasIPv4 = 0
pn.config.hasUPNPorNATPMP = 0
gLog.Println(LvINFO, "openp2pS2STest debug")

}
if strings.Contains(pn.config.Node, "openp2pC2CTest") {
pn.config.natType = NATCone
pn.config.hasIPv4 = 0
pn.config.hasUPNPorNATPMP = 0
gLog.Println(LvINFO, "openp2pC2CTest debug")
}
if err != nil {
gLog.Println(LvDEBUG, "detect NAT type error:", err)
break
}
if pn.config.hasIPv4 == 1 || pn.config.hasUPNPorNATPMP == 1 {
onceV4Listener.Do(func() {
v4l = &v4Listener{port: gConf.Network.TCPPort}
go v4l.start()
})
}
gLog.Printf(LvINFO, "hasIPv4:%d, UPNP:%d, NAT type:%d, publicIP:%s", pn.config.hasIPv4, pn.config.hasUPNPorNATPMP, pn.config.natType, pn.config.publicIP)
gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort)
uri := "/api/v1/login"
caCertPool, err := x509.SystemCertPool()
if err != nil {
gLog.Println(LvERROR, "Failed to load system root CAs:", err)
} else {
caCertPool = x509.NewCertPool()
}
caCertPool.AppendCertsFromPEM([]byte(rootCA))
caCertPool.AppendCertsFromPEM([]byte(ISRGRootX1))
config := tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: false} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert
websocket.DefaultDialer.TLSClientConfig = &config
websocket.DefaultDialer.HandshakeTimeout = ClientAPITimeout
u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri}
q := u.Query()
q.Add("node", pn.config.Node)
q.Add("token", fmt.Sprintf("%d", pn.config.Token))
q.Add("version", OpenP2PVersion)
q.Add("nattype", fmt.Sprintf("%d", pn.config.natType))
q.Add("sharebandwidth", fmt.Sprintf("%d", pn.config.ShareBandwidth))
u.RawQuery = q.Encode()
var ws *websocket.Conn
ws, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
gLog.Println(LvERROR, "Dial error:", err)
break
}
pn.online = true
pn.conn = ws
localAddr := strings.Split(ws.LocalAddr().String(), ":")
if len(localAddr) == 2 {
pn.config.localIP = localAddr[0]
} else {
err = errors.New("get local ip failed")
break
}
go pn.readLoop()
pn.config.mac = getmac(pn.config.localIP)
pn.config.os = getOsName()
go func() {
req := ReportBasic{
Mac: pn.config.mac,
LanIP: pn.config.localIP,
OS: pn.config.os,
HasIPv4: pn.config.hasIPv4,
HasUPNPorNATPMP: pn.config.hasUPNPorNATPMP,
Version: OpenP2PVersion,
}
rsp := netInfo()
gLog.Println(LvDEBUG, "netinfo:", rsp)
if rsp != nil && rsp.Country != "" {
if IsIPv6(rsp.IP.String()) {
gConf.setIPv6(rsp.IP.String())
}
req.NetInfo = *rsp
} else {
pn.refreshIPv6(true)
}
req.IPv6 = gConf.IPv6()
pn.write(MsgReport, MsgReportBasic, &req)
}()
go pn.autorunApp()
gLog.Println(LvDEBUG, "P2PNetwork init ok")
break
// detect nat type
pn.config.publicIP, pn.config.natType, pn.config.hasIPv4, pn.config.hasUPNPorNATPMP, err = getNATType(pn.config.ServerHost, pn.config.UDPPort1, pn.config.UDPPort2)
// for testcase
if strings.Contains(pn.config.Node, "openp2pS2STest") {
pn.config.natType = NATSymmetric
pn.config.hasIPv4 = 0
pn.config.hasUPNPorNATPMP = 0
gLog.Println(LvINFO, "openp2pS2STest debug")
}
if strings.Contains(pn.config.Node, "openp2pC2CTest") {
pn.config.natType = NATCone
pn.config.hasIPv4 = 0
pn.config.hasUPNPorNATPMP = 0
gLog.Println(LvINFO, "openp2pC2CTest debug")
}
if err != nil {
// init failed, retry
pn.restartCh <- true
gLog.Println(LvERROR, "P2PNetwork init error:", err)
gLog.Println(LvDEBUG, "detect NAT type error:", err)
return err
}
return err
if pn.config.hasIPv4 == 1 || pn.config.hasUPNPorNATPMP == 1 {
onceV4Listener.Do(func() {
v4l = &v4Listener{port: gConf.Network.TCPPort}
go v4l.start()
})
}
gLog.Printf(LvINFO, "hasIPv4:%d, UPNP:%d, NAT type:%d, publicIP:%s", pn.config.hasIPv4, pn.config.hasUPNPorNATPMP, pn.config.natType, pn.config.publicIP)
gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort)
uri := "/api/v1/login"
caCertPool, err := x509.SystemCertPool()
if err != nil {
gLog.Println(LvERROR, "Failed to load system root CAs:", err)
} else {
caCertPool = x509.NewCertPool()
}
caCertPool.AppendCertsFromPEM([]byte(rootCA))
caCertPool.AppendCertsFromPEM([]byte(ISRGRootX1))
config := tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: false} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert
websocket.DefaultDialer.TLSClientConfig = &config
websocket.DefaultDialer.HandshakeTimeout = ClientAPITimeout
u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri}
q := u.Query()
q.Add("node", pn.config.Node)
q.Add("token", fmt.Sprintf("%d", pn.config.Token))
q.Add("version", OpenP2PVersion)
q.Add("nattype", fmt.Sprintf("%d", pn.config.natType))
q.Add("sharebandwidth", fmt.Sprintf("%d", pn.config.ShareBandwidth))
u.RawQuery = q.Encode()
var ws *websocket.Conn
ws, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
gLog.Println(LvERROR, "Dial error:", err)
return err
}
pn.online = true
pn.conn = ws
localAddr := strings.Split(ws.LocalAddr().String(), ":")
if len(localAddr) == 2 {
pn.config.localIP = localAddr[0]
} else {
return errors.New("get local ip failed")
}
go pn.readLoop()
pn.config.mac = getmac(pn.config.localIP)
pn.config.os = getOsName()
go func() {
req := ReportBasic{
Mac: pn.config.mac,
LanIP: pn.config.localIP,
OS: pn.config.os,
HasIPv4: pn.config.hasIPv4,
HasUPNPorNATPMP: pn.config.hasUPNPorNATPMP,
Version: OpenP2PVersion,
}
rsp := netInfo()
gLog.Println(LvDEBUG, "netinfo:", rsp)
if rsp != nil && rsp.Country != "" {
if IsIPv6(rsp.IP.String()) {
gConf.setIPv6(rsp.IP.String())
}
req.NetInfo = *rsp
} else {
pn.refreshIPv6(true)
}
req.IPv6 = gConf.IPv6()
pn.write(MsgReport, MsgReportBasic, &req)
}()
go pn.autorunApp()
gLog.Println(LvDEBUG, "P2PNetwork init ok")
return nil
}

func (pn *P2PNetwork) handleMessage(t int, msg []byte) {
Expand Down Expand Up @@ -645,7 +645,6 @@ func (pn *P2PNetwork) readLoop() {
t, msg, err := pn.conn.ReadMessage()
if err != nil {
gLog.Printf(LvERROR, "P2PNetwork read error:%s", err)
pn.conn.Close()
pn.restartCh <- true
break
}
Expand All @@ -666,7 +665,7 @@ func (pn *P2PNetwork) write(mainType uint16, subType uint16, packet interface{})
defer pn.writeMtx.Unlock()
if err = pn.conn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
gLog.Printf(LvERROR, "write msgType %d,%d error:%s", mainType, subType, err)
pn.conn.Close()
pn.restartCh <- true
}
return err
}
Expand Down Expand Up @@ -712,7 +711,7 @@ func (pn *P2PNetwork) push(to string, subType uint16, packet interface{}) error
defer pn.writeMtx.Unlock()
if err = pn.conn.WriteMessage(websocket.BinaryMessage, pushMsg); err != nil {
gLog.Printf(LvERROR, "push to %s error:%s", to, err)
pn.conn.Close()
pn.restartCh <- true
}
return err
}
Expand Down