This repository has been archived by the owner on Aug 22, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathkubelogtail.go
198 lines (174 loc) · 4.58 KB
/
kubelogtail.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package kubelogtail
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
// import for auth providers
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
)
// OptionsFunc is a functional option that New applies to the KubeLogTail it is
// building. Returning a non-nil error makes New fail with that error wrapped.
type OptionsFunc func(*KubeLogTail) error
// KubeLogTail is a wrapper for discovering and printing logs from Kubernetes containers.
// Instances are built with New; all fields are unexported and configured
// through OptionsFunc values.
type KubeLogTail struct {
// kubeconfig is the value handed to kubeClient when New builds the clientset.
kubeconfig string
// clientset is the Kubernetes client used to list pods.
clientset *kubernetes.Clientset
// ctx is the cancellable context created in New; Run returns once it is done.
ctx context.Context
// cancel cancels ctx; it is invoked by Stop.
cancel context.CancelFunc
// refreshTime is the interval between pod refreshes in Run (New defaults it to 10 seconds).
refreshTime time.Duration
// namespace is passed to getPods when listing pods (New defaults it to "default";
// per SetNamespace, a blank string means all namespaces).
namespace string
// selector is the label selector string passed to getPods; set by SetLabelSelector.
selector string
// pods holds the pod tails currently tracked, keyed by "namespace/name".
pods map[string]*podTail
// colorMode is the mode name passed to newLogColorPrint (New defaults it to "pod").
colorMode string
// color is the color printer New builds from colorMode.
color *logColorPrint
}
// New creates a new KubeLogTail.
//
// Defaults are set first (a 10 second refresh interval, the "default"
// namespace and the "pod" color mode), then every option is applied in the
// order given; nil options are skipped. Afterwards the Kubernetes client and
// the color printer are built from the resulting configuration.
//
// On any failure New returns a nil KubeLogTail together with a wrapped error.
// The internal context is cancelled on those paths so that it is not leaked.
func New(options ...OptionsFunc) (*KubeLogTail, error) {
	k := &KubeLogTail{
		refreshTime: time.Second * 10,
		namespace:   "default",
		pods:        make(map[string]*podTail),
		colorMode:   "pod",
	}
	k.ctx, k.cancel = context.WithCancel(context.Background())

	for _, f := range options {
		if f == nil {
			// A nil option would panic when called; treat it as a no-op.
			continue
		}
		if err := f(k); err != nil {
			k.cancel()
			return nil, errors.Wrapf(err, "failed to process options")
		}
	}

	client, err := kubeClient(k.kubeconfig)
	if err != nil {
		k.cancel()
		return nil, errors.Wrapf(err, "failed to create kubernetes client from %s", k.kubeconfig)
	}
	k.clientset = client

	c, err := newLogColorPrint(k.colorMode)
	if err != nil {
		k.cancel()
		return nil, errors.Wrapf(err, "failed to create color functions")
	}
	k.color = c

	return k, nil
}
// SetKubeConfig returns an option that stores the given kubeconfig value on
// the KubeLogTail. The option itself never fails.
// Generally, it is only used when creating a new KubeLogTail.
func SetKubeConfig(kubeconfig string) OptionsFunc {
	apply := func(tail *KubeLogTail) error {
		tail.kubeconfig = kubeconfig
		return nil
	}
	return apply
}
// SetRefreshTime returns an option that sets how often the pod list is
// refreshed.
//
// The duration must be greater than zero: Run feeds it to time.NewTicker,
// which panics on a non-positive interval. The option therefore rejects such
// values with an error instead of letting Run panic later.
// Generally, it is only used when creating a new KubeLogTail.
func SetRefreshTime(duration time.Duration) OptionsFunc {
	return func(k *KubeLogTail) error {
		if duration <= 0 {
			return fmt.Errorf("refresh time must be greater than zero, got %s", duration)
		}
		k.refreshTime = duration
		return nil
	}
}
// SetNamespace returns an option that stores the namespace used when listing
// pods. A blank string indicates all namespaces. The option itself never fails.
// Generally, it is only used when creating a new KubeLogTail.
func SetNamespace(namespace string) OptionsFunc {
	apply := func(tail *KubeLogTail) error {
		tail.namespace = namespace
		return nil
	}
	return apply
}
// SetLabelSelector returns an option that sets the label selector used for
// listing pods.
//
// The query is parsed with labels.Parse when the option is applied; a parse
// failure is returned wrapped, otherwise the parsed selector's String() form
// is stored.
// Generally, it is only used when creating a new KubeLogTail.
func SetLabelSelector(query string) OptionsFunc {
	apply := func(tail *KubeLogTail) error {
		parsed, parseErr := labels.Parse(query)
		if parseErr != nil {
			return errors.Wrapf(parseErr, "failed to parse label selector: \"%s\"", query)
		}

		tail.selector = parsed.String()
		return nil
	}
	return apply
}
// SetColorMode returns an option that stores the color print mode. The mode
// is not checked here; New passes it to newLogColorPrint afterwards.
// Generally, it is only used when creating a new KubeLogTail.
func SetColorMode(mode string) OptionsFunc {
	apply := func(tail *KubeLogTail) error {
		tail.colorMode = mode
		return nil
	}
	return apply
}
// Stop triggers the kubelog tail to stop processing by cancelling its internal
// context. Run watches that context and returns once it is done.
func (k *KubeLogTail) Stop() {
k.cancel()
}
// Run discovers the pods and tails the container logs, then repeats the
// discovery once per refresh interval.
//
// An error from the initial discovery is returned wrapped. Errors from later
// refreshes are only printed and the loop continues. Run blocks until the
// internal context is cancelled (see Stop) and then returns nil.
func (k *KubeLogTail) Run() error {
	// Initial pass: a failure here aborts Run.
	if err := k.processPods(); err != nil {
		return errors.Wrapf(err, "failed to process pods for logs")
	}

	ticker := time.NewTicker(k.refreshTime)
	// Release the ticker when the loop exits.
	defer ticker.Stop()

	for {
		select {
		case <-k.ctx.Done():
			// NOTE(review): the pod tails presumably observe the same
			// cancellation — confirm in newPodTail.
			return nil
		case <-ticker.C:
			if err := k.processPods(); err != nil {
				fmt.Println(errors.Wrapf(err, "failed to process pods for logs"))
			}
		}
	}
}
// processPods reconciles the tracked pod tails with the pods currently
// returned by getPods for the configured namespace and label selector.
//
// Tracked pods that are no longer listed are stopped and removed from k.pods.
// Removing them keeps the map from growing without bound, avoids calling stop
// again on every refresh, and lets a pod that is later recreated under the
// same namespace/name be tailed again. Listed pods that are not tracked yet
// get a new podTail whose tail method runs in its own goroutine; an error
// from tail is only printed.
//
// It returns a wrapped error when the pod listing fails and nil otherwise.
func (k *KubeLogTail) processPods() error {
	pods, err := getPods(k.clientset, k.namespace, k.selector)
	if err != nil {
		return errors.Wrapf(err, "failed to list pods")
	}

	// Index the listing by "namespace/name", the same key used in k.pods.
	foundPods := make(map[string]*v1.Pod, len(pods.Items))
	for i := range pods.Items {
		p := &pods.Items[i]
		key := fmt.Sprintf("%s/%s", p.GetNamespace(), p.GetName())
		foundPods[key] = p
	}

	// Stop and forget tails whose pod disappeared. Deleting map entries while
	// ranging over the map is permitted in Go.
	for key, t := range k.pods {
		if _, ok := foundPods[key]; ok {
			continue
		}
		t.stop()
		delete(k.pods, key)
	}

	// Start tails for pods seen for the first time.
	for key, pod := range foundPods {
		if _, ok := k.pods[key]; ok {
			continue
		}
		t := k.newPodTail(pod)
		k.pods[key] = t
		// TODO: channel for errors?
		go func() {
			if err := t.tail(); err != nil {
				fmt.Println(err)
			}
		}()
	}

	return nil
}