Skip to content

Commit

Permalink
refactor: refactor tcpproxy (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo authored Mar 18, 2024
1 parent 3ccbd6b commit 7349ce2
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 27 deletions.
9 changes: 8 additions & 1 deletion cmd/containerhelper/handlers/tcpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"net"
"os"
"strings"
"time"
)

Expand All @@ -27,7 +28,13 @@ func tcpProxyHandler(_ string, _ *model.Resp) error {
}

func tcpProxyHandler0(ctx context.Context, addr string, idleTimeout time.Duration, in io.Reader, out io.Writer) error {
conn, err := net.DialTimeout("tcp", addr, defaultDialTimeout)
var conn net.Conn
var err error
if strings.HasSuffix(addr, ".sock") {
conn, err = net.DialTimeout("unix", addr, defaultDialTimeout)
} else {
conn, err = net.DialTimeout("tcp", addr, defaultDialTimeout)
}
if err != nil {
return err
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/cri/cricore/nsenter_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
package cricore

import (
"errors"
"context"
"github.com/traas-stack/holoinsight-agent/pkg/core"
"github.com/traas-stack/holoinsight-agent/pkg/cri"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -49,23 +49,24 @@ func NsEnterAndRunCodes(nsFile string, callback func()) error {
return err2
}

func NsEnterDial(c *cri.Container, network, addr string, dialTimeout time.Duration) (net.Conn, error) {
if c.NetworkMode == "host" {
return net.DialTimeout(network, addr, dialTimeout)
}

func NsEnterContainerAndRunCodes(c *cri.Container, callback func()) error {
if strings.HasPrefix(c.NetworkMode, "netns:") {
netNsFile := filepath.Join(core.GetHostfs(), c.NetworkMode[len("netns:"):])
var conn net.Conn
var err error
err2 := NsEnterAndRunCodes(netNsFile, func() {
conn, err = net.DialTimeout(network, addr, dialTimeout)
})
if err == nil {
err = err2
}
return conn, err
return NsEnterAndRunCodes(netNsFile, callback)
}

return nil, errors.New("invalid NetworkMode: " + c.NetworkMode)
callback()
return nil
}

func NsEnterDial(ctx context.Context, c *cri.Container, network, addr string, dialTimeout time.Duration) (net.Conn, error) {
var conn net.Conn
var err error
err2 := NsEnterContainerAndRunCodes(c, func() {
conn, err = (&net.Dialer{Timeout: dialTimeout}).DialContext(ctx, network, addr)
})
if err == nil {
err = err2
}
return conn, err
}
30 changes: 24 additions & 6 deletions pkg/cri/criutils/tcpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,40 @@ import (
"time"
)

const (
delayCancel = 500 * time.Millisecond
)

// TcpProxy
func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr string, dialTimeout time.Duration) (net.Conn, error) {
if c.Runtime == cri.Runc {
sandbox := c.Pod.Sandbox
if sandbox != nil {
conn, err := cricore.NsEnterDial(c, "tcp", addr, dialTimeout)
conn, err := cricore.NsEnterDial(ctx, c, "tcp", addr, dialTimeout)
logger.Infozc(ctx, "[netproxy] runtime is runc, use nsenter", zap.Error(err))
return conn, err
}
}
return TcpProxyByExec(ctx, i, c, addr)
}

func TcpProxyByExec(ctx context.Context, i cri.Interface, c *cri.Container, addr string) (net.Conn, error) {
pin, pout := io.Pipe()
ctx2, cancel := context.WithCancel(ctx)

// Normally, the stream ends before exec ends.
// If execCtx has been canceled at this time, exec will die from kill.
// Although there is no actual loss (because the stream has been read) this will result in an error.
// So we have to delay cancel execCtx
execCtx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-ctx.Done():
time.AfterFunc(delayCancel, cancel)
case <-execCtx.Done():
}
}()
logger.Infozc(ctx, "[netproxy] use cri exec")
ear, err := i.ExecAsync(ctx2, c, cri.ExecRequest{
ear, err := i.ExecAsync(execCtx, c, cri.ExecRequest{
Cmd: []string{core.HelperToolPath, "tcpProxy"},
Env: []string{"TCPPROXY_ADDR=" + addr, "TCPPROXY_IDLE_TIMEOUT=180s", "NO_JSON_OUTPUT=true"},
Input: pin,
Expand All @@ -53,7 +72,7 @@ func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr strin
var rc cri.ExecAsyncResultCode
hasResult := false
select {
case <-ctx2.Done():
case <-execCtx.Done():
case rc = <-ear.Result:
hasResult = true
}
Expand All @@ -70,11 +89,10 @@ func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr strin
Reader: ear.Stdout,
Writer: pout,
CloseFunc: func() {
// TODO uses a more deterministic strategy
// If we cancel ctx immediately, then the bottom layer has a certain probability to return <-ctx.Done() instead of <-ear.Result, and there is competition here.
//We prefer to leave the opportunity to <-ear.Result, so here is an appropriate delay of 100ms.
// cancel happens before ear.Result
time.AfterFunc(100*time.Millisecond, cancel)
time.AfterFunc(delayCancel, cancel)
},
}, nil
}
2 changes: 1 addition & 1 deletion pkg/cri/impl/netproxy/port_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (t *PortForwardTask) Start(ctx context.Context) (string, error) {
func handlePortForwardRequest(logCtx zap.Option, biz *cri.Container, conn net.Conn, addr string) {
defer conn.Close()

subConn, err := tcpProxy(logger.WithLogCtx(context.Background(), logCtx), ioc.Crii, biz, addr, DefaultDialTimeout)
subConn, err := TcpProxy(logger.WithLogCtx(context.Background(), logCtx), ioc.Crii, biz, addr, DefaultDialTimeout)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cri/impl/netproxy/proxy_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func createNsEnterHttpClient(pod *cri.Pod) (*http.Transport, *http.Client, error
if d, ok := ctx.Deadline(); ok {
timeout = d.Sub(time.Now())
}
return cricore.NsEnterDial(pod.Sandbox, network, addr, timeout)
return cricore.NsEnterDial(ctx, pod.Sandbox, network, addr, timeout)
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cri/impl/netproxy/proxy_socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (h *CriHandle) tcpHandle(s *socks5.Server, c *net.TCPConn, r *socks5.Reques

logCtx := zap.Fields(zap.String("uuid", uuid2), zap.String("protocol", "socks5"), zap.String("cid", biz.ShortContainerID()), zap.String("addr", addr))

proxied, err := tcpProxy(logger.WithLogCtx(context.Background(), logCtx), h.Cri, biz, addr, DefaultDialTimeout)
proxied, err := TcpProxy(logger.WithLogCtx(context.Background(), logCtx), h.Cri, biz, addr, DefaultDialTimeout)
if err != nil {
logger.Infozo(logCtx, "[netproxy] create tcperror error", zap.Error(err))
a, addr, port, _ := socks5.ParseAddress(Socks5ProxyAddr)
Expand Down Expand Up @@ -171,7 +171,7 @@ func (h *CriHandle) UDPHandle(s *socks5.Server, addr *net.UDPAddr, d *socks5.Dat
return errors.New("unsupported")
}

func tcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr string, dialTimeout time.Duration) (net.Conn, error) {
func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr string, dialTimeout time.Duration) (net.Conn, error) {
for _, handler := range tcpHandlers {
if conn, err := handler(ctx, i, c, addr, dialTimeout); conn != nil && err == nil {
return conn, err
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,7 @@ func ReplaceHost(hostport string, host string) string {
}
return host
}

func ReplaceHostToLocalhost(hostport string) string {
return ReplaceHost(hostport, "127.0.0.1")
}

0 comments on commit 7349ce2

Please sign in to comment.