Skip to content

Commit

Permalink
Merge pull request #144 from ipfs/fix/stuff
Browse files Browse the repository at this point in the history
roundup of cleanup fixes
  • Loading branch information
Stebalien authored Feb 19, 2019
2 parents 4fc5f96 + affc206 commit ff61303
Show file tree
Hide file tree
Showing 13 changed files with 165 additions and 244 deletions.
7 changes: 0 additions & 7 deletions chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/ipfs/go-ipfs-cmdkit"
"github.com/ipfs/go-ipfs-cmds/debug"
)

func NewChanResponsePair(req *Request) (ResponseEmitter, Response) {
Expand Down Expand Up @@ -130,12 +129,6 @@ func (re *chanResponseEmitter) Emit(v interface{}) error {
re.wl.Lock()
defer re.wl.Unlock()

// Initially this library allowed commands to return errors by sending an
// error value along a stream. We removed that in favour of CloseWithError,
// so we want to make sure we catch situations where some code still uses the
// old error emitting semantics and _panic_ in those situations.
debug.AssertNotError(v)

// unblock Length()
select {
case <-re.waitLen:
Expand Down
120 changes: 48 additions & 72 deletions cli/responseemitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,19 @@ import (
"syscall"

"github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/go-ipfs-cmds/debug"
)

var _ ResponseEmitter = &responseEmitter{}

func NewResponseEmitter(stdout, stderr io.Writer, req *cmds.Request) (cmds.ResponseEmitter, <-chan int, error) {
ch := make(chan int)
func NewResponseEmitter(stdout, stderr io.Writer, req *cmds.Request) (ResponseEmitter, error) {
encType, enc, err := cmds.GetEncoder(req, stdout, cmds.TextNewline)
if err != nil {
close(ch)
return nil, ch, err
}

return &responseEmitter{
stdout: stdout,
stderr: stderr,
encType: encType,
enc: enc,
ch: ch,
}, ch, err
}, err
}

// ResponseEmitter extends cmds.ResponseEmitter to give better control over the command line
Expand All @@ -37,7 +30,12 @@ type ResponseEmitter interface {

Stdout() io.Writer
Stderr() io.Writer
Exit(int)

// SetStatus sets the exit status for this command.
SetStatus(int)

// Status returns the exit status for the command.
Status() int
}

type responseEmitter struct {
Expand All @@ -50,8 +48,6 @@ type responseEmitter struct {
encType cmds.EncodingType
exit int
closed bool

ch chan<- int
}

func (re *responseEmitter) Type() cmds.PostRunType {
Expand All @@ -62,36 +58,6 @@ func (re *responseEmitter) SetLength(l uint64) {
re.length = l
}

func (re *responseEmitter) CloseWithError(err error) error {
var msg string
switch err {
case nil:
return re.Close()
case context.Canceled:
msg = "canceled"
case context.DeadlineExceeded:
msg = "timed out"
default:
msg = err.Error()
}

re.l.Lock()
defer re.l.Unlock()

if re.closed {
return cmds.ErrClosingClosedEmitter
}

re.exit = 1 // TODO we could let err carry an exit code

_, err = fmt.Fprintln(re.stderr, "Error:", msg)
if err != nil {
return err
}

return re.close()
}

func (re *responseEmitter) isClosed() bool {
re.l.Lock()
defer re.l.Unlock()
Expand All @@ -100,24 +66,39 @@ func (re *responseEmitter) isClosed() bool {
}

func (re *responseEmitter) Close() error {
return re.CloseWithError(nil)
}

func (re *responseEmitter) CloseWithError(err error) error {
re.l.Lock()
defer re.l.Unlock()

return re.close()
}

func (re *responseEmitter) close() error {
if re.closed {
return cmds.ErrClosingClosedEmitter
}
re.closed = true

var msg string
if err != nil {
if re.exit == 0 {
// Default "error" exit code.
re.exit = 1
}
switch err {
case context.Canceled:
msg = "canceled"
case context.DeadlineExceeded:
msg = "timed out"
default:
msg = err.Error()
}

re.ch <- re.exit
close(re.ch)
fmt.Fprintln(re.stderr, "Error:", msg)
}

defer func() {
re.stdout = nil
re.stderr = nil
re.closed = true
}()

// ignore error if the operating system doesn't support syncing std{out,err}
Expand All @@ -131,23 +112,19 @@ func (re *responseEmitter) close() error {
return false
}

var errStderr, errStdout error
if f, ok := re.stderr.(*os.File); ok {
err := f.Sync()
if err != nil {
if !ignoreError(err) {
return err
}
}
errStderr = f.Sync()
}
if f, ok := re.stdout.(*os.File); ok {
err := f.Sync()
if err != nil {
if !ignoreError(err) {
return err
}
}
errStdout = f.Sync()
}
if errStderr != nil && !ignoreError(errStderr) {
return errStderr
}
if errStdout != nil && !ignoreError(errStdout) {
return errStdout
}

return nil
}

Expand All @@ -159,12 +136,6 @@ func (re *responseEmitter) Emit(v interface{}) error {
isSingle = true
}

// Initially this library allowed commands to return errors by sending an
// error value along a stream. We removed that in favour of CloseWithError,
// so we want to make sure we catch situations where some code still uses the
// old error emitting semantics and _panic_ in those situations.
debug.AssertNotError(v)

// channel emission iteration
if ch, ok := v.(chan interface{}); ok {
v = (<-chan interface{})(ch)
Expand Down Expand Up @@ -220,11 +191,16 @@ func (re *responseEmitter) Stdout() io.Writer {
return re.stdout
}

// Exit sends code to the channel that was returned by NewResponseEmitter, so main() can pass it to os.Exit()
func (re *responseEmitter) Exit(code int) {
defer re.Close()

// SetStatus sets the exit status of the command.
func (re *responseEmitter) SetStatus(code int) {
re.l.Lock()
defer re.l.Unlock()
re.exit = code
}

// Status _returns_ the exit status of the command.
func (re *responseEmitter) Status() int {
re.l.Lock()
defer re.l.Unlock()
return re.exit
}
8 changes: 4 additions & 4 deletions cli/responseemitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ type tcCloseWithError struct {

func (tc tcCloseWithError) Run(t *testing.T) {
req := &cmds.Request{}
cmdsre, exitCh, err := NewResponseEmitter(tc.stdout, tc.stderr, req)
cmdsre, err := NewResponseEmitter(tc.stdout, tc.stderr, req)
if err != nil {
t.Fatal(err)
}

re := cmdsre.(ResponseEmitter)

go tc.f(re, t)
tc.f(re, t)

if exitCode := <-exitCh; exitCode != tc.exExit {
t.Fatalf("expected exit code %d, got %d", tc.exExit, exitCode)
if re.Status() != tc.exExit {
t.Fatalf("expected exit code %d, got %d", tc.exExit, re.Status())
}

if tc.stdout.String() != tc.exStdout {
Expand Down
33 changes: 10 additions & 23 deletions cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func Run(ctx context.Context, root *cmds.Command,
printHelp(false, stderr)
}

return err
return errParse
}

// here we handle the cases where
Expand Down Expand Up @@ -120,11 +120,6 @@ func Run(ctx context.Context, root *cmds.Command,
return err
}

var (
re cmds.ResponseEmitter
exitCh <-chan int
)

encTypeStr, _ := req.Options[cmds.EncLong].(string)
encType := cmds.EncodingType(encTypeStr)

Expand All @@ -133,23 +128,17 @@ func Run(ctx context.Context, root *cmds.Command,
req.Options[cmds.EncLong] = cmds.JSON
}

// first if condition checks the command's encoder map, second checks global encoder map (cmd vs. cmds)
re, exitCh, err = NewResponseEmitter(stdout, stderr, req)
re, err := NewResponseEmitter(stdout, stderr, req)
if err != nil {
printErr(err)
return err
}

errCh := make(chan error, 1)
go func() {
err := exctr.Execute(req, re, env)
if err != nil {
errCh <- err
}
}()

select {
case err := <-errCh:
// Execute the command.
err = exctr.Execute(req, re, env)
// If we get an error here, don't bother reading the status from the
// response emitter. It may not even be closed.
if err != nil {
printErr(err)

if kiterr, ok := err.(*cmdkit.Error); ok {
Expand All @@ -160,12 +149,10 @@ func Run(ctx context.Context, root *cmds.Command,
}

return err

case code := <-exitCh:
if code != 0 {
return ExitError(code)
}
}

if code := re.Status(); code != 0 {
return ExitError(code)
}
return nil
}
64 changes: 64 additions & 0 deletions cli/run_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package cli

import (
"context"
"os"
"testing"
"time"

"github.com/ipfs/go-ipfs-cmds"
)

var root = &cmds.Command{
Subcommands: map[string]*cmds.Command{
"test": &cmds.Command{
Run: func(req *cmds.Request, re cmds.ResponseEmitter, e cmds.Environment) error {
err := cmds.EmitOnce(re, 42)

time.Sleep(2 * time.Second)

e.(env).ch <- struct{}{}
return err
},
},
},
}

type env struct {
ch chan struct{}
}

func (e env) Context() context.Context {
return context.Background()
}

func TestRunWaits(t *testing.T) {
flag := make(chan struct{}, 1)

devnull, err := os.OpenFile(os.DevNull, os.O_RDWR, 0600)
if err != nil {
t.Fatal(err)
}
defer devnull.Close()

err = Run(
context.Background(),
root,
[]string{"test", "test"},
devnull, devnull, devnull,
func(ctx context.Context, req *cmds.Request) (cmds.Environment, error) {
return env{flag}, nil
},
func(req *cmds.Request, env interface{}) (cmds.Executor, error) {
return cmds.NewExecutor(req.Root), nil
},
)
if err != nil {
t.Fatal(err)
}
select {
case <-flag:
default:
t.Fatal("expected flag to be raised")
}
}
Loading

0 comments on commit ff61303

Please sign in to comment.