Skip to content

Commit

Permalink
HostVolumePlugin interface and implementations
Browse files Browse the repository at this point in the history
* mkdir: HostVolumePluginMkdir: just creates a directory
* example-host-volume: HostVolumePluginExternal:
  mkfs and mount loopback
  • Loading branch information
gulducat committed Nov 19, 2024
1 parent d3c286a commit 21083e5
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 96 deletions.
12 changes: 9 additions & 3 deletions client/host_volume_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@ func (v *HostVolume) Create(
return nil
}

func (v *HostVolume) Delete(req *cstructs.ClientHostVolumeDeleteRequest, resp *cstructs.ClientHostVolumeDeleteResponse) error {
func (v *HostVolume) Delete(
req *cstructs.ClientHostVolumeDeleteRequest,
resp *cstructs.ClientHostVolumeDeleteResponse) error {
defer metrics.MeasureSince([]string{"client", "host_volume", "create"}, time.Now())
_, cancelFn := v.requestContext()
ctx, cancelFn := v.requestContext()
defer cancelFn()

// TODO(db): call into Client's host volume manager to delete the volume here
_, err := v.c.hostVolumeManager.Delete(ctx, req) // TODO(db): cresp is empty... why return it?
if err != nil {
v.c.logger.Debug("failed to delete host volume", "ID", req.ID, "error", err)
return err
}

v.c.logger.Debug("deleted host volume", "id", req.ID, "path", req.HostPath)
return nil
Expand Down
165 changes: 165 additions & 0 deletions client/hostvolumemanager/host_volume_plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package hostvolumemanager

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"

"github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
)

type HostVolumePlugin interface {
Version(ctx context.Context) (string, error) // TODO(db): semver?
Create(ctx context.Context, req *cstructs.ClientHostVolumeCreateRequest) (*hostVolumePluginCreateResponse, error)
Delete(ctx context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error
// TODO(db): update? resize? ??
}

type hostVolumePluginCreateResponse struct {
Path string `json:"path"`
SizeBytes int64 `json:"bytes"`
Context map[string]string `json:"context"` // metadata
}

var _ HostVolumePlugin = &HostVolumePluginMkdir{}

type HostVolumePluginMkdir struct {
ID string
TargetPath string

log hclog.Logger
}

func (p *HostVolumePluginMkdir) Version(_ context.Context) (string, error) {
return "0.0.1", nil
}

func (p *HostVolumePluginMkdir) Create(_ context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*hostVolumePluginCreateResponse, error) {

path := filepath.Join(p.TargetPath, req.ID)
p.log.Debug("CREATE: default host volume plugin", "target_path", path)

err := os.Mkdir(path, 0o700)
if err != nil {
return nil, err
}

return &hostVolumePluginCreateResponse{
Path: path,
SizeBytes: 0,
Context: map[string]string{},
}, nil
}

func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
path := filepath.Join(p.TargetPath, req.ID)
p.log.Debug("DELETE: default host volume plugin", "target_path", path)
return os.RemoveAll(path)
}

var _ HostVolumePlugin = &HostVolumePluginExternal{}

type HostVolumePluginExternal struct {
ID string
Executable string
TargetPath string

log hclog.Logger
}

func (p *HostVolumePluginExternal) Version(_ context.Context) (string, error) {
return "0.0.1", nil
}

func (p *HostVolumePluginExternal) Create(ctx context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*hostVolumePluginCreateResponse, error) {

stdin, err := json.Marshal(req)
if err != nil {
return nil, err
}

stdout, _, err := p.runPlugin(ctx, "create", req.ID, stdin, []string{
"NODE_ID=" + req.NodeID,
"VOLUME_NAME=" + req.Name,
fmt.Sprintf("CAPACITY_MIN_BYTES=%d", req.RequestedCapacityMinBytes),
fmt.Sprintf("CAPACITY_MAX_BYTES=%d", req.RequestedCapacityMaxBytes),
})
if err != nil {
return nil, fmt.Errorf("error creating volume %q with plugin %q: %w", req.ID, req.PluginID, err)
}

var pluginResp hostVolumePluginCreateResponse
err = json.Unmarshal(stdout, &pluginResp)
if err != nil {
return nil, err
}
return &pluginResp, nil
}

func (p *HostVolumePluginExternal) Delete(ctx context.Context,
req *cstructs.ClientHostVolumeDeleteRequest) error {

stdin, err := json.Marshal(req)
if err != nil {
return err
}

_, _, err = p.runPlugin(ctx, "delete", req.ID, stdin, []string{})
if err != nil {
return fmt.Errorf("error deleting volume %q with plugin %q: %w", req.ID, req.PluginID, err)
}
return nil
}

func (p *HostVolumePluginExternal) runPlugin(ctx context.Context, op, volID string,
stdin []byte, env []string) (stdout, stderr []byte, err error) {

p.log.Debug("running host volume plugin",
"operation", op,
"volume_id", volID,
"stdin", string(stdin),
)

path := filepath.Join(p.TargetPath, volID)
// set up plugin execution
cmd := exec.CommandContext(ctx, p.Executable, op, path)

// plugins may read the full json from stdin,
cmd.Stdin = bytes.NewReader(stdin)
// and/or environment variables for basic features.
cmd.Env = append([]string{
"OPERATION=" + op,
"HOST_PATH=" + path,
}, env...)

var errBuf bytes.Buffer
cmd.Stderr = io.Writer(&errBuf) // TODO(db): surely a better way to capture stderr...

// run the command and capture output
stdout, err = cmd.Output()
stderr, _ = io.ReadAll(&errBuf)

logArgs := []any{
"operation", op,
"volume_id", volID,
"stdout", string(stdout),
"stderr", string(stderr),
}

if err != nil {
logArgs = append(logArgs, []any{"err", err}...)
p.log.Error("error with plugin", logArgs...)
return stdout, stderr, err
} else {
p.log.Debug("plugin ran", logArgs...)
}
return stdout, stderr, nil
}
146 changes: 53 additions & 93 deletions client/hostvolumemanager/host_volumes.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
package hostvolumemanager

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"sync"

"github.com/hashicorp/go-hclog"
Expand All @@ -17,48 +11,65 @@ import (
)

type HostVolumeManager struct {
log hclog.Logger
sharedMountDir string

pluginsLock sync.Mutex
plugins map[string]hostVolumePluginFunc // todo: sync.map?
log hclog.Logger
plugins *sync.Map
}

func NewHostVolumeManager(sharedMountDir string, logger hclog.Logger) *HostVolumeManager {

log := logger.Named("host_volumes")

return &HostVolumeManager{
log: log,
sharedMountDir: sharedMountDir,
plugins: map[string]hostVolumePluginFunc{
// TODO(db): discover plugins on disk
// TODO: how do we define the external mounter plugins? plugin configs?
// note that these can't be in the usual go-plugins directory
"default-mounter": newDefaultHostVolumePluginFn(log, sharedMountDir),
"example-host-volume": newExternalHostVolumePluginFn(
log, sharedMountDir, "/opt/nomad/hostvolumeplugins/example-host-volume"),
},
mgr := &HostVolumeManager{
log: log,
plugins: &sync.Map{},
}
// TODO(db): discover plugins on disk, need a new plugin dir
// TODO: how do we define the external mounter plugins? plugin configs?
mgr.setPlugin("mkdir", &HostVolumePluginMkdir{
ID: "mkdir",
TargetPath: sharedMountDir,
log: log.With("plugin_id", "mkdir"),
})
mgr.setPlugin("example-host-volume", &HostVolumePluginExternal{
ID: "example-host-volume",
Executable: "/opt/nomad/hostvolumeplugins/example-host-volume",
TargetPath: sharedMountDir,
log: log.With("plugin_id", "example-host-volume"),
})
return mgr
}

// TODO(db): fingerprint elsewhere / on sighup, and SetPlugin from afar?
func (hvm *HostVolumeManager) setPlugin(id string, plug HostVolumePlugin) {
hvm.plugins.Store(id, plug)
}

func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, bool) {
obj, ok := hvm.plugins.Load(id)
if !ok {
return nil, false
}
return obj.(HostVolumePlugin), true
}

func (hvm *HostVolumeManager) Create(ctx context.Context, req *cstructs.ClientHostVolumeCreateRequest) (*cstructs.ClientHostVolumeCreateResponse, error) {
hvm.pluginsLock.Lock()
pluginFn, ok := hvm.plugins[req.PluginID]
hvm.pluginsLock.Unlock()
func (hvm *HostVolumeManager) Create(ctx context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*cstructs.ClientHostVolumeCreateResponse, error) {

plug, ok := hvm.getPlugin(req.PluginID)
if !ok {
return nil, fmt.Errorf("no such plugin %q", req.PluginID)
}

volID := uuid.Generate()
pluginResp, err := pluginFn(ctx, volID, req)
if req.ID == "" {
req.ID = uuid.Generate()
}
pluginResp, err := plug.Create(ctx, req)
if err != nil {
return nil, err
}

resp := &cstructs.ClientHostVolumeCreateResponse{
HostPath: pluginResp.Path,
CapacityBytes: pluginResp.SizeInMB,
CapacityBytes: pluginResp.SizeBytes,
}

// TODO(db): now we need to add it to the node fingerprint!
Expand All @@ -67,73 +78,22 @@ func (hvm *HostVolumeManager) Create(ctx context.Context, req *cstructs.ClientHo
return resp, nil
}

type hostVolumePluginFunc func(ctx context.Context, id string, req *cstructs.ClientHostVolumeCreateRequest) (*hostVolumePluginResponse, error)
func (hvm *HostVolumeManager) Delete(ctx context.Context,
req *cstructs.ClientHostVolumeDeleteRequest) (*cstructs.ClientHostVolumeDeleteResponse, error) {

type hostVolumePluginResponse struct {
Path string `json:"path"`
SizeInMB int64 `json:"size"` // TODO(db): why MB here, when before it was bytes?
Context map[string]string `json:"context"` // metadata
}

func newExternalHostVolumePluginFn(log hclog.Logger, sharedMountDir, executablePath string) hostVolumePluginFunc {
return func(ctx context.Context, id string, req *cstructs.ClientHostVolumeCreateRequest) (*hostVolumePluginResponse, error) {

buf, err := json.Marshal(req)
if err != nil {
return nil, err
}

log.Trace("external host volume plugin", "req", string(buf))

stdin := bytes.NewReader(buf)
targetPath := filepath.Join(sharedMountDir, id)

cmd := exec.CommandContext(ctx, executablePath, targetPath)
cmd.Stdin = stdin
// plugins may read the full json from stdin, and/or environment
// variables for basic features.
cmd.Env = []string{
fmt.Sprintf("Path=%s", sharedMountDir),
fmt.Sprintf("NodeID=%s", req.NodeID),
fmt.Sprintf("Plugin=%s", req.Plugin),
fmt.Sprintf("Name=%s", req.Name),
fmt.Sprintf("CapacityMinBytes=%d", req.CapacityMin),
fmt.Sprintf("CapacityMaxBytes=%d", req.CapacityMax),
}

var stderr bytes.Buffer
cmd.Stderr = io.Writer(&stderr)
outBuf, err := cmd.Output()
if err != nil {
out, _ := io.ReadAll(&stderr)
return nil, fmt.Errorf("hostvolume plugin failed: %v, %v", err, string(out))
}

var pluginResp hostVolumePluginResponse
err = json.Unmarshal(outBuf, &pluginResp)
if err != nil {
return nil, err
}

return &pluginResp, nil
plug, ok := hvm.getPlugin(req.PluginID)
if !ok {
return nil, fmt.Errorf("no such plugin %q", req.PluginID)
}
}

func newDefaultHostVolumePluginFn(log hclog.Logger, sharedMountDir string) hostVolumePluginFunc {
return func(ctx context.Context, id string, req *cstructs.ClientHostVolumeCreateRequest) (*hostVolumePluginResponse, error) {
err := plug.Delete(ctx, req)
if err != nil {
return nil, err
}

targetPath := filepath.Join(sharedMountDir, id)
log.Trace("default host volume plugin", "target_path", targetPath)
resp := &cstructs.ClientHostVolumeDeleteResponse{}

err := os.Mkdir(targetPath, 0o700)
if err != nil {
return nil, err
}
// TODO(db): save the client state!

return &hostVolumePluginResponse{
Path: targetPath,
SizeInMB: 0,
Context: map[string]string{},
}, nil
}
return resp, nil
}
5 changes: 5 additions & 0 deletions client/structs/host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ type ClientHostVolumeDeleteRequest struct {
// ID is a UUID-like string generated by the server.
ID string

// PluginID is the name of the host volume plugin on the client that will be
// used for deleting the volume. If omitted, the client will use its default
// built-in plugin.
PluginID string

// NodeID is the node where the volume is placed. It's included in the
// client RPC request so that the server can route the request to the
// correct node.
Expand Down
1 change: 1 addition & 0 deletions nomad/host_volume_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ func (v *HostVolume) deleteVolume(vol *structs.HostVolume) error {
method := "ClientHostVolume.Delete"
cReq := &cstructs.ClientHostVolumeDeleteRequest{
ID: vol.ID,
PluginID: vol.PluginID,
NodeID: vol.NodeID,
HostPath: vol.HostPath,
Parameters: vol.Parameters,
Expand Down

0 comments on commit 21083e5

Please sign in to comment.