Skip to content

Commit

Permalink
Updated script logic with interface. Fixed bug where Jobstack was onl…
Browse files Browse the repository at this point in the history
…y running the last entry in the stack, repeated for it's length, due to incorrectly copied config struct.
  • Loading branch information
DiscoRiver committed Dec 12, 2021
1 parent 2a3964c commit 7125338
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 28 deletions.
21 changes: 7 additions & 14 deletions job.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,23 @@
package massh

import (
"fmt"
"io/ioutil"
)

// Job is a single remote task config. For script files, use Job.SetLocalScript().
type Job struct {
Command string
Script []byte
ScriptArgs string
Command string
Script script
}

// SetCommand sets the Command value in Job. This is the Command executed over SSH to all hosts.
func (j *Job) SetCommand(command string) {
j.Command = command
}

// SetLocalScript reads a script file contents into the Job config.
func (j *Job) SetLocalScript(filename string, args string) error {
var err error
j.Script, err = ioutil.ReadFile(filename)
func (j *Job) SetScript(filePath string, args ...string) error {
s, err := newScript(filePath, args...)
if err != nil {
return fmt.Errorf("failed to read script file")
return err
}
j.ScriptArgs = args

j.Script = s

return nil
}
110 changes: 110 additions & 0 deletions script.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package massh

import (
"bytes"
"fmt"
"golang.org/x/crypto/ssh"
"io/ioutil"
"path/filepath"
"strings"
)

type script interface {
prepare(*ssh.Session)
getPreparedCommandString() string
getBytes() []byte
getArgs() string
}

type shell struct {
bytes []byte
args string

commandString string

prepared bool
}

type python struct {
bytes []byte
args string

commandString string

prepared bool
}

// NewScript creates a new script type based on the file extension. Shebang line in supported scripts must be present.
//
// Each element in args should ideally contain an argument's key/value, for example "--some-arg value", or "--some-arg=value".
func newScript(scriptFile string, args ...string) (script, error) {
scriptBytes, err := ioutil.ReadFile(scriptFile)
if err != nil {
return nil, fmt.Errorf("failed to read script file: %s", err)
}

// Check shebang is present
if scriptBytes[0] != '#' {
return nil, fmt.Errorf("shebang line not present in file %s", filepath.Base(scriptFile))
}

if strings.HasSuffix(scriptFile, ".sh") {
shellScript := &shell{
bytes: scriptBytes,
args: strings.Join(args, " "),
}
return shellScript, nil
}

if strings.HasSuffix(scriptFile, ".py") {
pythonScript := &python{
bytes: scriptBytes,
args: strings.Join(args, " "),
}
return pythonScript, nil
}

return nil, fmt.Errorf("script file %s not supported", filepath.Base(scriptFile))
}

// Prepare populated the SSH sessions's stdin with the script data, and returns a command string to run the script from a temporary file.
func (s *shell) prepare(session *ssh.Session) {
// Set up remote script
session.Stdin = bytes.NewReader(s.bytes)

s.commandString = fmt.Sprintf("cat > massh-script-tmp.sh && chmod +x ./massh-script-tmp.sh && ./massh-script-tmp.sh %s && rm ./massh-script-tmp.sh", s.args)
s.prepared = true
}

func (s *shell) getPreparedCommandString() string {
return s.commandString
}

func (s *shell) getBytes() []byte {
return s.bytes
}

func (s *shell) getArgs() string {
return s.args
}

// Prepare populated the SSH sessions's stdin with the script data, and returns a command string to run the script from a temporary file.
func (s *python) prepare(session *ssh.Session) {
// Set up remote script
session.Stdin = bytes.NewReader(s.bytes)

s.commandString = fmt.Sprintf("cat > massh-script-tmp.py && chmod +x ./massh-script-tmp.py && ./massh-script-tmp.py %s && rm ./massh-script-tmp.py", s.args)
s.prepared = true
}

func (s *python) getPreparedCommandString() string {
return s.commandString
}

func (s *python) getBytes() []byte {
return s.bytes
}

func (s *python) getArgs() string {
return s.args
}
45 changes: 31 additions & 14 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ type Result struct {
DoneChannel chan struct{}
}

// getJob determines the type of job and returns the command string
// getJob determines the type of job and returns the command string. If type is a local script, then stdin will be populated with the script data and sent/executed on the remote machine.
func getJob(s *ssh.Session, j *Job) string {
// Set up remote script
if j.Script != nil {
s.Stdin = bytes.NewReader(j.Script)
return fmt.Sprintf("cat > outfile.sh && chmod +x ./outfile.sh && ./outfile.sh %s && rm ./outfile.sh", j.ScriptArgs)
j.Script.prepare(s)
return j.Script.getPreparedCommandString()
}

return j.Command
Expand Down Expand Up @@ -166,7 +166,7 @@ func sshCommandStream(host string, config *Config, resultChannel chan Result) {

// readToBytesChannel reads from io.Reader and directs the data to a byte slice channel for streaming.
func readToBytesChannel(reader io.Reader, stream chan []byte, r Result, wg *sync.WaitGroup) {
defer func(){ wg.Done() }()
defer func() { wg.Done() }()

rdr := bufio.NewReader(reader)

Expand All @@ -192,25 +192,31 @@ func worker(hosts <-chan string, results chan<- Result, config *Config, resChan
// TODO: Make the handling of a JobStack more elegant.
if resChan == nil {
for host := range hosts {
cfg := *config
if cfg.JobStack != nil {
for i := range *cfg.JobStack {
j := (*cfg.JobStack)[i]
if config.JobStack != nil {
for i := range *config.JobStack {
// Cfg is a copy of config, without job pointers. This is needed to separate the jobstack.
cfg := copyConfigNoJobs(config)

j := (*config.JobStack)[i]
cfg.Job = &j
results <- sshCommand(host, &cfg)

results <- sshCommand(host, cfg)
}
} else {
results <- sshCommand(host, config)
}
}
} else {
for host := range hosts {
cfg := *config
if cfg.JobStack != nil {
for i := range *cfg.JobStack {
j := (*cfg.JobStack)[i]
if config.JobStack != nil {
for i := range *config.JobStack {
// Cfg is a copy of config, without job pointers. This is needed to separate the jobstack.
cfg := copyConfigNoJobs(config)

j := (*config.JobStack)[i]
cfg.Job = &j
go sshCommandStream(host, &cfg, resChan)

go sshCommandStream(host, cfg, resChan)
}
} else {
go sshCommandStream(host, config, resChan)
Expand All @@ -219,6 +225,17 @@ func worker(hosts <-chan string, results chan<- Result, config *Config, resChan
}
}

func copyConfigNoJobs(config *Config) *Config {
cfg := NewConfig()
cfg.Hosts = config.Hosts
cfg.SSHConfig = config.SSHConfig
cfg.BastionHost = config.BastionHost
cfg.BastionHostSSHConfig = config.BastionHostSSHConfig
cfg.WorkerPool = config.WorkerPool

return cfg
}

// runStream is mostly the same as run, except it directs the results to a channel so they can be processed
// before the command has completed executing (i.e streaming the stdout and stderr as it runs).
func runStream(c *Config, rs chan Result) {
Expand Down

0 comments on commit 7125338

Please sign in to comment.