Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix docker exec stdout EOF missing #93

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/containerhelper/handlers/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ func init() {
model.RegisterHandler("inputProxy", inputProxyHandler)
model.RegisterHandler("httpProxy", httpProxyHandler)
model.RegisterHandler("tcpProxy", tcpProxyHandler)
model.RegisterHandler("fixout", fixOutHandler)
}
52 changes: 52 additions & 0 deletions cmd/containerhelper/handlers/fixout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package handlers

import (
fixout2 "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/handlers/fixout"
"github.com/traas-stack/holoinsight-agent/cmd/containerhelper/model"
"os"
"os/exec"
)

// fixOutHandler will run another process and encode the stdout/stderr of that process into fixOutHandler's stdout.
func fixOutHandler(action string, resp *model.Resp) error {
// build cmd
cmd := exec.Command(os.Args[2], os.Args[3:]...)
cmd.Stdin = os.Stdin
stdoutr, err := cmd.StdoutPipe()
if err != nil {
return err
}
stderrr, err := cmd.StderrPipe()
if err != nil {
return err
}

if err := cmd.Start(); err != nil {
return err
}

errChan := make(chan error, 2)
// encode cmd's stdout and stderr into os.Stdout
go fixout2.CopyStream(fixout2.StdoutFd, stdoutr, errChan)
go fixout2.CopyStream(fixout2.StderrFd, stderrr, errChan)

wait := 2
loop:
for {
select {
case <-errChan:
wait--
if wait == 0 {
cmd.Wait()
// done
break loop
}
}
}

return nil
}
11 changes: 11 additions & 0 deletions cmd/containerhelper/handlers/fixout/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package fixout

const (
StdoutFd = 1
StderrFd = 2
bufSize = 4096
)
45 changes: 45 additions & 0 deletions cmd/containerhelper/handlers/fixout/decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package fixout

import (
"encoding/binary"
"io"
)

func Decode(hr *io.PipeReader, stdoutW io.WriteCloser, stderrW io.WriteCloser) error {
var err error
activeFdCount := 2
for activeFdCount > 0 {
var fd byte
var size int16
if err = binary.Read(hr, binary.LittleEndian, &fd); err != nil {
break
}
if err = binary.Read(hr, binary.LittleEndian, &size); err != nil {
break
}
if size == -1 {
activeFdCount--
switch fd {
case StdoutFd:
stdoutW.Close()
case StderrFd:
stderrW.Close()
}
continue
}
switch fd {
case StdoutFd:
_, err = io.CopyN(stdoutW, hr, int64(size))
case StderrFd:
_, err = io.CopyN(stderrW, hr, int64(size))
}
if err != nil {
return err
}
}
return nil
}
71 changes: 71 additions & 0 deletions cmd/containerhelper/handlers/fixout/encode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package fixout

import (
"encoding/binary"
"io"
"os"
"sync"
)

var (
writeLock sync.Mutex
)

func write(fd int, payload []byte) error {
writeLock.Lock()
defer writeLock.Unlock()

// fd 1 byte
// size 2 bytes
// payload <size> bytes (optional)
if err := binary.Write(os.Stdout, binary.LittleEndian, byte(fd)); err != nil {
return err
}
if err := binary.Write(os.Stdout, binary.LittleEndian, int16(len(payload))); err != nil {
return err
}
return binary.Write(os.Stdout, binary.LittleEndian, payload)
}

func writeClose(fd int) error {
writeLock.Lock()
defer writeLock.Unlock()

// fd 1 byte
// -1 2 bytes (const)
if err := binary.Write(os.Stdout, binary.LittleEndian, byte(fd)); err != nil {
return err
}
return binary.Write(os.Stdout, binary.LittleEndian, int16(-1))
}

// copyStream reads bytes from in, and encodes bytes into os.Stdout
func CopyStream(fd int, in io.Reader, errChan chan error) {
buf := make([]byte, bufSize)
for {
n, err := in.Read(buf)
var err2 error
if n > 0 {
err2 = write(fd, buf[:n])
}
if err == io.EOF {
if err2 == nil {
err2 = writeClose(fd)
}
}
if err == nil {
err = err2
}
if err != nil {
errChan <- err
if err != io.EOF {
io.Copy(io.Discard, in)
}
break
}
}
}
3 changes: 2 additions & 1 deletion pkg/cri/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type (
WorkingDir string `json:"workingDir"`
Input io.Reader
// User is the user passed to docker exec, defaults to 'root'
User string
User string
FixOut bool
}
)

Expand Down
21 changes: 13 additions & 8 deletions pkg/cri/criutils/tcpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,26 @@ func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr strin
if c.Runtime == cri.Runc {
sandbox := c.Pod.Sandbox
if sandbox != nil {
return cricore.NsEnterDial(c, "tcp", addr, dialTimeout)
conn, err := cricore.NsEnterDial(c, "tcp", addr, dialTimeout)
logger.Infozc(ctx, "[netproxy] runtime is runc, use nsenter", zap.Error(err))
return conn, err
}
}

pin, pout := io.Pipe()
ctx2, cancel := context.WithCancel(ctx)
logger.Infozc(ctx, "[netproxy] use cri exec")
ear, err := i.ExecAsync(ctx2, c, cri.ExecRequest{
Cmd: []string{core.HelperToolPath, "tcpProxy"},
Env: []string{"TCPPROXY_ADDR=" + addr, "TCPPROXY_IDLE_TIMEOUT=180s", "NO_JSON_OUTPUT=true"},
Input: pin,
Cmd: []string{core.HelperToolPath, "tcpProxy"},
Env: []string{"TCPPROXY_ADDR=" + addr, "TCPPROXY_IDLE_TIMEOUT=180s", "NO_JSON_OUTPUT=true"},
Input: pin,
FixOut: true,
})
if err != nil {
cancel()
pout.CloseWithError(err)
return nil, err
}

stderrCh := make(chan string, 1)
go func() {
bs, _ := io.ReadAll(ear.Stderr)
Expand All @@ -47,28 +50,30 @@ func TcpProxy(ctx context.Context, i cri.Interface, c *cri.Container, addr strin

go func() {
defer cancel()

var rc cri.ExecAsyncResultCode
hasResult := false
select {
case <-ctx.Done():
case <-ctx2.Done():
case rc = <-ear.Result:
hasResult = true
}

if !hasResult {
rc = <-ear.Result
}
stderr := <-stderrCh
logger.Infozc(ctx, "[netproxy] tcpproxy exec finished", zap.Int("code", rc.Code), zap.String("stderr", stderr), zap.Error(rc.Err))
logger.Infozc(ctx, "[netproxy] cri exec finished", zap.Int("code", rc.Code), zap.String("stderr", stderr), zap.Error(rc.Err))
pout.CloseWithError(rc.Err)
}()

return &util.ReadWriterConn{
Reader: ear.Stdout,
Writer: pout,
CloseFunc: func() {
// TODO uses a more deterministic strategy
// If we cancel ctx immediately, then the bottom layer has a certain probability to return <-ctx.Done() instead of <-ear.Result, and there is competition here.
//We prefer to leave the opportunity to <-ear.Result, so here is an appropriate delay of 100ms.
// cancel happens before ear.Result
time.AfterFunc(100*time.Millisecond, cancel)
},
}, nil
Expand Down
6 changes: 6 additions & 0 deletions pkg/cri/impl/default_cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,5 +956,11 @@ func (e *defaultCri) ExecAsync(ctx context.Context, c *cri.Container, req cri.Ex
if req.User == "" {
req.User = defaultExecUser
}
logger.Criz("[digest] exec async",
zap.String("engine", e.engine.Type()),
zap.String("cid", c.ShortContainerID()),
zap.String("runtime", c.Runtime),
zap.Strings("cmd", req.Cmd),
zap.Strings("env", req.Env))
return e.engine.ExecAsync(ctx, c, req)
}
40 changes: 32 additions & 8 deletions pkg/cri/impl/engine/docker_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/docker/docker/api/types"
dockersdk "github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/traas-stack/holoinsight-agent/cmd/containerhelper/handlers/fixout"
"github.com/traas-stack/holoinsight-agent/pkg/core"
"github.com/traas-stack/holoinsight-agent/pkg/cri"
"github.com/traas-stack/holoinsight-agent/pkg/cri/dockerutils"
"github.com/traas-stack/holoinsight-agent/pkg/k8s/k8slabels"
Expand Down Expand Up @@ -223,6 +225,10 @@ func (e *DockerContainerEngine) Supports(feature cri.ContainerEngineFeature) boo

func (e *DockerContainerEngine) ExecAsync(ctx context.Context, c *cri.Container, req cri.ExecRequest) (cri.ExecAsyncResult, error) {
resultCh := make(chan cri.ExecAsyncResultCode)
hackedCmd := req.Cmd
if req.FixOut {
hackedCmd = append([]string{core.HelperToolPath, "fixout"}, req.Cmd...)
}
invalidResult := cri.ExecAsyncResult{Cmd: strings.Join(req.Cmd, " "), Result: resultCh}
create, err := e.Client.ContainerExecCreate(ctx, c.Id, types.ExecConfig{
User: req.User,
Expand All @@ -235,7 +241,7 @@ func (e *DockerContainerEngine) ExecAsync(ctx context.Context, c *cri.Container,
DetachKeys: "",
Env: req.Env,
WorkingDir: req.WorkingDir,
Cmd: req.Cmd,
Cmd: hackedCmd,
})
if err != nil {
return invalidResult, err
Expand Down Expand Up @@ -263,14 +269,32 @@ func (e *DockerContainerEngine) ExecAsync(ctx context.Context, c *cri.Container,

respReaderDone := false
go func() {
_, err := stdcopy.StdCopy(stdoutW, stderrW, resp.Reader)
respReaderDone = true
if err != nil && err != io.EOF {
stdoutW.CloseWithError(err)
stderrW.CloseWithError(err)
if req.FixOut {
hr, hw := io.Pipe()
go func() {
// decode docker resp to stdout and stderr
// stderr is useless wo put it into Discard
stdcopy.StdCopy(hw, io.Discard, resp.Reader)
hw.Close()
}()
err := fixout.Decode(hr, stdoutW, stderrW)
respReaderDone = true
if err != nil && err != io.EOF {
stdoutW.CloseWithError(err)
stderrW.CloseWithError(err)
}
errCh <- err
io.Copy(io.Discard, hr)
} else {
_, err := stdcopy.StdCopy(stdoutW, stderrW, resp.Reader)
respReaderDone = true
if err != nil && err != io.EOF {
stdoutW.CloseWithError(err)
stderrW.CloseWithError(err)
}
errCh <- err
io.Copy(io.Discard, resp.Reader)
}
errCh <- err
io.Copy(io.Discard, resp.Reader)
}()

wait := 2
Expand Down
13 changes: 10 additions & 3 deletions pkg/cri/impl/netproxy/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package netproxy
import (
"context"
"github.com/traas-stack/holoinsight-agent/pkg/cri"
"net"
"net/http"
"time"
)
Expand All @@ -17,13 +18,19 @@ const (
)

type (
Handler func(ctx context.Context, pod *cri.Pod, req *http.Request) (*http.Request, *http.Response, error)
HttpHandler func(ctx context.Context, pod *cri.Pod, req *http.Request) (*http.Request, *http.Response, error)
TcpHandler func(ctx context.Context, i cri.Interface, c *cri.Container, addr string, dialTimeout time.Duration) (net.Conn, error)
)

var (
handlers []Handler
handlers []HttpHandler
tcpHandlers []TcpHandler
)

func AddHttpProxyHandler(handler Handler) {
func AddHttpProxyHandler(handler HttpHandler) {
handlers = append(handlers, handler)
}

func AddTcpProxyHandler(handler TcpHandler) {
tcpHandlers = append(tcpHandlers, handler)
}
2 changes: 1 addition & 1 deletion pkg/cri/impl/netproxy/port_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (t *PortForwardTask) Start(ctx context.Context) (string, error) {
func handlePortForwardRequest(logCtx zap.Option, biz *cri.Container, conn net.Conn, addr string) {
defer conn.Close()

subConn, err := criutils.TcpProxy(logger.WithLogCtx(context.Background(), logCtx), ioc.Crii, biz, addr, DefaultDialTimeout)
subConn, err := tcpProxy(logger.WithLogCtx(context.Background(), logCtx), ioc.Crii, biz, addr, DefaultDialTimeout)
if err != nil {
panic(err)
}
Expand Down
Loading
Loading