Skip to content

Commit 0e1affb

Browse files
committed
Add test for kubeLoggingWithReconnect
1 parent 6bf181f commit 0e1affb

File tree

3 files changed

+69
-5
lines changed

3 files changed

+69
-5
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ require (
6262
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
6363
github.com/onsi/ginkgo/v2 v2.21.0 // indirect
6464
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
65+
github.com/pkg/errors v0.9.1 // indirect
6566
github.com/sagikazarmark/locafero v0.4.0 // indirect
6667
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
6768
github.com/sourcegraph/conc v0.3.0 // indirect
@@ -83,6 +84,7 @@ require (
8384
golang.org/x/time v0.5.0 // indirect
8485
golang.org/x/tools v0.27.0 // indirect
8586
google.golang.org/protobuf v1.34.2 // indirect
87+
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
8688
gopkg.in/inf.v0 v0.9.1 // indirect
8789
gopkg.in/ini.v1 v1.67.0 // indirect
8890
gopkg.in/yaml.v3 v3.0.1 // indirect

pkg/workceptor/kubernetes.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ func (kw *KubeUnit) kubeLoggingNoReconnect(streamWait *sync.WaitGroup, stdout *S
294294
}
295295
}
296296

297-
func (kw *KubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout *STDoutWriter, stdinErr *error, stdoutErr *error) {
297+
func (kw *KubeUnit) KubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout *STDoutWriter, stdinErr *error, stdoutErr *error) {
298298
// preferred method for k8s >= 1.23.14
299299
defer streamWait.Done()
300300
var sinceTime time.Time
@@ -420,7 +420,7 @@ func (kw *KubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout
420420
}
421421
}
422422

423-
func (kw *KubeUnit) createPod(env map[string]string) error {
423+
func (kw *KubeUnit) CreatePod(env map[string]string) error {
424424
ked := kw.UnredactedStatus().ExtraData.(*KubeExtraData)
425425
command, err := shlex.Split(ked.Command)
426426
if err != nil {
@@ -621,7 +621,7 @@ func (kw *KubeUnit) runWorkUsingLogger() {
621621
if podName == "" {
622622
// create new pod if ked.PodName is empty
623623
// TODO: add retry logic to make this more resilient to transient errors
624-
if err := kw.createPod(nil); err != nil {
624+
if err := kw.CreatePod(nil); err != nil {
625625
if err != ErrPodCompleted {
626626
errMsg := fmt.Sprintf("Error creating pod: %s", err)
627627
kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet
@@ -842,7 +842,7 @@ func (kw *KubeUnit) runWorkUsingLogger() {
842842
stdoutWithReconnect := ShouldUseReconnect(kw)
843843
if stdoutWithReconnect && stdoutErr == nil {
844844
kw.GetWorkceptor().nc.GetLogger().Debug("streaming stdout with reconnect support")
845-
go kw.kubeLoggingWithReconnect(&streamWait, stdout, &stdinErr, &stdoutErr)
845+
go kw.KubeLoggingWithReconnect(&streamWait, stdout, &stdinErr, &stdoutErr)
846846
} else {
847847
kw.GetWorkceptor().nc.GetLogger().Debug("streaming stdout with no reconnect support")
848848
go kw.kubeLoggingNoReconnect(&streamWait, stdout, &stdoutErr)
@@ -1062,7 +1062,7 @@ func (kw *KubeUnit) runWorkUsingTCP() {
10621062
}()
10631063

10641064
// Create the pod
1065-
err = kw.createPod(map[string]string{"RECEPTOR_HOST": listenHost, "RECEPTOR_PORT": listenPort})
1065+
err = kw.CreatePod(map[string]string{"RECEPTOR_HOST": listenHost, "RECEPTOR_PORT": listenPort})
10661066
if err != nil {
10671067
errMsg := fmt.Sprintf("Error creating pod: %s", err)
10681068
kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)

pkg/workceptor/kubernetes_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"k8s.io/apimachinery/pkg/selection"
2121
"k8s.io/apimachinery/pkg/watch"
2222
"k8s.io/client-go/kubernetes"
23+
"k8s.io/client-go/kubernetes/fake"
2324
"k8s.io/client-go/rest"
2425
"k8s.io/client-go/tools/remotecommand"
2526
)
@@ -423,3 +424,64 @@ func Test_IsCompatibleK8S(t *testing.T) {
423424
})
424425
}
425426
}
427+
428+
func TestKubeLoggingWithReconnect(t *testing.T) {
429+
var stdinErr error
430+
var stdoutErr error
431+
ctx := context.Background()
432+
ctrl := gomock.NewController(t)
433+
mockBaseWorkUnitForWorkUnit := mock_workceptor.NewMockBaseWorkUnitForWorkUnit(ctrl)
434+
mockBaseWorkUnit := mock_workceptor.NewMockBaseWorkUnitForWorkUnit(ctrl)
435+
mockNetceptor := mock_workceptor.NewMockNetceptorForWorkceptor(ctrl)
436+
mockNetceptor.EXPECT().NodeID().Return("NodeID")
437+
mockKubeAPI := mock_workceptor.NewMockKubeAPIer(ctrl)
438+
439+
kw := &workceptor.KubeUnit{
440+
BaseWorkUnitForWorkUnit: mockBaseWorkUnitForWorkUnit,
441+
}
442+
443+
w, err := workceptor.New(ctx, mockNetceptor, "/tmp")
444+
if err != nil {
445+
t.Errorf("Error while creating Workceptor: %v", err)
446+
}
447+
448+
mockBaseWorkUnit.EXPECT().Init(w, "", "", workceptor.FileSystem{}, nil)
449+
kubeConfig := workceptor.KubeWorkerCfg{AuthMethod: "incluster"}
450+
kubeConfig.NewkubeWorker(mockBaseWorkUnit, w, "", "", mockKubeAPI)
451+
452+
tests := []struct {
453+
name string
454+
expectedCalls func()
455+
}{
456+
{
457+
name: "Kube error should be read",
458+
expectedCalls: func() {
459+
lock := &sync.RWMutex{}
460+
mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusLock().Return(lock).AnyTimes()
461+
kubeExtraData := workceptor.KubeExtraData{}
462+
status := workceptor.StatusFileData{ExtraData: &kubeExtraData}
463+
mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusWithoutExtraData().Return(&status).AnyTimes()
464+
mockBaseWorkUnitForWorkUnit.EXPECT().GetStatusCopy().Return(status).AnyTimes()
465+
mockBaseWorkUnitForWorkUnit.EXPECT().GetContext().Return(ctx).AnyTimes()
466+
pod := corev1.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "Test Name"}, Spec: corev1.PodSpec{}, Status: corev1.PodStatus{}}
467+
mockKubeAPI.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes()
468+
mockBaseWorkUnitForWorkUnit.EXPECT().UpdateFullStatus(gomock.Any()).AnyTimes()
469+
field := hasTerm{}
470+
mockKubeAPI.EXPECT().OneTermEqualSelector(gomock.Any(), gomock.Any()).Return(&field).AnyTimes()
471+
ev := watch.Event{Object: &pod}
472+
mockKubeAPI.EXPECT().UntilWithSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&ev, nil).AnyTimes()
473+
mockKubeAPI.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes()
474+
client := fake.NewSimpleClientset()
475+
req := client.CoreV1().Pods("default").GetLogs("pod-1", &corev1.PodLogOptions{})
476+
mockKubeAPI.EXPECT().GetLogs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(req)
477+
},
478+
},
479+
}
480+
for _, tt := range tests {
481+
t.Run(tt.name, func(t *testing.T) {
482+
tt.expectedCalls()
483+
kw.CreatePod(nil)
484+
kw.KubeLoggingWithReconnect(&sync.WaitGroup{}, &workceptor.STDoutWriter{}, &stdinErr, &stdoutErr)
485+
})
486+
}
487+
}

0 commit comments

Comments
 (0)