Skip to content

Commit 3ed7c71

Browse files
committed
Merge pull request #37 from buildboxhq/killing-processes
Improving cancelling processes
2 parents 0261f72 + e73bcb9 commit 3ed7c71

File tree

2 files changed

+167
-62
lines changed

2 files changed

+167
-62
lines changed

buildbox/job.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ type Job struct {
2727

2828
FinishedAt string `json:"finished_at,omitempty"`
2929

30+
// If the job is currently being cancelled
31+
cancelled bool
32+
3033
// The currently running process of the job
3134
process *Process
3235
}
@@ -72,9 +75,17 @@ func (c *Client) JobUpdate(job *Job) (*Job, error) {
7275
}
7376

7477
func (j *Job) Kill() error {
75-
if j.process != nil {
78+
if j.cancelled {
79+
// Already cancelled
80+
} else {
7681
Logger.Infof("Cancelling job %s", j.ID)
77-
j.process.Kill()
82+
j.cancelled = true
83+
84+
if j.process != nil {
85+
j.process.Kill()
86+
} else {
87+
Logger.Errorf("No process to kill")
88+
}
7889
}
7990

8091
return nil
@@ -101,7 +112,7 @@ func (j *Job) Run(agent *Agent) error {
101112

102113
// This callback is called every second the build is running. This lets
103114
// us do a lazy-person's method of streaming data to Buildbox.
104-
callback := func(process Process) {
115+
callback := func(process *Process) {
105116
j.Output = process.Output
106117

107118
// Post the update to the API
@@ -116,15 +127,16 @@ func (j *Job) Run(agent *Agent) error {
116127
}
117128
}
118129

119-
// Run the bootstrap script
130+
// Initialze our process to run
120131
scriptPath := path.Dir(agent.BootstrapScript)
121132
scriptName := path.Base(agent.BootstrapScript)
122-
process, err := RunScript(scriptPath, scriptName, env, callback)
133+
process := InitProcess(scriptPath, scriptName, env, callback)
123134

124-
// Store the process for later
135+
// Store the process so we can cancel it later.
125136
j.process = process
126137

127-
// Did the process succeed?
138+
// Start the process. This will block until it finishes.
139+
err = process.Start()
128140
if err == nil {
129141
// Store the final output
130142
j.Output = j.process.Output

buildbox/script.go

Lines changed: 148 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
package buildbox
22

3+
// Logic for this file is largely based on:
4+
// https://github.com/jarib/childprocess/blob/783f7a00a1678b5d929062564ef5ae76822dfd62/lib/childprocess/unix/process.rb
5+
36
import (
47
"bytes"
8+
"errors"
59
"fmt"
610
"github.com/kr/pty"
711
"io"
812
"os"
913
"os/exec"
1014
"path"
1115
"path/filepath"
12-
"regexp"
1316
"sync"
17+
"syscall"
1418
"time"
1519
)
1620

@@ -20,55 +24,63 @@ type Process struct {
2024
Running bool
2125
ExitStatus string
2226
command *exec.Cmd
27+
callback func(*Process)
2328
}
2429

2530
// Implement the Stringer thingy
2631
func (p Process) String() string {
2732
return fmt.Sprintf("Process{Pid: %d, Running: %t, ExitStatus: %s}", p.Pid, p.Running, p.ExitStatus)
2833
}
2934

30-
func (p Process) Kill() error {
31-
return p.command.Process.Kill()
32-
}
33-
34-
func RunScript(dir string, script string, env []string, callback func(Process)) (*Process, error) {
35+
func InitProcess(dir string, script string, env []string, callback func(*Process)) *Process {
3536
// Create a new instance of our process struct
3637
var process Process
3738

3839
// Find the script to run
3940
absoluteDir, _ := filepath.Abs(dir)
4041
pathToScript := path.Join(absoluteDir, script)
4142

42-
Logger.Infof("Starting to run script `%s` from inside %s", script, absoluteDir)
43-
4443
process.command = exec.Command(pathToScript)
4544
process.command.Dir = absoluteDir
4645

46+
// Children of the forked process will inherit its process group
47+
// This is to make sure that all grandchildren dies when this Process instance is killed
48+
process.command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
49+
4750
// Copy the current processes ENV and merge in the
4851
// new ones. We do this so the sub process gets PATH
4952
// and stuff.
5053
// TODO: Is this correct?
5154
currentEnv := os.Environ()
5255
process.command.Env = append(currentEnv, env...)
5356

57+
// Set the callback
58+
process.callback = callback
59+
60+
return &process
61+
}
62+
63+
func (p *Process) Start() error {
64+
Logger.Infof("Starting to run script: %s", p.command.Path)
65+
5466
// Start our process
55-
pty, err := pty.Start(process.command)
67+
pty, err := pty.Start(p.command)
5668
if err != nil {
5769
// The process essentially failed, so we'll just make up
5870
// and exit status.
59-
process.ExitStatus = "1"
71+
p.ExitStatus = "1"
6072

61-
return &process, err
73+
return err
6274
}
6375

64-
process.Pid = process.command.Process.Pid
65-
process.Running = true
76+
p.Pid = p.command.Process.Pid
77+
p.Running = true
6678

67-
Logger.Infof("Process is running with PID: %d", process.Pid)
79+
Logger.Infof("Process is running with PID: %d", p.Pid)
6880

6981
var buffer bytes.Buffer
70-
var w sync.WaitGroup
71-
w.Add(2)
82+
var waitGroup sync.WaitGroup
83+
waitGroup.Add(2)
7284

7385
go func() {
7486
Logger.Debug("Starting to copy PTY to the buffer")
@@ -82,86 +94,167 @@ func RunScript(dir string, script string, env []string, callback func(Process))
8294
Logger.Debug("io.Copy finsihed")
8395
}
8496

85-
w.Done()
97+
waitGroup.Done()
8698
}()
8799

88100
go func() {
89-
for process.Running {
101+
for p.Running {
90102
Logger.Debug("Copying buffer to the process output")
91103

92104
// Convert the stdout buffer to a string
93-
process.Output = buffer.String()
105+
p.Output = buffer.String()
94106

95107
// Call the callback and pass in our process object
96-
callback(process)
108+
p.callback(p)
97109

98110
// Sleep for 1 second
99111
time.Sleep(1000 * time.Millisecond)
100112
}
101113

102114
Logger.Debug("Finished routine that copies the buffer to the process output")
103115

104-
w.Done()
116+
waitGroup.Done()
105117
}()
106118

107119
// Wait until the process has finished. The returned error is nil if the command runs,
108120
// has no problems copying stdin, stdout, and stderr, and exits with a zero exit status.
109-
waitResult := process.command.Wait()
121+
waitResult := p.command.Wait()
110122

111123
// The process is no longer running at this point
112-
process.Running = false
124+
p.Running = false
113125

114-
// Determine the exit status (if waitResult is an error, that means that the process
115-
// returned a non zero exit status)
116-
if waitResult != nil {
117-
if werr, ok := waitResult.(*exec.ExitError); ok {
118-
// This returns a string like: `exit status 123`
119-
exitString := werr.Error()
120-
exitStringRegex := regexp.MustCompile(`([0-9]+)$`)
126+
// Find the exit status of the script
127+
p.ExitStatus = getExitStatus(waitResult)
121128

122-
if exitStringRegex.MatchString(exitString) {
123-
process.ExitStatus = exitStringRegex.FindString(exitString)
124-
} else {
125-
Logger.Errorf("Weird looking exit status: %s", exitString)
129+
Logger.Debugf("Process with PID: %d finished with Exit Status: %s", p.Pid, p.ExitStatus)
126130

127-
// If the exit status isn't what I'm looking for, provide a generic one.
128-
process.ExitStatus = "-1"
131+
// Sometimes (in docker containers) io.Copy never seems to finish. This is a mega
132+
// hack around it. If it doesn't finish after 1 second, just continue.
133+
Logger.Debug("Waiting for io.Copy and incremental output to finish")
134+
err = timeoutWait(&waitGroup)
135+
if err != nil {
136+
Logger.Errorf("Timed out waiting for wait group: (%T: %v)", err, err)
137+
}
138+
139+
// Copy the final output back to the process
140+
p.Output = buffer.String()
141+
142+
// No error occured so we can return nil
143+
return nil
144+
}
145+
146+
func (p *Process) Kill() error {
147+
// Send a sigterm
148+
err := p.signal(syscall.SIGTERM)
149+
if err != nil {
150+
return err
151+
}
152+
153+
// Make a chanel that we'll use as a timeout
154+
c := make(chan int, 1)
155+
checking := true
156+
157+
// Start a routine that checks to see if the process
158+
// is still alive.
159+
go func() {
160+
for checking {
161+
Logger.Debugf("Checking to see if PID: %d is still alive", p.Pid)
162+
163+
foundProcess, err := os.FindProcess(p.Pid)
164+
165+
// Can't find the process at all
166+
if err != nil {
167+
Logger.Debugf("Could not find process with PID: %d", p.Pid)
168+
169+
break
129170
}
130-
} else {
131-
Logger.Errorf("Could not determine exit status. %T: %v", waitResult, waitResult)
132171

133-
// Not sure what to provide as an exit status if one couldn't be determined.
134-
process.ExitStatus = "-1"
172+
// We have some information about the procss
173+
if foundProcess != nil {
174+
processState, err := foundProcess.Wait()
175+
176+
if err != nil || processState.Exited() {
177+
Logger.Debugf("Process with PID: %d has exited.", p.Pid)
178+
179+
break
180+
}
181+
}
182+
183+
// Retry in a moment
184+
sleepTime := time.Duration(1 * time.Second)
185+
time.Sleep(sleepTime)
186+
}
187+
188+
c <- 1
189+
}()
190+
191+
// Timeout this process after 3 seconds
192+
select {
193+
case _ = <-c:
194+
// Was successfully terminated
195+
case <-time.After(5 * time.Second):
196+
// Stop checking in the routine above
197+
checking = false
198+
199+
// Forcefully kill the thing
200+
err = p.signal(syscall.SIGKILL)
201+
202+
if err != nil {
203+
return err
204+
}
205+
}
206+
207+
return nil
208+
}
209+
210+
func (p *Process) signal(sig os.Signal) error {
211+
Logger.Debugf("Sending signal: %s to PID: %d", sig.String(), p.Pid)
212+
213+
err := p.command.Process.Signal(syscall.SIGTERM)
214+
if err != nil {
215+
Logger.Errorf("Failed to send signal: %s to PID: %d (%T: %v)", sig.String(), p.Pid, err, err)
216+
return err
217+
}
218+
219+
return nil
220+
}
221+
222+
// https://github.com/hnakamur/commango/blob/fe42b1cf82bf536ce7e24dceaef6656002e03743/os/executil/executil.go#L29
223+
// TODO: Can this be better?
224+
func getExitStatus(waitResult error) string {
225+
exitStatus := -1
226+
227+
if waitResult != nil {
228+
if err, ok := waitResult.(*exec.ExitError); ok {
229+
if s, ok := err.Sys().(syscall.WaitStatus); ok {
230+
exitStatus = s.ExitStatus()
231+
} else {
232+
Logger.Error("Unimplemented for system where exec.ExitError.Sys() is not syscall.WaitStatus.")
233+
}
135234
}
136235
} else {
137-
process.ExitStatus = "0"
236+
exitStatus = 0
138237
}
139238

140-
Logger.Debugf("Process with PID: %d finished with Exit Status: %s", process.Pid, process.ExitStatus)
239+
return fmt.Sprintf("%d", exitStatus)
240+
}
141241

242+
func timeoutWait(waitGroup *sync.WaitGroup) error {
142243
// Make a chanel that we'll use as a timeout
143244
c := make(chan int, 1)
144245

145246
// Start waiting for the routines to finish
146-
Logger.Debug("Waiting for io.Copy and incremental output to finish")
147247
go func() {
148-
w.Wait()
248+
waitGroup.Wait()
149249
c <- 1
150250
}()
151251

152-
// Sometimes (in docker containers) io.Copy never seems to finish. This is a mega
153-
// hack around it. If it doesn't finish after 1 second, just continue.
154-
// TODO: Whyyyyy!?!?!?
155252
select {
156253
case _ = <-c:
157-
// nothing, wait finished fine.
158-
case <-time.After(1 * time.Second):
159-
Logger.Error("Timed out waiting for the routines to finish. Forcefully moving on.")
254+
return nil
255+
case <-time.After(3 * time.Second):
256+
return errors.New("Timeout")
160257
}
161258

162-
// Copy the final output back to the process
163-
process.Output = buffer.String()
164-
165-
// No error occured so we can return nil
166-
return &process, nil
259+
return nil
167260
}

0 commit comments

Comments
 (0)