@@ -5,17 +5,27 @@ import (
5
5
"context"
6
6
"encoding/json"
7
7
"errors"
8
+ "fmt"
9
+ "time"
8
10
9
11
k8sTypes "k8s.io/api/core/v1"
10
12
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13
+ "k8s.io/apimachinery/pkg/fields"
11
14
"k8s.io/apimachinery/pkg/types"
12
15
"k8s.io/apimachinery/pkg/util/strategicpatch"
16
+ "k8s.io/apimachinery/pkg/watch"
13
17
"k8s.io/client-go/kubernetes"
14
18
"k8s.io/client-go/kubernetes/scheme"
15
19
"k8s.io/client-go/rest"
16
20
"k8s.io/client-go/tools/remotecommand"
17
21
)
18
22
23
+ // ValidPod status
24
+ const (
25
+ Running string = "Running"
26
+ Succeeded string = "Succeeded"
27
+ )
28
+
19
29
func New (client kubernetes.Interface , config * rest.Config , metaOptions metav1.ListOptions , ctx context.Context ) * Pods {
20
30
return & Pods {
21
31
client ,
@@ -62,6 +72,7 @@ type PodOptions struct {
62
72
Image string // image to be executed by the pod's container
63
73
Command []string // command to be executed by the pod's container and its arguments
64
74
RestartPolicy k8sTypes.RestartPolicy // policy for restarting containers in the pod. One of One of Always, OnFailure, Never
75
+ Wait string // timeout for waiting until the pod is running
65
76
}
66
77
67
78
func (obj * Pods ) List (namespace string ) ([]k8sTypes.Pod , error ) {
@@ -133,7 +144,81 @@ func (obj *Pods) Create(options PodOptions) (k8sTypes.Pod, error) {
133
144
if err != nil {
134
145
return k8sTypes.Pod {}, err
135
146
}
136
- return * pod , nil
147
+
148
+ if options .Wait == "" {
149
+ return * pod , nil
150
+ }
151
+ waitOpts := WaitOptions {
152
+ Name : options .Name ,
153
+ Namespace : options .Namespace ,
154
+ Status : Running ,
155
+ Timeout : options .Wait ,
156
+ }
157
+ status , err := obj .Wait (waitOpts )
158
+ if err != nil {
159
+ return k8sTypes.Pod {}, err
160
+ }
161
+ if ! status {
162
+ return k8sTypes.Pod {}, errors .New ("timeout exceeded waiting for pod to be running" )
163
+ }
164
+
165
+ return obj .Get (options .Name , options .Namespace )
166
+ }
167
+
168
+ // Options for waiting for a Pod status
169
+ type WaitOptions struct {
170
+ Name string // Pod name
171
+ Namespace string // Namespace where the pod is running
172
+ Status string // Wait until pod reaches the specified status. Must be one of "Running" or "Succeeded".
173
+ Timeout string // Timeout for waiting condition to be true
174
+ }
175
+
176
+ // Wait for the Pod to be in a given status up to given timeout and returns a boolean indicating if the staus was reached. If the pod is Failed returns error.
177
+ func (obj * Pods ) Wait (options WaitOptions ) (bool , error ) {
178
+ if options .Status != Running && options .Status != Succeeded {
179
+ return false , errors .New ("wait condition must be 'Running' or 'Succeeded'" )
180
+ }
181
+ timeout , err := time .ParseDuration (options .Timeout )
182
+ if err != nil {
183
+ return false , err
184
+ }
185
+ selector := fields.Set {
186
+ "metadata.name" : options .Name ,
187
+ }.AsSelector ()
188
+ watcher , err := obj .client .CoreV1 ().Pods (options .Namespace ).Watch (
189
+ obj .ctx ,
190
+ metav1.ListOptions {
191
+ FieldSelector : selector .String (),
192
+ },
193
+ )
194
+ if err != nil {
195
+ return false , err
196
+ }
197
+ defer watcher .Stop ()
198
+
199
+ for {
200
+ select {
201
+ case <- time .After (timeout ):
202
+ return false , nil
203
+ case event := <- watcher .ResultChan ():
204
+ if event .Type == watch .Error {
205
+ return false , fmt .Errorf ("error watching for pod: %v" , event .Object )
206
+ }
207
+ if event .Type == watch .Modified {
208
+ pod , isPod := event .Object .(* k8sTypes.Pod )
209
+ if ! isPod {
210
+ return false , errors .New ("received unknown object while watching for pods" )
211
+ }
212
+ if pod .Status .Phase == k8sTypes .PodFailed {
213
+ return false , errors .New ("Pod has failed" )
214
+ }
215
+ if string (pod .Status .Phase ) == options .Status {
216
+ return true , nil
217
+ }
218
+ }
219
+ }
220
+ }
221
+ return false , nil
137
222
}
138
223
139
224
// Exec executes a non-interactive command described in options and returns the stdout and stderr outputs
0 commit comments