Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple sinks #2

Merged
merged 3 commits into from
Sep 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions eventrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"fmt"

"github.com/golang/glog"
Expand Down Expand Up @@ -62,6 +63,10 @@ func init() {
// EventRouter is responsible for maintaining a stream of kubernetes
// system Events and pushing them to another channel for storage
type EventRouter struct {
// AddEventHandler do not take context, so we need to pass it around so that
// we can propagate cancel to the sinks
ctx context.Context

// kubeclient is the main kubernetes interface
kubeClient kubernetes.Interface

Expand All @@ -71,16 +76,16 @@ type EventRouter struct {
// returns true if the event store has been synced
eListerSynched cache.InformerSynced

// event sink
// TODO: Determine if we want to support multiple sinks.
eSink sinks.EventSinkInterface
// event sinks
eSink []sinks.EventSinkInterface
}

// NewEventRouter will create a new event router using the input params
func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer) *EventRouter {
func NewEventRouter(ctx context.Context, kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer) *EventRouter {
er := &EventRouter{
ctx: ctx,
kubeClient: kubeClient,
eSink: sinks.ManufactureSink(),
eSink: sinks.ManufactureSink(ctx),
}
eventsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: er.addEvent,
Expand Down Expand Up @@ -111,15 +116,19 @@ func (er *EventRouter) Run(stopCh <-chan struct{}) {
func (er *EventRouter) addEvent(obj interface{}) {
e := obj.(*v1.Event)
prometheusEvent(e)
er.eSink.UpdateEvents(e, nil)
for _, eSink := range er.eSink {
eSink.UpdateEvents(e, nil)
}
}

// updateEvent is called any time there is an update to an existing event
func (er *EventRouter) updateEvent(objOld interface{}, objNew interface{}) {
eOld := objOld.(*v1.Event)
eNew := objNew.(*v1.Event)
prometheusEvent(eNew)
er.eSink.UpdateEvents(eNew, eOld)
for _, eSink := range er.eSink {
eSink.UpdateEvents(eNew, eOld)
}
}

// prometheusEvent is called when an event is added or updated
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I=
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand Down Expand Up @@ -116,6 +117,7 @@ github.com/influxdata/influxdb v1.7.7 h1:UvNzAPfBrKMENVbQ4mr4ccA9sW+W1Ihl0Yh1s0B
github.com/influxdata/influxdb v1.7.7/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
Expand Down Expand Up @@ -309,6 +311,7 @@ gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20190620084959-7cf5895f2711/go.mod h1:TBhBqb1AWbBQbW3XRusr7n7E4v2+5ZY8r8sAMnyFC5A=
Expand All @@ -326,6 +329,7 @@ k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v0.4.0 h1:lCJCxf/LIowc2IGS9TPjWDyXY4nOmdGdfcwwDQCOURQ=
k8s.io/klog v0.4.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kube-openapi v0.0.0-20190709113604-33be087ad058 h1:di3XCwddOR9cWBNpfgXaskhh6cgJuwcK54rvtwUaC10=
k8s.io/kube-openapi v0.0.0-20190709113604-33be087ad058/go.mod h1:nfDlWeOsu3pUf4yWGL+ERqohP4YsZcBJXWMK+gkzOA4=
k8s.io/utils v0.0.0-20190221042446-c2654d5206da/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0=
k8s.io/utils v0.0.0-20190809000727-6c36bc71fc4a h1:uy5HAgt4Ha5rEMbhZA+aM1j2cq5LmR6LQ71EYC2sVH4=
Expand Down
55 changes: 41 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"flag"
"net/http"
"os"
Expand All @@ -39,7 +40,8 @@ import (
var addr = flag.String("listen-address", ":8080", "The address to listen on for HTTP requests.")

// setup a signal hander to gracefully exit
func sigHandler() <-chan struct{} {
func sigHandler(ctx context.Context) (<-chan struct{}, context.Context) {
ctx, cancel := context.WithCancel(ctx)
stop := make(chan struct{})
go func() {
c := make(chan os.Signal, 1)
Expand All @@ -50,16 +52,20 @@ func sigHandler() <-chan struct{} {
syscall.SIGABRT, // Abnormal termination
syscall.SIGILL, // illegal instruction
syscall.SIGFPE) // floating point - this is why we can't have nice things
sig := <-c
glog.Warningf("Signal (%v) Detected, Shutting Down", sig)
select {
case sig := <-c:
glog.Warningf("Signal (%v) Detected, Shutting Down", sig)
case <-ctx.Done():
glog.Warningf("cancelling ctx. ctx.Done invoked, %s", ctx.Err())
}
cancel()
close(stop)
}()
return stop
return stop, ctx
}

// loadConfig will parse input + config file and return a clientset
func loadConfig() kubernetes.Interface {
var config *rest.Config
// read viper configs
func loadViperConfig() {
var err error

flag.Parse()
Expand All @@ -77,12 +83,21 @@ func loadConfig() kubernetes.Interface {
panic(err.Error())
}

viper.BindEnv("kubeconfig") // Allows the KUBECONFIG env var to override where the kubeconfig is
err = viper.BindEnv("kubeconfig") // Allows the KUBECONFIG env var to override where the kubeconfig is
if err != nil {
glog.Fatalf("BindEnv Failed, %s", err)
}

// Allow specifying a custom config file via the EVENTROUTER_CONFIG env var
if forceCfg := os.Getenv("EVENTROUTER_CONFIG"); forceCfg != "" {
viper.SetConfigFile(forceCfg)
}
}

// loadConfig will parse input + config file and return a clientset
func loadK8sConfig() kubernetes.Interface {
var config *rest.Config
var err error
kubeconfig := viper.GetString("kubeconfig")
if len(kubeconfig) > 0 {
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
Expand All @@ -101,17 +116,15 @@ func loadConfig() kubernetes.Interface {
return clientset
}

// main entry point of the program
func main() {
func startController(ctx context.Context, clientset kubernetes.Interface) {
var wg sync.WaitGroup

clientset := loadConfig()
sharedInformers := informers.NewSharedInformerFactory(clientset, viper.GetDuration("resync-interval"))
eventsInformer := sharedInformers.Core().V1().Events()

stop, ctx := sigHandler(ctx)

// TODO: Support locking for HA https://github.com/kubernetes/kubernetes/pull/42666
eventRouter := NewEventRouter(clientset, eventsInformer)
stop := sigHandler()
eventRouter := NewEventRouter(ctx, clientset, eventsInformer)

// Startup the http listener for Prometheus Metrics endpoint.
go func() {
Expand All @@ -132,5 +145,19 @@ func main() {
sharedInformers.Start(stop)
wg.Wait()
glog.Warningf("Exiting main()")
}

// main entry point of the program
func main() {
loadViperConfig()

clientset := loadK8sConfig()

// we use context to cancel the go routines created in sink, etc.
ctx := context.Background()

// makes testing easier if we separate out the controller
startController(ctx, clientset)

os.Exit(1)
}
61 changes: 51 additions & 10 deletions sinks/glogsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,71 @@ limitations under the License.
package sinks

import (
"context"
"encoding/json"

"github.com/golang/glog"
"k8s.io/api/core/v1"
"sync"
)

// let's have few go routines to emit out glog.
const glogSinkConcurrency = 5

// GlogSink is the most basic sink
// Useful when you already have ELK/EFK Stack
type GlogSink struct {
// TODO: create a channel and buffer for scaling
updateChan chan UpdateEvent
}

// NewGlogSink will create a new
func NewGlogSink() EventSinkInterface {
return &GlogSink{}
func NewGlogSink(ctx context.Context) EventSinkInterface {
gs := &GlogSink{
updateChan: make(chan UpdateEvent),
}

var wg sync.WaitGroup
wg.Add(glogSinkConcurrency)
glog.V(3).Infof("Starting glog sink with concurrency=%d", glogSinkConcurrency)
// let's have couple of parallel routines
go func() {
defer wg.Done()
gs.updateEvents(ctx)
}()

// wait
go func() {
wg.Wait()
glog.V(3).Info("Stopping glog sink WaitGroup")
close(gs.updateChan)
}()

return gs
}

// UpdateEvents implements the EventSinkInterface
// UpdateEvents implements the EventSinkInterface.
// This is not a non-blocking call because the channel could get full. But ATM I do not care because
// glog just logs the message. It is CPU heavy (JSON Marshalling) and has no I/O. So the time complexity of the
// blocking call is very minimal. Also we could spawn more routines of updateEvents to make it concurrent.
func (gs *GlogSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event) {
eData := NewEventData(eNew, eOld)
gs.updateChan <- UpdateEvent{
eNew: eNew,
eOld: eOld,
}
}

if eJSONBytes, err := json.Marshal(eData); err == nil {
glog.Info(string(eJSONBytes))
} else {
glog.Warningf("Failed to json serialize event: %v", err)
func (gs *GlogSink) updateEvents(ctx context.Context) {
for {
select {
case event := <-gs.updateChan:
eData := NewEventData(event.eNew, event.eOld)
if eJSONBytes, err := json.Marshal(eData); err == nil {
glog.Info(string(eJSONBytes))
} else {
glog.Warningf("Failed to json serialize event: %v", err)
}
case <-ctx.Done():
glog.Warning("Ending, glog sink receiver channel, got ctx.Done()")
return
}
}
}
63 changes: 63 additions & 0 deletions sinks/glogsink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package sinks

import (
"context"
core_v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"testing"
)

func TestNewGlogSink(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
gs := NewGlogSink(ctx)

client := fake.NewSimpleClientset()

// test create
e := createTestEvent(t, client)
gs.UpdateEvents(e, nil)

// test update
oldE, newE := updateTestEvent(t, client, e)
gs.UpdateEvents(newE, oldE)

// we are not using it
deleteTestEvent(t, client)

cancel()
}

func createTestEvent(t *testing.T, client *fake.Clientset) *core_v1.Event {
var err error
e := &core_v1.Event{ObjectMeta: meta_v1.ObjectMeta{Name: "my-test-event"}}
e, err = client.CoreV1().Events("test-ns").Create(e)
if err != nil {
t.Errorf("Create Event Failed, %s", err)
}

return e
}

func updateTestEvent(t *testing.T, client *fake.Clientset, e *core_v1.Event) (*core_v1.Event, *core_v1.Event) {
var err error

// send a update event
e.Reason = "foobar"
newE, err := client.CoreV1().Events("test-ns").Update(e)
if err != nil {
t.Errorf("Update Event Failed, %s", err)
}

return e, newE
}

func deleteTestEvent(t *testing.T, client *fake.Clientset) {
var err error

err = client.CoreV1().Events("test-ns").Delete("my-test-event", &meta_v1.DeleteOptions{})
if err != nil {
t.Errorf("Delete Event Failed, %s", err)
}

}
Loading