Skip to content

Commit

Permalink
Revert "[engine] Limit number of active subprocesses"
Browse files (browse the repository at this point in the history)
This reverts commit 612779b.

Reason for revert: `shac check --all` is hanging in large repos: 
https://ci.chromium.org/b/8767597391211255425

Original change's description:
> [engine] Limit number of active subprocesses
>
> Previously the number of concurrent subprocess invocations launched by
> `ctx.os.exec()` was unbounded, which could place a strain on the system.
> Now there's effectively a pool of NumCPU+2 workers for running
> subprocesses.
>
> `ctx.os.exec()` returns immediately, but the underlying subprocess is
> started asynchronously.
>
> `ba -against main` showed no significant difference in the results of
> the `ctx.os.exec()` benchmarks.
>
> Change-Id: I76e4542249783c9a503f0f927e327e9f90f8bb04
> Reviewed-on: https://fuchsia-review.googlesource.com/c/shac-project/shac/+/867979
> Reviewed-by: Ina Huh <ihuh@google.com>
> Commit-Queue: Oliver Newman <olivernewman@google.com>

Change-Id: Icfd3611825b1995948c856170ddc353b7ebfb1eb
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://fuchsia-review.googlesource.com/c/shac-project/shac/+/929633
Fuchsia-Auto-Submit: Oliver Newman <olivernewman@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
Reviewed-by: RubberStamper 🤖 <android-build-ayeaye@system.gserviceaccount.com>
  • Loading branch information
orn688 authored and CQ Bot committed Oct 10, 2023
1 parent 612779b commit 0dae038
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 68 deletions.
10 changes: 1 addition & 9 deletions internal/engine/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"go.starlark.net/resolve"
"go.starlark.net/starlark"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/encoding/prototext"
)

Expand Down Expand Up @@ -365,8 +364,6 @@ func runInner(ctx context.Context, o *Options, tmpdir string) error {
packages: packages,
}

subprocessSem := semaphore.NewWeighted(int64(runtime.NumCPU()) + 2)

var vars map[string]string

newState := func(scm scmCheckout, subdir string, idx int) (*shacState, error) {
Expand Down Expand Up @@ -407,7 +404,6 @@ func runInner(ctx context.Context, o *Options, tmpdir string) error {
sandbox: sb,
scm: scm,
subdir: subdir,
subprocessSem: subprocessSem,
tmpdir: filepath.Join(tmpdir, strconv.Itoa(idx)),
writableRoot: doc.WritableRoot,
vars: vars,
Expand Down Expand Up @@ -468,7 +464,7 @@ func runInner(ctx context.Context, o *Options, tmpdir string) error {
if err != nil {
return err
}
shacStates = append(shacStates, state)
shacStates = []*shacState{state}
}

// Parse the starlark files. Run everything from our errgroup.
Expand All @@ -492,7 +488,6 @@ func runInner(ctx context.Context, o *Options, tmpdir string) error {
if cb == nil {
loop = false
} else {
// Actually run the check.
eg.Go(cb)
}
case <-done:
Expand Down Expand Up @@ -649,9 +644,6 @@ type shacState struct {
filter CheckFilter
passthroughEnv []*PassthroughEnv

// Limits the number of concurrent subprocesses launched by ctx.os.exec().
subprocessSem *semaphore.Weighted

// Set when fail() is called. This happens only during the first phase, thus
// no mutex is needed.
failErr *failure
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2273,7 +2273,7 @@ func TestTestDataPrint(t *testing.T) {
},
{
name: "ctx-os-exec-parallel.star",
want: strings.Repeat("[//ctx-os-exec-parallel.star:28] Hello, world\n", 1000),
want: strings.Repeat("[//ctx-os-exec-parallel.star:27] Hello, world\n", 10),
},
{
name: "ctx-os-exec-relpath.star",
Expand Down
75 changes: 22 additions & 53 deletions internal/engine/runtime_ctx_os.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type subprocess struct {
raiseOnFailure bool
okRetcodes []int
tempDir string
startErrs <-chan error

waitCalled bool
}
Expand Down Expand Up @@ -81,27 +80,15 @@ func (s *subprocess) AttrNames() []string {
return []string{"wait"}
}

func (s *subprocess) wait(state *shacState) (starlark.Value, error) {
func (s *subprocess) wait() (starlark.Value, error) {
if s.waitCalled {
return nil, fmt.Errorf("wait was already called")
}
s.waitCalled = true
val, err := s.waitInner(state)
if err2 := s.cleanup(); err == nil {
err = err2
}
return val, err
}

func (s *subprocess) waitInner(state *shacState) (starlark.Value, error) {
if err := <-s.startErrs; err != nil {
// If cmd.Start() failed the semaphore will already have been released,
// no need to release it.
return nil, err
}
defer s.cleanup()

err := s.cmd.Wait()
state.subprocessSem.Release(1)
retcode := 0
if err != nil {
var errExit *exec.ExitError
Expand Down Expand Up @@ -140,25 +127,16 @@ func (s *subprocess) waitInner(state *shacState) (starlark.Value, error) {
}

func (s *subprocess) cleanup() error {
// Wait for the subprocess to launch before trying to kill it. s.startErrs
// gets closed after the subprocess starts, so even if the error has already
// been received by `wait()`, this receive will return due to the channel
// being closed.
<-s.startErrs
// Kill the process before doing any other cleanup steps to ensure resources
// are no longer in use before cleaning them up.
var err error
if s.cmd.ProcessState == nil {
err = s.cmd.Process.Kill()
// Kill() is non-blocking, so it's necessary to wait for the process to
// exit before cleaning up resources.
_ = s.cmd.Wait()
}
// are no longer in use.
err := s.cmd.Process.Kill()
// Kill() doesn't block until the process actually completes, so we need to
// wait before cleaning up resources.
_ = s.cmd.Wait()

if err2 := os.RemoveAll(s.tempDir); err == nil {
err = err2
}

buffers.push(s.stdout)
buffers.push(s.stderr)
s.stdout, s.stderr = nil, nil
Expand All @@ -170,7 +148,7 @@ var subprocessWaitBuiltin = newBoundBuiltin("wait", func(ctx context.Context, s
if err := starlark.UnpackArgs(name, args, kwargs); err != nil {
return nil, err
}
return self.(*subprocess).wait(s)
return self.(*subprocess).wait()
})

// ctxOsExec implements the native function ctx.os.exec().
Expand Down Expand Up @@ -234,6 +212,15 @@ func ctxOsExec(ctx context.Context, s *shacState, name string, args starlark.Tup
return os.RemoveAll(tempDir)
})

stdout := buffers.get()
stderr := buffers.get()

cleanupFuncs = append(cleanupFuncs, func() error {
buffers.push(stdout)
buffers.push(stderr)
return nil
})

env := map[string]string{
"PATH": os.Getenv("PATH"),
"TEMP": tempDir,
Expand Down Expand Up @@ -400,31 +387,15 @@ func ctxOsExec(ctx context.Context, s *shacState, name string, args starlark.Tup

cmd := s.sandbox.Command(ctx, config)

stdout, stderr := buffers.get(), buffers.get()
cmd.Stdin = stdin
// TODO(olivernewman): Also handle commands that may output non-utf-8 bytes.
cmd.Stdout = stdout
cmd.Stderr = stderr
cmd.Stdin = stdin

startErrs := make(chan error, 1)
go func() {
// Signals to subprocess.cleanup() that starting the subprocess is done,
// whether or not it was successful.
defer close(startErrs)

err := s.subprocessSem.Acquire(ctx, 1)
if err != nil {
startErrs <- err
return
}
err = execsupport.Start(cmd)
if err != nil {
// Release early if the process failed to start, no point in
// delaying until wait() is called.
s.subprocessSem.Release(1)
}
startErrs <- err
}()
err = execsupport.Start(cmd)
if err != nil {
return nil, err
}

proc := &subprocess{
cmd: cmd,
Expand All @@ -434,9 +405,7 @@ func ctxOsExec(ctx context.Context, s *shacState, name string, args starlark.Tup
raiseOnFailure: bool(argraiseOnFailure),
okRetcodes: okRetcodes,
tempDir: tempDir,
startErrs: startErrs,
}

// Only clean up now if starting the subprocess failed; otherwise it will
// get cleaned up by wait().
cleanupFuncs = cleanupFuncs[:0]
Expand Down
7 changes: 3 additions & 4 deletions internal/engine/testdata/print/ctx-os-exec-parallel.star
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ def cb(ctx):
else:
cmd = ["./hello_world.sh"]

# Launch more parallel subprocesses than any realistic host machine will
# have cores (but not too many, or the test will be very slow).
num_procs = 1000
procs = [ctx.os.exec(cmd) for _ in range(num_procs)]
procs = []
for _ in range(10):
procs.append(ctx.os.exec(cmd))

for proc in procs:
res = proc.wait()
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var (
// Version is the current tool version.
//
// TODO(maruel): Add proper version, preferably from git tag.
Version = shacVersion{0, 1, 15}
Version = shacVersion{0, 1, 14}
)

func (v shacVersion) String() string {
Expand Down

0 comments on commit 0dae038

Please sign in to comment.