Commit 9192dc5

Add test for kubeLoggingWithReconnect (#1236)
1 parent 31c861c commit 9192dc5

3 files changed (+98, -11 lines)

go.mod

Lines changed: 2 additions & 0 deletions
@@ -62,6 +62,7 @@ require (
 	github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
 	github.com/onsi/ginkgo/v2 v2.21.0 // indirect
 	github.com/pelletier/go-toml/v2 v2.2.2 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
 	github.com/sagikazarmark/locafero v0.4.0 // indirect
 	github.com/sagikazarmark/slog-shim v0.1.0 // indirect
 	github.com/sourcegraph/conc v0.3.0 // indirect
@@ -83,6 +84,7 @@ require (
 	golang.org/x/time v0.5.0 // indirect
 	golang.org/x/tools v0.27.0 // indirect
 	google.golang.org/protobuf v1.34.2 // indirect
+	gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect

pkg/workceptor/kubernetes.go

Lines changed: 12 additions & 8 deletions
@@ -171,7 +171,7 @@ func (ku KubeAPIWrapper) NewFakeAlwaysRateLimiter() flowcontrol.RateLimiter {
 // It is instantiated in the NewkubeWorker function and available throughout the package.
 var KubeAPIWrapperInstance KubeAPIer
 
-var KubeAPIWrapperLock *sync.RWMutex
+var KubeAPIWrapperLock sync.Mutex
 
 // ErrPodCompleted is returned when pod has already completed before we could attach.
 var ErrPodCompleted = fmt.Errorf("pod ran to completion")
@@ -294,7 +294,7 @@ func (kw *KubeUnit) kubeLoggingNoReconnect(streamWait *sync.WaitGroup, stdout *S
 	}
 }
 
-func (kw *KubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout *STDoutWriter, stdinErr *error, stdoutErr *error) {
+func (kw *KubeUnit) KubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout *STDoutWriter, stdinErr *error, stdoutErr *error) {
 	// preferred method for k8s >= 1.23.14
 	defer streamWait.Done()
 	var sinceTime time.Time
@@ -420,7 +420,7 @@ func (kw *KubeUnit) kubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout
 	}
 }
 
-func (kw *KubeUnit) createPod(env map[string]string) error {
+func (kw *KubeUnit) CreatePod(env map[string]string) error {
 	ked := kw.UnredactedStatus().ExtraData.(*KubeExtraData)
 	command, err := shlex.Split(ked.Command)
 	if err != nil {
@@ -522,7 +522,9 @@ func (kw *KubeUnit) createPod(env map[string]string) error {
 	})
 
 	// Wait for the pod to be running
+	KubeAPIWrapperLock.Lock()
 	fieldSelector := KubeAPIWrapperInstance.OneTermEqualSelector("metadata.name", kw.pod.Name).String()
+	KubeAPIWrapperLock.Unlock()
 	lw := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 			options.FieldSelector = fieldSelector
@@ -544,7 +546,9 @@ func (kw *KubeUnit) createPod(env map[string]string) error {
 	}
 
 	time.Sleep(2 * time.Second)
+	KubeAPIWrapperLock.Lock()
 	ev, err := KubeAPIWrapperInstance.UntilWithSync(ctxPodReady, lw, &corev1.Pod{}, nil, podRunningAndReady())
+	KubeAPIWrapperLock.Unlock()
 	if ev == nil || ev.Object == nil {
 		return fmt.Errorf("did not return an event while watching pod for work unit %s", kw.ID())
 	}
@@ -621,7 +625,7 @@ func (kw *KubeUnit) runWorkUsingLogger() {
 	if podName == "" {
 		// create new pod if ked.PodName is empty
 		// TODO: add retry logic to make this more resilient to transient errors
-		if err := kw.createPod(nil); err != nil {
+		if err := kw.CreatePod(nil); err != nil {
 			if err != ErrPodCompleted {
 				errMsg := fmt.Sprintf("Error creating pod: %s", err)
 				kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet
@@ -842,7 +846,7 @@ func (kw *KubeUnit) runWorkUsingLogger() {
 	stdoutWithReconnect := ShouldUseReconnect(kw)
 	if stdoutWithReconnect && stdoutErr == nil {
 		kw.GetWorkceptor().nc.GetLogger().Debug("streaming stdout with reconnect support")
-		go kw.kubeLoggingWithReconnect(&streamWait, stdout, &stdinErr, &stdoutErr)
+		go kw.KubeLoggingWithReconnect(&streamWait, stdout, &stdinErr, &stdoutErr)
 	} else {
 		kw.GetWorkceptor().nc.GetLogger().Debug("streaming stdout with no reconnect support")
 		go kw.kubeLoggingNoReconnect(&streamWait, stdout, &stdoutErr)
@@ -1062,7 +1066,7 @@ func (kw *KubeUnit) runWorkUsingTCP() {
 	}()
 
 	// Create the pod
-	err = kw.createPod(map[string]string{"RECEPTOR_HOST": listenHost, "RECEPTOR_PORT": listenPort})
+	err = kw.CreatePod(map[string]string{"RECEPTOR_HOST": listenHost, "RECEPTOR_PORT": listenPort})
 	if err != nil {
 		errMsg := fmt.Sprintf("Error creating pod: %s", err)
 		kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
@@ -1574,11 +1578,11 @@ func (cfg KubeWorkerCfg) NewkubeWorker(bwu BaseWorkUnitForWorkUnit, w *Workcepto
 		}
 	}
 
-	KubeAPIWrapperLock = &sync.RWMutex{}
 	KubeAPIWrapperLock.Lock()
-	KubeAPIWrapperInstance = KubeAPIWrapper{}
 	if kawi != nil {
 		KubeAPIWrapperInstance = kawi
+	} else {
+		KubeAPIWrapperInstance = KubeAPIWrapper{}
 	}
 	KubeAPIWrapperLock.Unlock()
 
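Besides exporting kubeLoggingWithReconnect and createPod (as KubeLoggingWithReconnect and CreatePod) so the new test can call them, this file changes KubeAPIWrapperLock from a *sync.RWMutex that NewkubeWorker had to allocate into a value-typed sync.Mutex, and CreatePod now takes that lock around its reads of KubeAPIWrapperInstance. A value mutex's zero value is ready to use, so nothing can touch the lock before it exists; re-allocating the old pointer lock on every NewkubeWorker call also meant concurrent callers could end up holding different locks. A minimal sketch of the difference, with hypothetical names standing in for the real package variables:

package main

import (
	"fmt"
	"sync"
)

// A pointer-typed mutex starts out nil; calling Lock on it panics until
// someone remembers to allocate it (the old code did this inside
// NewkubeWorker, reassigning the lock on every call).
var oldStyleLock *sync.RWMutex

// A value-typed mutex has a usable zero value: no setup step, no nil
// dereference, and exactly one lock for the whole package.
var newStyleLock sync.Mutex

var instance string // stands in for KubeAPIWrapperInstance

func setInstance(s string) {
	newStyleLock.Lock() // safe even as the very first operation in the process
	defer newStyleLock.Unlock()
	instance = s
}

func main() {
	setInstance("real wrapper")
	fmt.Println(instance)
}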
pkg/workceptor/kubernetes_test.go

Lines changed: 84 additions & 3 deletions
@@ -2,8 +2,11 @@ package workceptor_test
 
 import (
 	"context"
+	"io"
+	"net/http"
 	"os"
 	"reflect"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -20,7 +23,9 @@
 	"k8s.io/apimachinery/pkg/selection"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
+	fakerest "k8s.io/client-go/rest/fake"
 	"k8s.io/client-go/tools/remotecommand"
 )
 
@@ -139,7 +144,7 @@ func TestParseTime(t *testing.T) {
 	}
 }
 
-func createKubernetesTestSetup(t *testing.T) (workceptor.WorkUnit, *mock_workceptor.MockBaseWorkUnitForWorkUnit, *mock_workceptor.MockNetceptorForWorkceptor, *workceptor.Workceptor, *mock_workceptor.MockKubeAPIer, context.Context) {
+func createKubernetesTestSetup(t *testing.T) (workceptor.WorkUnit, *mock_workceptor.MockBaseWorkUnitForWorkUnit, *mock_workceptor.MockNetceptorForWorkceptor, *workceptor.Workceptor, *mock_workceptor.MockKubeAPIer, *gomock.Controller, context.Context) {
 	ctrl := gomock.NewController(t)
 	ctx := context.Background()
 
@@ -157,7 +162,7 @@ func createKubernetesTestSetup(t *testing.T) (workceptor.WorkUnit, *mock_workcep
 	kubeConfig := workceptor.KubeWorkerCfg{AuthMethod: "incluster"}
 	ku := kubeConfig.NewkubeWorker(mockBaseWorkUnit, w, "", "", mockKubeAPI)
 
-	return ku, mockBaseWorkUnit, mockNetceptor, w, mockKubeAPI, ctx
+	return ku, mockBaseWorkUnit, mockNetceptor, w, mockKubeAPI, ctrl, ctx
 }
 
 type hasTerm struct {
@@ -189,7 +194,7 @@ func (e *ex) StreamWithContext(_ context.Context, _ remotecommand.StreamOptions)
 }
 
 func TestKubeStart(t *testing.T) {
-	ku, mockbwu, mockNet, w, mockKubeAPI, ctx := createKubernetesTestSetup(t)
+	ku, mockbwu, mockNet, w, mockKubeAPI, _, ctx := createKubernetesTestSetup(t)
 
 	startTestCases := []struct {
 		name string
@@ -423,3 +428,79 @@ func Test_IsCompatibleK8S(t *testing.T) {
 		})
 	}
 }
+
+func TestKubeLoggingWithReconnect(t *testing.T) {
+	var stdinErr error
+	var stdoutErr error
+	ku, mockBaseWorkUnit, mockNetceptor, w, mockKubeAPI, ctrl, ctx := createKubernetesTestSetup(t)
+
+	kw := &workceptor.KubeUnit{
+		BaseWorkUnitForWorkUnit: mockBaseWorkUnit,
+	}
+	tests := []struct {
+		name          string
+		expectedCalls func()
+	}{
+		{
+			name: "Kube error should be read",
+			expectedCalls: func() {
+				mockBaseWorkUnit.EXPECT().UpdateBasicStatus(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+				config := rest.Config{}
+				mockKubeAPI.EXPECT().InClusterConfig().Return(&config, nil)
+				mockBaseWorkUnit.EXPECT().GetWorkceptor().Return(w).AnyTimes()
+				clientset := kubernetes.Clientset{}
+				mockKubeAPI.EXPECT().NewForConfig(gomock.Any()).Return(&clientset, nil)
+				lock := &sync.RWMutex{}
+				mockBaseWorkUnit.EXPECT().GetStatusLock().Return(lock).AnyTimes()
+				mockBaseWorkUnit.EXPECT().MonitorLocalStatus().AnyTimes()
+				mockBaseWorkUnit.EXPECT().UnitDir().Return("TestDir2").AnyTimes()
+				kubeExtraData := workceptor.KubeExtraData{}
+				status := workceptor.StatusFileData{ExtraData: &kubeExtraData}
+				mockBaseWorkUnit.EXPECT().GetStatusWithoutExtraData().Return(&status).AnyTimes()
+				mockBaseWorkUnit.EXPECT().GetStatusCopy().Return(status).AnyTimes()
+				mockBaseWorkUnit.EXPECT().GetContext().Return(ctx).AnyTimes()
+				pod := corev1.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "Test_Name"}, Spec: corev1.PodSpec{}, Status: corev1.PodStatus{}}
+				mockKubeAPI.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes()
+				mockBaseWorkUnit.EXPECT().UpdateFullStatus(gomock.Any()).AnyTimes()
+				field := hasTerm{}
+				mockKubeAPI.EXPECT().OneTermEqualSelector(gomock.Any(), gomock.Any()).Return(&field).AnyTimes()
+				ev := watch.Event{Object: &pod}
+				mockKubeAPI.EXPECT().UntilWithSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&ev, nil).AnyTimes()
+				mockKubeAPI.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes()
+				req := fakerest.RESTClient{
+					Client: fakerest.CreateHTTPClient(func(request *http.Request) (*http.Response, error) {
+						resp := &http.Response{
+							StatusCode: http.StatusOK,
+							Body:       io.NopCloser(strings.NewReader("2024-12-09T00:31:18.823849250Z HI\n kube error")),
+						}
+
+						return resp, nil
+					}),
+					NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
+				}
+				mockKubeAPI.EXPECT().GetLogs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(req.Request()).AnyTimes()
+				logger := logger.NewReceptorLogger("")
+				mockNetceptor.EXPECT().GetLogger().Return(logger).AnyTimes()
+				mockKubeAPI.EXPECT().SubResource(gomock.Any(), gomock.Any(), gomock.Any()).Return(req.Request()).AnyTimes()
+				exec := ex{}
+				mockKubeAPI.EXPECT().NewSPDYExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(&exec, nil).AnyTimes()
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tt.expectedCalls()
+			ku.Start()
+			kw.CreatePod(nil)
+			wg := &sync.WaitGroup{}
+			wg.Add(1)
+			mockfilesystemer := mock_workceptor.NewMockFileSystemer(ctrl)
+			mockfilesystemer.EXPECT().OpenFile(gomock.Any(), gomock.Any(), gomock.Any()).Return(&os.File{}, nil)
+			stdout, _ := workceptor.NewStdoutWriter(mockfilesystemer, "")
+			mockFileWC := mock_workceptor.NewMockFileWriteCloser(ctrl)
+			stdout.SetWriter(mockFileWC)
+			mockFileWC.EXPECT().Write(gomock.AnyOf([]byte("HI\n"), []byte(" kube error\n"))).Return(0, nil).Times(2)
+			kw.KubeLoggingWithReconnect(wg, stdout, &stdinErr, &stdoutErr)
+		})
+	}
+}
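The core trick in this test is client-go's fake REST client: fakerest.CreateHTTPClient turns a plain function into the client's HTTP transport, so the requests returned by the GetLogs and SubResource mocks are answered with the canned, timestamp-prefixed body above. The two Write expectations then check that KubeLoggingWithReconnect strips the timestamp and delivers both lines ("HI\n" and " kube error\n") to the stdout writer. A minimal, self-contained sketch of the same pattern, assuming only client-go; names such as fakeClient are illustrative, not part of the test:

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"

	"k8s.io/client-go/kubernetes/scheme"
	fakerest "k8s.io/client-go/rest/fake"
)

func main() {
	// Every request sent through this client is answered by the function
	// below; no API server or network is involved.
	fakeClient := &fakerest.RESTClient{
		Client: fakerest.CreateHTTPClient(func(_ *http.Request) (*http.Response, error) {
			return &http.Response{
				StatusCode: http.StatusOK,
				Body:       io.NopCloser(strings.NewReader("2024-12-09T00:31:18Z HI\n")),
			}, nil
		}),
		NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
	}

	// Stream the canned body the way a pod log request would.
	stream, err := fakeClient.Request().Stream(context.Background())
	if err != nil {
		panic(err)
	}
	defer stream.Close()

	data, _ := io.ReadAll(stream)
	fmt.Printf("%s", data) // prints: 2024-12-09T00:31:18Z HI
}

Because the transport is just a function, the test can exercise the reconnect-and-reparse path deterministically, without a cluster.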
