-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
145 lines (123 loc) · 3.46 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
batch "k8s.io/api/batch/v1"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // enable GCP specific authentication
"k8s.io/client-go/tools/clientcmd"
"github.com/egym-playground/kubejob/pkg/cli"
"github.com/egym-playground/kubejob/pkg/job"
)
var githash string // git commit hash injected at build time via the linker (-ldflags); see '.travis.yml'
// main parses the CLI arguments, submits the Kubernetes Job described by the
// given job file, streams its pod events and container logs until the job
// finishes (or the context is cancelled by timeout/signal), deletes the job,
// and exits with a non-zero status if the job failed.
func main() {
	args, err := cli.Parse(os.Args, os.Getenv("HOME"), githash, os.Stderr)
	if err != nil {
		log.Fatal("Error: ", err)
	}
	// Derive a context that is cancelled either when the optional timeout
	// expires or when the process receives SIGINT/SIGTERM, so the job run
	// below can be aborted cleanly.
	ctx := context.Background()
	var cancel func()
	if args.Timeout != time.Duration(0) {
		ctx, cancel = context.WithTimeout(ctx, args.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	defer cancel()
	go func() {
		// The channel must be buffered: signal.Notify does not block when
		// sending, so an unbuffered channel can silently drop the signal
		// (this is also flagged by `go vet`).
		sigChan := make(chan os.Signal, 1)
		signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
		// Wait for the first signal, then cancel the context.
		<-sigChan
		cancel()
	}()
	jobSpec, err := parseAndValidateJob(args.JobFile)
	if err != nil {
		log.Fatal("Unable to parse job: ", err)
	}
	cs, err := k8sClientSet(args.Kubeconfig)
	if err != nil {
		log.Fatal("Failed to create client: ", err)
	}
	// Consume job events in the background and turn them into log output.
	// The goroutine ends when job.RunJob closes (or stops sending on) the
	// events channel.
	events := make(chan job.Event)
	go func() {
		var lastPhase core.PodPhase
		for event := range events {
			switch event := event.(type) {
			case error:
				log.Printf("Error: %v", event)
			case job.LogLine:
				log.Printf("%s: %s", event.Container, event.Line)
			case core.PodStatus:
				status := event
				// Only report phase transitions, not every status update.
				if status.Phase != lastPhase {
					log.Print("Phase: ", status.Phase)
				}
				lastPhase = status.Phase
				if status.Phase == core.PodPending {
					// Surface why a pending pod is not starting (e.g.
					// ImagePullBackOff, ContainerCreating).
					for _, containerStatus := range status.ContainerStatuses {
						if containerStatus.State.Waiting != nil {
							log.Printf("Container %s is waiting: %s", containerStatus.Name, containerStatus.State.Waiting.Reason)
						}
					}
				}
			}
		}
	}()
	success, err := job.RunJob(ctx, cs, args.Namespace, jobSpec, events)
	if success {
		log.Print("Job completed successfully")
	} else {
		log.Print("Job failed")
	}
	if err != nil {
		log.Print("Error: ", err)
	}
	// Always clean up the job object, regardless of outcome.
	log.Print("Deleting job")
	err = cs.BatchV1().Jobs(args.Namespace).Delete(jobSpec.Name, nil)
	if err != nil {
		log.Print("Deleting job: ", err)
	}
	if !success {
		os.Exit(1)
	}
}
// parseAndValidateJob reads the job spec from path and returns the result if
// possible. If path is "-" the job spec is read from os.Stdin. Warnings are
// logged if recommended values are not set.
func parseAndValidateJob(path string) (*batch.Job, error) {
	f := os.Stdin
	if path != "-" {
		var err error
		f, err = os.Open(path)
		if err != nil {
			return nil, fmt.Errorf("unable to open job file: %v", err)
		}
		// Close the opened file once parsing is done; os.Stdin is
		// intentionally left open. The original code leaked this handle.
		defer f.Close()
	}
	// The decoder accepts either YAML or JSON input.
	var spec batch.Job
	if err := yaml.NewYAMLOrJSONDecoder(f, 1024).Decode(&spec); err != nil {
		return nil, fmt.Errorf("unable to parse job spec: %v", err)
	}
	// With any restart policy other than "Never" the kubelet restarts failed
	// containers in place, which hides individual failures from the caller.
	if spec.Spec.Template.Spec.RestartPolicy != core.RestartPolicyNever {
		log.Print(`Warning: ".spec.template.spec.restartPolicy" should be set to "Never" in order to avoid unintended restarts`)
	}
	// A non-zero (or unset, defaulted) backoff limit makes the job controller
	// create replacement pods after a failure.
	if spec.Spec.BackoffLimit == nil || *spec.Spec.BackoffLimit != 0 {
		log.Print(`Warning: ".spec.backoffLimit" should be set to "0" in order to avoid unintended restarts`)
	}
	return &spec, nil
}
// k8sClientSet builds a Kubernetes client set from the kubeconfig file at the
// given path (clientcmd falls back to in-cluster/default config when empty).
func k8sClientSet(kubeconfig string) (*kubernetes.Clientset, error) {
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(cfg)
}