Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Podman final #13

Open
wants to merge 26 commits into
base: hackathon
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Outline container impl
  • Loading branch information
hexfusion committed Jun 23, 2023
commit bb1100e1b3e7e57877a35db3e511174edab62073
48 changes: 48 additions & 0 deletions vms/rpcchainvm/runtime/container/initializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package container

import (
"context"
"fmt"
"sync"

"github.com/ava-labs/avalanchego/version"
"github.com/ava-labs/avalanchego/vms/rpcchainvm/runtime"
)

var _ runtime.Initializer = (*initializer)(nil)

// Subprocess VM Runtime intializer.
type initializer struct {
once sync.Once
// Address of the RPC Chain VM server
vmAddr string
// Error, if one occurred, during Initialization
err error
// Initialized is closed once Initialize is called
initialized chan struct{}
}

func newInitializer() *initializer {
return &initializer{
initialized: make(chan struct{}),
}
}

func (i *initializer) Initialize(_ context.Context, protocolVersion uint, vmAddr string) error {
i.once.Do(func() {
if version.RPCChainVMProtocol != protocolVersion {
i.err = fmt.Errorf(
"%w avalanchego: %d, vm: %d",
runtime.ErrProtocolVersionMismatch,
version.RPCChainVMProtocol,
protocolVersion,
)
}
i.vmAddr = vmAddr
close(i.initialized)
})
return i.err
}
106 changes: 106 additions & 0 deletions vms/rpcchainvm/runtime/container/runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package container

import (
"context"
"fmt"
"io"
"net"
"os"
"strings"
"time"

"go.uber.org/zap"

"github.com/ava-labs/avalanchego/vms/rpcchainvm/grpcutils"
"github.com/ava-labs/avalanchego/vms/rpcchainvm/gruntime"
"github.com/ava-labs/avalanchego/vms/rpcchainvm/runtime"
"github.com/ava-labs/avalanchego/utils/logging"


pb "github.com/ava-labs/avalanchego/proto/pb/vm/runtime"
)

type Config struct {
// Stderr of the VM process written to this writer.
Stderr io.Writer
// Stdout of the VM process written to this writer.
Stdout io.Writer
// Duration engine server will wait for handshake success.
HandshakeTimeout time.Duration
Log logging.Logger
}

type Status struct {
// Id of the process.
Pid int
// Address of the VM gRPC service.
Addr string
}

// Bootstrap starts a VM as a subprocess after initialization completes and
// pipes the IO to the appropriate writers.
//
// The subprocess is expected to be stopped by the caller if a non-nil error is
// returned. If piping the IO fails then the subprocess will be stopped.
//
// TODO: create the listener inside this method once we refactor the tests
func Bootstrap(
ctx context.Context,
listener net.Listener,
config *Config,
) (*Status, runtime.Stopper, error) {
defer listener.Close()

intitializer := newInitializer()

server := grpcutils.NewServer()
defer server.GracefulStop()
pb.RegisterRuntimeServer(server, gruntime.NewServer(intitializer))

go grpcutils.Serve(listener, server)

serverAddr := listener.Addr()

// set pod ENV
// cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", runtime.EngineAddressKey, serverAddr.String()))
// pass golang debug env to subprocess
for _, env := range os.Environ() {
if strings.HasPrefix(env, "GRPC_") || strings.HasPrefix(env, "GODEBUG") {
}
}

// start container


// fix stopper
// stopper := NewStopper(log, cmd)

// wait for handshake success
timeout := time.NewTimer(config.HandshakeTimeout)
defer timeout.Stop()

select {
case <-intitializer.initialized:
case <-timeout.C:
stopper.Stop(ctx)
return nil, nil, fmt.Errorf("%w: %v", runtime.ErrHandshakeFailed, runtime.ErrProcessNotFound)
}

if intitializer.err != nil {
stopper.Stop(ctx)
return nil, nil, fmt.Errorf("%w: %v", runtime.ErrHandshakeFailed, intitializer.err)
}

log.Info("plugin handshake succeeded",
zap.String("addr", intitializer.vmAddr),
)

status := &Status{
Pid: cmd.Process.Pid,
Addr: intitializer.vmAddr,
}
return status, stopper, nil
}
70 changes: 70 additions & 0 deletions vms/rpcchainvm/runtime/container/stopper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package container

import (
"context"
"os/exec"
"sync"
"syscall"

"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/avalanchego/vms/rpcchainvm/runtime"
"go.uber.org/zap"
)

func NewStopper(logger logging.Logger, cmd *exec.Cmd) runtime.Stopper {
return &stopper{
cmd: cmd,
logger: logger,
}
}

type stopper struct {
once sync.Once
cmd *exec.Cmd
logger logging.Logger
}

func (s *stopper) Stop(ctx context.Context) {
s.once.Do(func() {
stop(ctx, s.logger, s.cmd)
})
}


func stop(ctx context.Context, log logging.Logger, cmd *exec.Cmd) {
waitChan := make(chan error)
go func() {
// attempt graceful shutdown
errs := wrappers.Errs{}
err := cmd.Process.Signal(syscall.SIGTERM)
errs.Add(err)
_, err = cmd.Process.Wait()
errs.Add(err)
waitChan <- errs.Err
close(waitChan)
}()

ctx, cancel := context.WithTimeout(ctx, runtime.DefaultGracefulTimeout)
defer cancel()

select {
case err := <-waitChan:
if err == nil {
log.Debug("subprocess gracefully shutdown")
} else {
log.Error("subprocess graceful shutdown failed",
zap.Error(err),
)
}
case <-ctx.Done():
// force kill
err := cmd.Process.Kill()
log.Error("subprocess was killed",
zap.Error(err),
)
}
}