Skip to content

Commit bb0b10f

Browse files
committed
Reference ansible#883: env timeout open k8s logstream
Signed-off-by: Adrian Nackov <adrian.nackov@mail.schwarz>
1 parent 1be2c55 commit bb0b10f

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

pkg/workceptor/kubernetes.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,24 @@ func podRunningAndReady() func(event watch.Event) (bool, error) {
229229
return inner
230230
}
231231

232+
func GetTimeoutOpenLogstream(kw *KubeUnit) int {
233+
// RECEPTOR_OPEN_LOGSTREAM_TIMEOUT
234+
// default: 1
235+
openLogStreamTimeout := 1
236+
envTimeout := os.Getenv("RECEPTOR_OPEN_LOGSTREAM_TIMEOUT")
237+
if envTimeout != "" {
238+
var err error
239+
openLogStreamTimeout, err = strconv.Atoi(envTimeout)
240+
if err != nil {
241+
// ignore error, use default
242+
kw.GetWorkceptor().nc.GetLogger().Warning("Invalid value for RECEPTOR_OPEN_LOGSTREAM_TIMEOUT: %s. Ignoring", envTimeout)
243+
openLogStreamTimeout = 1
244+
}
245+
}
246+
kw.GetWorkceptor().nc.GetLogger().Debug("RECEPTOR_OPEN_LOGSTREAM_TIMEOUT: %d", openLogStreamTimeout)
247+
return openLogStreamTimeout
248+
}
249+
232250
func (kw *KubeUnit) kubeLoggingConnectionHandler(timestamps bool, sinceTime time.Time) (io.ReadCloser, error) {
233251
var logStream io.ReadCloser
234252
var err error
@@ -257,7 +275,7 @@ func (kw *KubeUnit) kubeLoggingConnectionHandler(timestamps bool, sinceTime time
257275
retries,
258276
err,
259277
)
260-
time.Sleep(time.Second)
278+
time.Sleep(time.Duration(GetTimeoutOpenLogstream(kw)) * time.Second)
261279
}
262280
if err != nil {
263281
errMsg := fmt.Sprintf("Error opening log stream for pod %s/%s. Error: %s", podNamespace, podName, err)

pkg/workceptor/kubernetes_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,52 @@ func TestShouldUseReconnect(t *testing.T) {
101101
}
102102
}
103103

104+
func TestGetTimeoutOpenLogstream(t *testing.T) {
105+
const envVariable string = "RECEPTOR_OPEN_LOGSTREAM_TIMEOUT"
106+
107+
kw, err := startNetceptorNodeWithWorkceptor()
108+
if err != nil {
109+
t.Fatal(err)
110+
}
111+
112+
tests := []struct {
113+
name string
114+
envValue string
115+
want int
116+
}{
117+
{
118+
name: "No env value set",
119+
envValue: "",
120+
want: 1,
121+
},
122+
{
123+
name: "Env value set incorrectly",
124+
envValue: "text instead of int",
125+
want: 1,
126+
},
127+
{
128+
name: "Env value set correctly",
129+
envValue: "2",
130+
want: 2,
131+
},
132+
}
133+
for _, tt := range tests {
134+
t.Run(tt.name, func(t *testing.T) {
135+
if tt.envValue != "" {
136+
os.Setenv(envVariable, tt.envValue)
137+
defer os.Unsetenv(envVariable)
138+
} else {
139+
os.Unsetenv(envVariable)
140+
}
141+
142+
if got := workceptor.GetTimeoutOpenLogstream(kw); got != tt.want {
143+
t.Errorf("GetTimeoutOpenLogstream() = %v, want %v", got, tt.want)
144+
}
145+
})
146+
}
147+
148+
}
149+
104150
func TestParseTime(t *testing.T) {
105151
type args struct {
106152
s string

0 commit comments

Comments
 (0)