Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Submitting Batch Job in OCI Containers #61

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
generated
out
.idea/
.idea/
.vscode/
13 changes: 9 additions & 4 deletions internal/cbatch/CmdArgParser.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ package cbatch

import (
"CraneFrontEnd/internal/util"
"github.com/spf13/cobra"
"os"

"github.com/spf13/cobra"
)

var (
Expand All @@ -38,6 +39,8 @@ var (
FlagExcludes string
FlagGetUserEnv string
FlagExport string
FlagContainer string
FlagInterpreter string
FlagStdoutPath string
FlagStderrPath string

Expand All @@ -55,8 +58,8 @@ func ParseCmdArgs() {
}

rootCmd.PersistentFlags().StringVarP(&FlagConfigFilePath, "config", "C",
util.DefaultConfigPath, "Path to configuration file")
rootCmd.Flags().Uint32VarP(&FlagNodes, "nodes", "N", 0, " number of nodes on which to run (N = min[-max])")
util.DefaultConfigPath, "path to configuration file")
rootCmd.Flags().Uint32VarP(&FlagNodes, "nodes", "N", 0, "number of nodes on which to run (N = min[-max])")
rootCmd.Flags().Float64VarP(&FlagCpuPerTask, "cpus-per-task", "c", 0, "number of cpus required per task")
rootCmd.Flags().Uint32Var(&FlagNtasksPerNode, "ntasks-per-node", 0, "number of tasks to invoke on each node")
rootCmd.Flags().StringVarP(&FlagTime, "time", "t", "", "time limit")
Expand All @@ -67,10 +70,12 @@ func ParseCmdArgs() {
rootCmd.Flags().StringVar(&FlagCwd, "chdir", "", "working directory of the task")
rootCmd.Flags().StringVarP(&FlagQos, "qos", "q", "", "quality of service")
rootCmd.Flags().Uint32Var(&FlagRepeat, "repeat", 1, "submit the task multiple times")
rootCmd.Flags().StringVarP(&FlagNodelist, "nodelist", "w", "", "List of specific nodes to be allocated to the job")
rootCmd.Flags().StringVarP(&FlagNodelist, "nodelist", "w", "", "list of specific nodes to be allocated to the job")
rootCmd.Flags().StringVarP(&FlagExcludes, "exclude", "x", "", "exclude a specific list of hosts")
rootCmd.Flags().StringVar(&FlagGetUserEnv, "get-user-env", "", "get user's environment variables")
rootCmd.Flags().StringVar(&FlagExport, "export", "", "propagate environment variables")
rootCmd.Flags().StringVar(&FlagContainer, "container", "", "OCI bundle path of the container")
rootCmd.Flags().StringVar(&FlagInterpreter, "interpreter", "", "interpreter for batch script")
rootCmd.Flags().StringVarP(&FlagStdoutPath, "output", "o", "", "file for batch script's standard output")
rootCmd.Flags().StringVarP(&FlagStderrPath, "error", "e", "", "file for batch script's standard error output")

Expand Down
48 changes: 33 additions & 15 deletions internal/cbatch/cbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"bufio"
"context"
"fmt"
log "github.com/sirupsen/logrus"
"os"
"regexp"
"strconv"
"strings"

log "github.com/sirupsen/logrus"
)

type CbatchArg struct {
Expand Down Expand Up @@ -55,7 +56,7 @@ func ProcessCbatchArg(args []CbatchArg) (bool, *protos.TaskToCtld) {
task.GetUserEnv = false
task.Env = make(map[string]string)

///*************set parameter values based on the file*******************************///
/*** Set parameter values based on the file ***/
for _, arg := range args {
switch arg.name {
case "--nodes", "-N":
Expand All @@ -66,6 +67,7 @@ func ProcessCbatchArg(args []CbatchArg) (bool, *protos.TaskToCtld) {
}
task.NodeNum = uint32(num)
case "--cpus-per-task", "-c":
// FIXME: BUG? 'bitSize' argument is invalid, must be either 32 or 64 (SA1030)
num, err := strconv.ParseFloat(arg.val, 10)
if err != nil {
log.Print("Invalid " + arg.name)
Expand All @@ -81,7 +83,7 @@ func ProcessCbatchArg(args []CbatchArg) (bool, *protos.TaskToCtld) {
task.NtasksPerNode = uint32(num)
case "--time", "-t":
isOk := util.ParseDuration(arg.val, task.TimeLimit)
if isOk == false {
if !isOk {
log.Print("Invalid " + arg.name)
return false, nil
}
Expand Down Expand Up @@ -111,6 +113,10 @@ func ProcessCbatchArg(args []CbatchArg) (bool, *protos.TaskToCtld) {
task.GetUserEnv = true
case "--export":
task.Env["CRANE_EXPORT_ENV"] = arg.val
case "--container":
task.Container = arg.val
case "--interpreter":
task.GetBatchMeta().Interpreter = arg.val
case "-o", "--output":
task.GetBatchMeta().OutputFilePattern = arg.val
case "-e", "--error":
Expand All @@ -120,7 +126,7 @@ func ProcessCbatchArg(args []CbatchArg) (bool, *protos.TaskToCtld) {
}
}

// ************* set parameter values based on the command line *********************
/*** Set parameter values based on the command line ***/
// If the command line argument is set, it replaces the argument read from the file,
// so the command line has a higher priority
if FlagNodes != 0 {
Expand All @@ -133,9 +139,8 @@ func ProcessCbatchArg(args []CbatchArg) (bool, *protos.TaskToCtld) {
task.NtasksPerNode = FlagNtasksPerNode
}
if FlagTime != "" {
ok := util.ParseDuration(FlagTime, task.TimeLimit)
if ok == false {
log.Print("Invalid --time")
if ok := util.ParseDuration(FlagTime, task.TimeLimit); !ok {
log.Error("Invalid --time")
return false, nil
}
}
Expand Down Expand Up @@ -175,6 +180,12 @@ func ProcessCbatchArg(args []CbatchArg) (bool, *protos.TaskToCtld) {
if FlagExport != "" {
task.Env["CRANE_EXPORT_ENV"] = FlagExport
}
if FlagContainer != "" {
task.Container = FlagContainer
}
if FlagInterpreter != "" {
task.GetBatchMeta().Interpreter = FlagInterpreter
}
if FlagStdoutPath != "" {
task.GetBatchMeta().OutputFilePattern = FlagStdoutPath
}
Expand All @@ -183,7 +194,7 @@ func ProcessCbatchArg(args []CbatchArg) (bool, *protos.TaskToCtld) {
}

if task.CpusPerTask <= 0 || task.NtasksPerNode == 0 || task.NodeNum == 0 {
log.Print("Invalid --cpus-per-task, --ntasks-per-node or --node-num")
log.Error("Invalid --cpus-per-task, --ntasks-per-node or --node-num")
return false, nil
}

Expand Down Expand Up @@ -324,7 +335,7 @@ func Cbatch(jobFilePath string) {
defer func(file *os.File) {
err := file.Close()
if err != nil {
log.Printf("Failed to close %s\n", file.Name())
log.Warnf("Failed to close %s\n", file.Name())
}
}(file)

Expand All @@ -336,11 +347,16 @@ func Cbatch(jobFilePath string) {

for scanner.Scan() {
num++
success := ProcessLine(scanner.Text(), &sh, &args)
if !success {
err = fmt.Errorf("grammer error at line %v", num)
fmt.Println(err.Error())
os.Exit(1)

// Shebang line
if (num == 1) && strings.HasPrefix(scanner.Text(), "#!") {
args = append(args, CbatchArg{name: "--interpreter", val: scanner.Text()[2:]})
continue
}

// #CBATCH line
if ok := ProcessLine(scanner.Text(), &sh, &args); !ok {
log.Fatalf("grammar error at line %v", num)
}
}

Expand All @@ -360,11 +376,13 @@ func Cbatch(jobFilePath string) {
task.Uid = uint32(os.Getuid())
task.CmdLine = strings.Join(os.Args, " ")

// Process the content of --get-user-env
// Process the content of --export
SetPropagatedEnviron(task)

task.Type = protos.TaskType_Batch
if task.Cwd == "" {
// If container task, cwd is omitted and only
// used to generate default output/error file path.
task.Cwd, _ = os.Getwd()
}

Expand Down
10 changes: 8 additions & 2 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ message TaskToCtld {
InteractiveTaskAdditionalMeta interactive_meta = 22;
}

string container = 25;

string cmd_line = 31;
string cwd = 32; // Current working directory
map<string, string> env = 33;
Expand All @@ -104,11 +106,11 @@ message TaskToCtld {
}

message TaskInEmbeddedDb {
PersistedPartOfTaskInCtld persisted_part = 1;
RuntimeAttrOfTask runtime_attr = 1;
TaskToCtld task_to_ctld = 2;
}

message PersistedPartOfTaskInCtld {
message RuntimeAttrOfTask {
// Fields that won't change after this task is accepted.
uint32 task_id = 1;
int64 task_db_id = 3;
Expand Down Expand Up @@ -164,10 +166,12 @@ message TaskToD {
double cpus_per_task = 23;

bool get_user_env = 24;
string container = 25;
}

message BatchTaskAdditionalMeta {
string sh_script = 1;
string interpreter = 2;
string output_file_pattern = 3;
string error_file_pattern = 4;
}
Expand Down Expand Up @@ -196,6 +200,8 @@ message TaskInfo {
string username = 15;
string qos = 16;

string container = 25;

// Dynamic task information
TaskStatus status = 31;
string craned_list = 32;
Expand Down