diff --git a/.gitignore b/.gitignore index 7e1aa8a82..88ba36ed7 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ bin *.coverprofile vendor .idea -.go-pkg-cache \ No newline at end of file +.go-pkg-cache +junit.xml diff --git a/lib/health/health.go b/lib/health/health.go new file mode 100644 index 000000000..ff4551922 --- /dev/null +++ b/lib/health/health.go @@ -0,0 +1,169 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package health + +import ( + "fmt" + "net/http" + "sync" + "time" + + log "github.com/Sirupsen/logrus" +) + +// The HealthReport struct has slots for the levels of health that we monitor and aggregate. +type HealthReport struct { + Live bool + Ready bool +} + +type reporterState struct { + // The health indicators that this reporter reports. + reports HealthReport + + // Expiry time for this reporter's reports. + timeout time.Duration + + // The most recent report. + latest HealthReport + + // Time of that most recent report. + timestamp time.Time +} + +// A HealthAggregator receives health reports from individual reporters (which are typically +// components of a particular daemon or application) and aggregates them into an overall health +// summary. For each monitored kind of health, all of the reporters that report that need to say +// that it is good; for example, to be 'ready' overall, all of the reporters that report readiness +// need to have recently said 'Ready: true'. +type HealthAggregator struct { + // Mutex to protect concurrent access to this health aggregator. + mutex *sync.Mutex + + // Map from reporter name to corresponding state. + reporters map[string]*reporterState +} + +// RegisterReporter registers a reporter with a HealthAggregator. The aggregator uses NAME to +// identify the reporter. REPORTS indicates the kinds of health that this reporter will report. +// TIMEOUT is the expiry time for this reporter's reports; the implication of which is that the +// reporter should normally refresh its reports well before this time has expired. +func (aggregator *HealthAggregator) RegisterReporter(name string, reports *HealthReport, timeout time.Duration) { + aggregator.mutex.Lock() + defer aggregator.mutex.Unlock() + aggregator.reporters[name] = &reporterState{ + reports: *reports, + timeout: timeout, + latest: HealthReport{Live: true}, + timestamp: time.Now(), + } + return +} + +// Report reports current health from a reporter to a HealthAggregator. NAME is the reporter's name +// and REPORTS conveys the current status, for each kind of health that the reporter said it was +// going to report when it called RegisterReporter. +func (aggregator *HealthAggregator) Report(name string, report *HealthReport) { + aggregator.mutex.Lock() + defer aggregator.mutex.Unlock() + reporter := aggregator.reporters[name] + reporter.latest = *report + reporter.timestamp = time.Now() + return +} + +func NewHealthAggregator() *HealthAggregator { + return &HealthAggregator{mutex: &sync.Mutex{}, reporters: map[string]*reporterState{}} +} + +// Summary calculates the current overall health for a HealthAggregator. +func (aggregator *HealthAggregator) Summary() *HealthReport { + aggregator.mutex.Lock() + defer aggregator.mutex.Unlock() + + // In the absence of any reporters, default to indicating that we are both live and ready. + summary := &HealthReport{Live: true, Ready: true} + + // Now for each reporter... + for name, reporter := range aggregator.reporters { + log.WithFields(log.Fields{ + "name": name, + "reporter-state": reporter, + }).Debug("Detailed health state") + + // Reset Live to false if that reporter is registered to report liveness and hasn't + // recently said that it is live. + if summary.Live && reporter.reports.Live && (!reporter.latest.Live || + (time.Since(reporter.timestamp) > reporter.timeout)) { + summary.Live = false + } + + // Reset Ready to false if that reporter is registered to report readiness and + // hasn't recently said that it is ready. + if summary.Ready && reporter.reports.Ready && (!reporter.latest.Ready || + (time.Since(reporter.timestamp) > reporter.timeout)) { + summary.Ready = false + } + } + + log.WithField("summary", summary).Info("Overall health") + return summary +} + +const ( + // The HTTP status that we use for 'ready' or 'live'. 204 means "No Content: The server + // successfully processed the request and is not returning any content." (Kubernetes + // interpets any 200<=status<400 as 'good'.) + StatusGood = 204 + + // The HTTP status that we use for 'not ready' or 'not live'. 503 means "Service + // Unavailable: The server is currently unavailable (because it is overloaded or down for + // maintenance). Generally, this is a temporary state." (Kubernetes interpets any + // status>=400 as 'bad'.) + StatusBad = 503 +) + +// ServeHTTP publishes the current overall liveness and readiness at http://*:PORT/liveness and +// http://*:PORT/readiness respectively. A GET request on those URLs returns StatusGood or +// StatusBad, according to the current overall liveness or readiness. These endpoints are designed +// for use by Kubernetes liveness and readiness probes. +func (aggregator *HealthAggregator) ServeHTTP(port int) { + + log.WithField("port", port).Info("Starting health endpoints") + http.HandleFunc("/readiness", func(rsp http.ResponseWriter, req *http.Request) { + log.Debug("GET /readiness") + status := StatusBad + if aggregator.Summary().Ready { + log.Debug("Felix is ready") + status = StatusGood + } + rsp.WriteHeader(status) + }) + http.HandleFunc("/liveness", func(rsp http.ResponseWriter, req *http.Request) { + log.Debug("GET /liveness") + status := StatusBad + if aggregator.Summary().Live { + log.Debug("Felix is live") + status = StatusGood + } + rsp.WriteHeader(status) + }) + for { + err := http.ListenAndServe(fmt.Sprintf(":%v", port), nil) + log.WithError(err).Error( + "Readiness endpoint failed, trying to restart it...") + time.Sleep(1 * time.Second) + } +} diff --git a/lib/health/health_suite_test.go b/lib/health/health_suite_test.go new file mode 100644 index 000000000..f7c065d06 --- /dev/null +++ b/lib/health/health_suite_test.go @@ -0,0 +1,33 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package health_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" + + "github.com/projectcalico/libcalico-go/lib/testutils" +) + +func init() { + testutils.HookLogrusForGinkgo() +} + +func TestSet(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Health Suite") +} diff --git a/lib/health/health_test.go b/lib/health/health_test.go new file mode 100644 index 000000000..59475d84b --- /dev/null +++ b/lib/health/health_test.go @@ -0,0 +1,145 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package health_test + +import ( + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/projectcalico/libcalico-go/lib/health" +) + +const ( + SOURCE1 = "source1" + SOURCE2 = "source2" + SOURCE3 = "source3" +) + +var _ = Describe("Health", func() { + + var ( + aggregator *health.HealthAggregator + ) + + notifySource := func(source string) func() { + return func() { + switch source { + case SOURCE1: + aggregator.Report(source, &health.HealthReport{Ready: true}) + case SOURCE2: + aggregator.Report(source, &health.HealthReport{Live: true, Ready: true}) + case SOURCE3: + aggregator.Report(source, &health.HealthReport{Live: true}) + } + } + } + + cancelSource := func(source string) func() { + return func() { + aggregator.Report(source, &health.HealthReport{Live: false, Ready: false}) + } + } + + BeforeEach(func() { + aggregator = health.NewHealthAggregator() + aggregator.RegisterReporter(SOURCE1, &health.HealthReport{Ready: true}, 1*time.Second) + aggregator.RegisterReporter(SOURCE2, &health.HealthReport{Live: true, Ready: true}, 1*time.Second) + aggregator.RegisterReporter(SOURCE3, &health.HealthReport{Live: true}, 1*time.Second) + }) + + It("is initially live but not ready", func() { + Expect(aggregator.Summary().Ready).To(BeFalse()) + Expect(aggregator.Summary().Live).To(BeTrue()) + }) + + Context("with ready reports", func() { + + BeforeEach(func() { + notifySource(SOURCE1)() + notifySource(SOURCE2)() + }) + + It("is ready and live", func() { + Expect(aggregator.Summary().Ready).To(BeTrue()) + Expect(aggregator.Summary().Live).To(BeTrue()) + }) + + Context("with live report", func() { + + BeforeEach(notifySource(SOURCE3)) + + It("is ready and live", func() { + Expect(aggregator.Summary().Ready).To(BeTrue()) + Expect(aggregator.Summary().Live).To(BeTrue()) + }) + }) + + Context("with not-ready report", func() { + + BeforeEach(cancelSource(SOURCE1)) + + It("is live but not ready", func() { + Expect(aggregator.Summary().Ready).To(BeFalse()) + Expect(aggregator.Summary().Live).To(BeTrue()) + }) + }) + }) + + Context("with live reports", func() { + + BeforeEach(func() { + notifySource(SOURCE3)() + notifySource(SOURCE2)() + }) + + It("is live but not ready", func() { + Expect(aggregator.Summary().Live).To(BeTrue()) + Expect(aggregator.Summary().Ready).To(BeFalse()) + }) + + Context("with ready report also", func() { + + BeforeEach(notifySource(SOURCE1)) + + It("is ready and live", func() { + Expect(aggregator.Summary().Ready).To(BeTrue()) + Expect(aggregator.Summary().Live).To(BeTrue()) + }) + + Context("with time passing so that reports expire", func() { + + BeforeEach(func() { + time.Sleep(2 * time.Second) + }) + + It("is not ready and not live", func() { + Expect(aggregator.Summary().Ready).To(BeFalse()) + Expect(aggregator.Summary().Live).To(BeFalse()) + }) + }) + }) + + Context("with not-live report", func() { + + BeforeEach(cancelSource(SOURCE3)) + + It("is not ready and not live", func() { + Expect(aggregator.Summary().Ready).To(BeFalse()) + Expect(aggregator.Summary().Live).To(BeFalse()) + }) + }) + }) +}) diff --git a/lib/set/set.go b/lib/set/set.go new file mode 100644 index 000000000..a3fc94f9f --- /dev/null +++ b/lib/set/set.go @@ -0,0 +1,146 @@ +// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package set + +import ( + "errors" + "reflect" + + log "github.com/Sirupsen/logrus" +) + +type Set interface { + Len() int + Add(interface{}) + AddAll(itemArray interface{}) + Discard(interface{}) + Clear() + Contains(interface{}) bool + Iter(func(item interface{}) error) + Copy() Set + Equals(Set) bool + ContainsAll(Set) bool +} + +type empty struct{} + +var emptyValue = empty{} + +var ( + StopIteration = errors.New("Stop iteration") + RemoveItem = errors.New("Remove item") +) + +func New() Set { + return make(mapSet) +} + +func From(members ...interface{}) Set { + s := New() + s.AddAll(members) + return s +} + +func FromArray(membersArray interface{}) Set { + s := New() + s.AddAll(membersArray) + return s +} + +func Empty() Set { + return mapSet(nil) +} + +type mapSet map[interface{}]empty + +func (set mapSet) Len() int { + return len(set) +} + +func (set mapSet) Add(item interface{}) { + set[item] = emptyValue +} + +func (set mapSet) AddAll(itemArray interface{}) { + + arrVal := reflect.ValueOf(itemArray) + for i := 0; i < arrVal.Len(); i++ { + set.Add(arrVal.Index(i).Interface()) + } +} + +func (set mapSet) Discard(item interface{}) { + delete(set, item) +} + +func (set mapSet) Clear() { + for item := range set { + delete(set, item) + } +} + +func (set mapSet) Contains(item interface{}) bool { + _, present := set[item] + return present +} + +func (set mapSet) Iter(visitor func(item interface{}) error) { +loop: + for item := range set { + err := visitor(item) + switch err { + case StopIteration: + break loop + case RemoveItem: + delete(set, item) + case nil: + break + default: + log.WithError(err).Panic("Unexpected iteration error") + } + } +} + +func (set mapSet) Copy() Set { + cpy := New() + for item := range set { + cpy.Add(item) + } + return cpy +} + +func (set mapSet) Equals(other Set) bool { + if set.Len() != other.Len() { + return false + } + for item := range set { + if !other.Contains(item) { + return false + } + } + return true +} + +func (set mapSet) ContainsAll(other Set) bool { + result := true + other.Iter(func(item interface{}) error { + if !set.Contains(item) { + result = false + return StopIteration + } + return nil + }) + return result +} diff --git a/lib/set/set_suite_test.go b/lib/set/set_suite_test.go new file mode 100644 index 000000000..fd3be1fb8 --- /dev/null +++ b/lib/set/set_suite_test.go @@ -0,0 +1,36 @@ +// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package set_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" + + "github.com/onsi/ginkgo/reporters" + + "github.com/projectcalico/libcalico-go/lib/testutils" +) + +func init() { + testutils.HookLogrusForGinkgo() +} + +func TestSet(t *testing.T) { + RegisterFailHandler(Fail) + junitReporter := reporters.NewJUnitReporter("junit.xml") + RunSpecsWithDefaultAndCustomReporters(t, "Set Suite", []Reporter{junitReporter}) +} diff --git a/lib/set/set_test.go b/lib/set/set_test.go new file mode 100644 index 000000000..e7a8769c4 --- /dev/null +++ b/lib/set/set_test.go @@ -0,0 +1,222 @@ +// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package set_test + +import ( + "github.com/projectcalico/libcalico-go/lib/set" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Set", func() { + var s set.Set + BeforeEach(func() { + s = set.New() + }) + + It("should be empty", func() { + Expect(s.Len()).To(BeZero()) + }) + It("should iterate over no items", func() { + called := false + s.Iter(func(item interface{}) error { + called = true + return nil + }) + Expect(called).To(BeFalse()) + }) + It("should do nothing on clear", func() { + s.Clear() + Expect(s.Len()).To(BeZero()) + }) + + Describe("Set created by FromArray", func() { + BeforeEach(func() { + s = set.FromArray([]int{1, 2}) + }) + It("should contain 1", func() { + Expect(s.Contains(1)).To(BeTrue()) + }) + It("should contain 2", func() { + Expect(s.Contains(2)).To(BeTrue()) + }) + It("should not contain 3", func() { + Expect(s.Contains(3)).To(BeFalse()) + }) + }) + + Describe("Set created by From", func() { + BeforeEach(func() { + s = set.From(1, 2) + }) + It("should contain 1", func() { + Expect(s.Contains(1)).To(BeTrue()) + }) + It("should contain 2", func() { + Expect(s.Contains(2)).To(BeTrue()) + }) + It("should not contain 3", func() { + Expect(s.Contains(3)).To(BeFalse()) + }) + It("should contain all of {1, 2}", func() { + Expect(s.ContainsAll(set.From(1, 2))).To(BeTrue()) + }) + It("should not contain all of {1, 2, 3}", func() { + Expect(s.ContainsAll(set.From(1, 2, 3))).To(BeFalse()) + }) + }) + + Describe("after adding 1 and 2", func() { + BeforeEach(func() { + s.Add(1) + s.Add(2) + s.Add(2) // Duplicate should have no effect + }) + It("should contain 1", func() { + Expect(s.Contains(1)).To(BeTrue()) + }) + It("should contain 2", func() { + Expect(s.Contains(2)).To(BeTrue()) + }) + It("should not contain 3", func() { + Expect(s.Contains(3)).To(BeFalse()) + }) + It("should iterate over 1 and 2 in some order", func() { + seen1 := false + seen2 := false + s.Iter(func(item interface{}) error { + if item.(int) == 1 { + Expect(seen1).To(BeFalse()) + seen1 = true + } else if item.(int) == 2 { + Expect(seen2).To(BeFalse()) + seen2 = true + } else { + Fail("Unexpected item") + } + return nil + }) + Expect(seen1).To(BeTrue()) + Expect(seen2).To(BeTrue()) + }) + It("should allow remove during iteration", func() { + s.Iter(func(item interface{}) error { + if item.(int) == 1 { + return set.RemoveItem + } + return nil + }) + Expect(s.Contains(1)).To(BeFalse()) + Expect(s.Contains(2)).To(BeTrue()) + }) + It("should support stopping iteration", func() { + iterationStarted := false + s.Iter(func(item interface{}) error { + if iterationStarted { + Fail("Iteration continued after stop") + } + iterationStarted = true + return set.StopIteration + }) + Expect(s.Contains(1)).To(BeTrue()) + Expect(s.Contains(2)).To(BeTrue()) + }) + It("can copy a Set", func() { + c := s.Copy() + Expect(c.Len()).To(Equal(s.Len())) + Expect(c).NotTo(BeIdenticalTo(s)) // Check they're not the same object. + Expect(c).To(Equal(s)) // DeepEquals, will check the contents. + }) + It("should correctly determine set equality", func() { + c := s.Copy() + Expect(c.Equals(s)).To(BeTrue()) + Expect(s.Equals(c)).To(BeTrue()) + c.Add(3) + Expect(c.Equals(s)).To(BeFalse()) + Expect(s.Equals(c)).To(BeFalse()) + c.Discard(2) + Expect(c.Equals(s)).To(BeFalse()) + Expect(s.Equals(c)).To(BeFalse()) + c.Add(2) + c.Discard(3) + Expect(c.Equals(s)).To(BeTrue()) + Expect(s.Equals(c)).To(BeTrue()) + }) + + Describe("after removing 2", func() { + BeforeEach(func() { + s.Discard(2) + }) + It("should contain 1", func() { + Expect(s.Contains(1)).To(BeTrue()) + }) + It("should not contain 2", func() { + Expect(s.Contains(2)).To(BeFalse()) + }) + It("should not contain 3", func() { + Expect(s.Contains(3)).To(BeFalse()) + }) + }) + Describe("after using AddAll to add 2, 3, 4", func() { + BeforeEach(func() { + s.AddAll([]int{2, 3, 4}) + }) + It("should contain 1", func() { + Expect(s.Contains(1)).To(BeTrue()) + }) + It("should contain 2", func() { + Expect(s.Contains(2)).To(BeTrue()) + }) + It("should contain 3", func() { + Expect(s.Contains(3)).To(BeTrue()) + }) + It("should contain 4", func() { + Expect(s.Contains(4)).To(BeTrue()) + }) + }) + + Describe("after Clear()", func() { + BeforeEach(func() { + s.Clear() + }) + It("should be empty", func() { + Expect(s.Len()).To(BeZero()) + }) + }) + }) +}) + +var _ = Describe("EmptySet", func() { + var empty set.Set + BeforeEach(func() { + empty = set.Empty() + }) + It("has length 0", func() { + Expect(empty.Len()).To(Equal(0)) + }) + It("should panic on add", func() { + Expect(func() { empty.Add("foo") }).To(Panic()) + }) + It("should ignore discard", func() { + Expect(func() { empty.Discard("foo") }).NotTo(Panic()) + }) + It("should iterate 0 times", func() { + empty.Iter(func(item interface{}) error { + Fail("Iterated > 0 times") + return nil + }) + }) +})