Skip to content

Commit

Permalink
Fixing lb connection not lb'ing
Browse files Browse the repository at this point in the history
  • Loading branch information
dariopb committed Aug 21, 2020
1 parent d4d373c commit e820ac5
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 17 deletions.
20 changes: 17 additions & 3 deletions pkg/muxtunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"net"
"sync"
"time"
Expand Down Expand Up @@ -38,6 +39,8 @@ type TunnelFrontendServices struct {

// NewMuxTunnelService creates a new tunnel service on the port using the passed cert
func NewMuxTunnelService(cert tls.Certificate, servicePort int, token string, dynport int, dymportcount int) (*MuxTunnelService, error) {
rand.Seed(time.Now().UnixNano())

ts := MuxTunnelService{
port: servicePort,
token: token,
Expand Down Expand Up @@ -303,8 +306,19 @@ func (ts *MuxTunnelService) doProxy(serviceName string, conn net.Conn) {
var session *yamux.Session
var id string

// Pick a random backend
l := len(frontend.backendConnMap)
if l == 0 {
log.Errorf("frontend [%s:%s from %s] couldn't find any backend endpoint to proxy to", serviceName, conn.LocalAddr().String(), conn.RemoteAddr().String())
return
}

i := rand.Intn(l)
for id, session = range frontend.backendConnMap {
break
if i == 0 {
break
}
i = i - 1
}

if id == "" {
Expand All @@ -331,7 +345,7 @@ func (ts *MuxTunnelService) doProxy(serviceName string, conn net.Conn) {
}

// Send the first leg that I saved
l, err := backConn.Write((b[:n]))
l, err = backConn.Write((b[:n]))
if l != n || err != nil {
ts.mtx.Lock()
backConn.Close()
Expand Down Expand Up @@ -368,7 +382,7 @@ func (ts *MuxTunnelService) connectBackend(session *yamux.Session, conn net.Conn

tunnelConnect := TunnelConnecData{
ServiceName: serviceName,
SourceAddress: session.RemoteAddr().String(),
SourceAddress: conn.RemoteAddr().String(),
}
err = sendSerializedObject(backConn, tunnelConnect)
if err != nil {
Expand Down
28 changes: 17 additions & 11 deletions pkg/muxtunnelClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"net"
"sync"
"time"
Expand All @@ -31,6 +32,7 @@ type MuxTunnelClient struct {
func NewMuxTunnelClient(apiEndpoint string, td TunnelData) (*MuxTunnelClient, error) {
log.Infof("NewTunnelClient to apiEndpoint [%s] with tunnel info: [%v]", apiEndpoint, td)

rand.Seed(time.Now().UnixNano())
roots := x509.NewCertPool()
//ok := roots.AppendCertsFromPEM([]byte(rootCert))
//if !ok {
Expand Down Expand Up @@ -229,11 +231,11 @@ loop:
}

func (tc *MuxTunnelClient) handleStream(session *yamux.Session, conn net.Conn) {
log.Infof("session: [%s], new stream connection from: %s", session.LocalAddr().String(), conn.LocalAddr().String())
log.Debugf("session: [%s], new stream connection from: %s", tc.tunnelData.ServiceName, conn.RemoteAddr().String())

defer func() {
conn.Close()
log.Infof("session: [%s], stream from [%s] ended", session.LocalAddr().String(), conn.LocalAddr().String())
log.Infof("session: [%s], stream from [%s] ended", tc.tunnelData.ServiceName, conn.RemoteAddr().String())
}()

b, payloadLen, err := readFrame(conn)
Expand All @@ -250,34 +252,38 @@ func (tc *MuxTunnelClient) handleStream(session *yamux.Session, conn net.Conn) {
return
}

log.Debugf("Got new backend connection info: [%v]", td)
log.Infof("session: [%s], new stream connection from: [%s] via [%s]", td.ServiceName, td.SourceAddress, conn.RemoteAddr().String())

if len(tc.tunnelData.TargetAddresses) > 0 {
err = tc.doProxy(conn)
if err == nil {
}
err = tc.doProxy(conn)
if err == nil {
}
}

func (tc *MuxTunnelClient) doProxy(conn net.Conn) error {
var addr string
tc.mtx.Lock()

// #DARIO TODO: do lb to the backends
addr = tc.tunnelData.TargetAddresses[0]
// Pick a random backend
l := len(tc.tunnelData.TargetAddresses)
if l == 0 {
tc.mtx.Unlock()
return nil
}

addr = tc.tunnelData.TargetAddresses[rand.Intn(l)]
tc.mtx.Unlock()

addr = fmt.Sprintf("%s:%d", addr, tc.tunnelData.TargetPort)
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
log.Errorf("failed to resolve server [%s]: %s", tcpAddr.String(), err.Error())
log.Errorf("session: [%s], failed to resolve server [%s]: %s", tc.tunnelData.ServiceName, tcpAddr.String(), err.Error())
return err
}

d := net.Dialer{Timeout: tc.dialerTimeout}
backConn, err := d.Dial("tcp", tcpAddr.String())
if err != nil {
log.Errorf("failed to connect to [%s]: %s", tcpAddr.String(), err.Error())
log.Errorf("session: [%s], failed to connect to [%s]: %s", tc.tunnelData.ServiceName, tcpAddr.String(), err.Error())
return err
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/restapi/statik/statik.go

Large diffs are not rendered by default.

0 comments on commit e820ac5

Please sign in to comment.