From 00ef16f980b7eeef2ba477c899af8d5d5913b039 Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Tue, 27 Jun 2017 13:37:42 +0100 Subject: [PATCH 1/2] Move set and health packages here to avoid Felix/Typha duplication --- lib/health/health.go | 171 ++++++++++++++++++++++++ lib/health/health_suite_test.go | 33 +++++ lib/health/health_test.go | 156 ++++++++++++++++++++++ lib/set/set.go | 146 +++++++++++++++++++++ lib/set/set_suite_test.go | 33 +++++ lib/set/set_test.go | 222 ++++++++++++++++++++++++++++++++ 6 files changed, 761 insertions(+) create mode 100644 lib/health/health.go create mode 100644 lib/health/health_suite_test.go create mode 100644 lib/health/health_test.go create mode 100644 lib/set/set.go create mode 100644 lib/set/set_suite_test.go create mode 100644 lib/set/set_test.go diff --git a/lib/health/health.go b/lib/health/health.go new file mode 100644 index 000000000..0376eb931 --- /dev/null +++ b/lib/health/health.go @@ -0,0 +1,171 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package health + +import ( + "fmt" + "net/http" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/projectcalico/libcalico-go/lib/set" +) + +// Any kind of value that can be used as a map key and is unique across multiple packages. For +// example, "type myHealthSource string". +type HealthSource interface{} + +type HealthIndicator struct { + // The source of this health indicator. + Source HealthSource + + // How long the indicator is valid for. In other words, if it continues operating normally, + // the source expects to refresh this indicator before this timeout. + Timeout time.Duration +} + +// For a component that provides health indications, return the sources that it provides to indicate +// readiness, and those that it provides to indicate liveness. +type HealthProvider interface { + ReadySources() []HealthSource + LiveSources() []HealthSource +} + +type HealthState struct { + // Whether we are overall 'ready'. + ready bool + + // Whether we are overall 'live'. + live bool + + // Mutex used to protect against concurrently reading and writing those attributes. + mutex *sync.Mutex +} + +func (state *HealthState) Ready() bool { + state.mutex.Lock() + defer state.mutex.Unlock() + return state.ready +} + +func (state *HealthState) Live() bool { + state.mutex.Lock() + defer state.mutex.Unlock() + return state.live +} + +func NewHealthState() *HealthState { + // Start as 'live' but not 'ready'. + return &HealthState{ready: false, live: true, mutex: &sync.Mutex{}} +} + +func MonitorHealth( + state *HealthState, + neededForReady set.Set, + neededForLive set.Set, + c <-chan HealthIndicator, +) { + currentHealth := set.New() + timer := map[HealthSource]*time.Timer{} + timeoutC := make(chan HealthSource) + + for { + select { + case indicator, ok := <-c: + if !ok { + log.Warningf("Health channel closed") + state.mutex.Lock() + state.ready = false + state.live = false + state.mutex.Unlock() + return + } + log.WithField("source", indicator.Source).Debug("Health indicator current") + if timer[indicator.Source] != nil { + timer[indicator.Source].Stop() + } + if indicator.Timeout > 0 { + currentHealth.Add(indicator.Source) + timer[indicator.Source] = time.AfterFunc(indicator.Timeout, func() { + timeoutC <- indicator.Source + }) + } else { + // Shortcut immediate timeout. A health source can use an + // indication with zero timeout to cancel a previous indication that + // might otherwise take a long time to expire. + log.WithField("source", indicator.Source).Debug("Health indicator cancelled") + currentHealth.Discard(indicator.Source) + } + case source := <-timeoutC: + log.WithField("source", source).Debug("Health indicator expired") + currentHealth.Discard(source) + } + state.mutex.Lock() + state.ready = currentHealth.ContainsAll(neededForReady) + state.live = currentHealth.ContainsAll(neededForLive) + log.WithFields(log.Fields{ + "ready": state.ready, + "live": state.live, + }).Debug("Health now") + state.mutex.Unlock() + } +} + +const ( + // The HTTP status that we use for 'ready' or 'live'. 204 means "No Content: The server + // successfully processed the request and is not returning any content." (Kubernetes + // interpets any 200<=status<400 as 'good'.) + STATUS_GOOD = 204 + + // The HTTP status that we use for 'not ready' or 'not live'. 503 means "Service + // Unavailable: The server is currently unavailable (because it is overloaded or down for + // maintenance). Generally, this is a temporary state." (Kubernetes interpets any + // status>=400 as 'bad'.) + STATUS_BAD = 503 +) + +func ServeHealth(port int, neededForReady set.Set, neededForLive set.Set, c <-chan HealthIndicator) { + + state := NewHealthState() + + go MonitorHealth(state, neededForReady, neededForLive, c) + + log.WithField("port", port).Info("Starting health endpoints") + http.HandleFunc("/readiness", func(rsp http.ResponseWriter, req *http.Request) { + log.Debug("GET /readiness") + status := STATUS_BAD + if state.Ready() { + log.Debug("Felix is ready") + status = STATUS_GOOD + } + rsp.WriteHeader(status) + }) + http.HandleFunc("/liveness", func(rsp http.ResponseWriter, req *http.Request) { + log.Debug("GET /liveness") + status := STATUS_BAD + if state.Live() { + log.Debug("Felix is live") + status = STATUS_GOOD + } + rsp.WriteHeader(status) + }) + for { + err := http.ListenAndServe(fmt.Sprintf(":%v", port), nil) + log.WithError(err).Error( + "Readiness endpoint failed, trying to restart it...") + time.Sleep(1 * time.Second) + } +} diff --git a/lib/health/health_suite_test.go b/lib/health/health_suite_test.go new file mode 100644 index 000000000..f7c065d06 --- /dev/null +++ b/lib/health/health_suite_test.go @@ -0,0 +1,33 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package health_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" + + "github.com/projectcalico/libcalico-go/lib/testutils" +) + +func init() { + testutils.HookLogrusForGinkgo() +} + +func TestSet(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Health Suite") +} diff --git a/lib/health/health_test.go b/lib/health/health_test.go new file mode 100644 index 000000000..8d45974b3 --- /dev/null +++ b/lib/health/health_test.go @@ -0,0 +1,156 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package health_test + +import ( + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/projectcalico/libcalico-go/lib/health" + "github.com/projectcalico/libcalico-go/lib/set" +) + +type healthSource string + +var ( + SOURCE1 = healthSource("source1") + SOURCE2 = healthSource("source2") + SOURCE3 = healthSource("source3") +) + +var _ = Describe("Health", func() { + + var ( + healthChannel chan health.HealthIndicator + state *health.HealthState + ) + + notifySource := func(source healthSource) func() { + return func() { + healthChannel <- health.HealthIndicator{source, 1 * time.Second} + } + } + + cancelSource := func(source healthSource) func() { + return func() { + healthChannel <- health.HealthIndicator{source, 0} + } + } + + BeforeEach(func() { + healthChannel = make(chan health.HealthIndicator) + // Note: use a new HealthState, in each test. Otherwise what can happen is that the + // closing goroutine from the previous test changes it and confuses the test that is + // running now... + state = health.NewHealthState() + + go health.MonitorHealth( + state, + set.From(SOURCE1, SOURCE2), + set.From(SOURCE2, SOURCE3), + healthChannel, + ) + }) + + AfterEach(func() { + close(healthChannel) + Eventually(state.Ready).Should(BeFalse()) + Eventually(state.Live).Should(BeFalse()) + }) + + It("initially reports live but not ready", func() { + Expect(state.Ready()).To(BeFalse()) + Expect(state.Live()).To(BeTrue()) + }) + + Context("with indicators for readiness sources", func() { + + BeforeEach(func() { + notifySource(SOURCE1)() + notifySource(SOURCE2)() + }) + + It("is ready but not live", func() { + Eventually(state.Ready).Should(BeTrue()) + Expect(state.Live()).To(BeFalse()) + }) + + Context("with liveness source also", func() { + + BeforeEach(notifySource(SOURCE3)) + + It("is ready and live", func() { + Eventually(state.Ready).Should(BeTrue()) + Eventually(state.Live).Should(BeTrue()) + }) + }) + + Context("with a source cancelled", func() { + + BeforeEach(cancelSource(SOURCE1)) + + It("is not ready and not live", func() { + Eventually(state.Ready).Should(BeFalse()) + Eventually(state.Live).Should(BeFalse()) + }) + }) + }) + + Context("with indicators for liveness sources", func() { + + BeforeEach(func() { + notifySource(SOURCE3)() + notifySource(SOURCE2)() + }) + + It("is live but not ready", func() { + Eventually(state.Live).Should(BeTrue()) + Expect(state.Ready()).To(BeFalse()) + }) + + Context("with readiness source also", func() { + + BeforeEach(notifySource(SOURCE1)) + + It("is ready and live", func() { + Eventually(state.Ready).Should(BeTrue()) + Eventually(state.Live).Should(BeTrue()) + }) + + Context("with time passing so that indicators expire", func() { + + BeforeEach(func() { + time.Sleep(2 * time.Second) + }) + + It("is not ready and not live", func() { + Eventually(state.Ready).Should(BeFalse()) + Eventually(state.Live).Should(BeFalse()) + }) + }) + }) + + Context("with a source cancelled", func() { + + BeforeEach(cancelSource(SOURCE3)) + + It("is not ready and not live", func() { + Eventually(state.Ready).Should(BeFalse()) + Eventually(state.Live).Should(BeFalse()) + }) + }) + }) +}) diff --git a/lib/set/set.go b/lib/set/set.go new file mode 100644 index 000000000..a3fc94f9f --- /dev/null +++ b/lib/set/set.go @@ -0,0 +1,146 @@ +// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package set + +import ( + "errors" + "reflect" + + log "github.com/Sirupsen/logrus" +) + +type Set interface { + Len() int + Add(interface{}) + AddAll(itemArray interface{}) + Discard(interface{}) + Clear() + Contains(interface{}) bool + Iter(func(item interface{}) error) + Copy() Set + Equals(Set) bool + ContainsAll(Set) bool +} + +type empty struct{} + +var emptyValue = empty{} + +var ( + StopIteration = errors.New("Stop iteration") + RemoveItem = errors.New("Remove item") +) + +func New() Set { + return make(mapSet) +} + +func From(members ...interface{}) Set { + s := New() + s.AddAll(members) + return s +} + +func FromArray(membersArray interface{}) Set { + s := New() + s.AddAll(membersArray) + return s +} + +func Empty() Set { + return mapSet(nil) +} + +type mapSet map[interface{}]empty + +func (set mapSet) Len() int { + return len(set) +} + +func (set mapSet) Add(item interface{}) { + set[item] = emptyValue +} + +func (set mapSet) AddAll(itemArray interface{}) { + + arrVal := reflect.ValueOf(itemArray) + for i := 0; i < arrVal.Len(); i++ { + set.Add(arrVal.Index(i).Interface()) + } +} + +func (set mapSet) Discard(item interface{}) { + delete(set, item) +} + +func (set mapSet) Clear() { + for item := range set { + delete(set, item) + } +} + +func (set mapSet) Contains(item interface{}) bool { + _, present := set[item] + return present +} + +func (set mapSet) Iter(visitor func(item interface{}) error) { +loop: + for item := range set { + err := visitor(item) + switch err { + case StopIteration: + break loop + case RemoveItem: + delete(set, item) + case nil: + break + default: + log.WithError(err).Panic("Unexpected iteration error") + } + } +} + +func (set mapSet) Copy() Set { + cpy := New() + for item := range set { + cpy.Add(item) + } + return cpy +} + +func (set mapSet) Equals(other Set) bool { + if set.Len() != other.Len() { + return false + } + for item := range set { + if !other.Contains(item) { + return false + } + } + return true +} + +func (set mapSet) ContainsAll(other Set) bool { + result := true + other.Iter(func(item interface{}) error { + if !set.Contains(item) { + result = false + return StopIteration + } + return nil + }) + return result +} diff --git a/lib/set/set_suite_test.go b/lib/set/set_suite_test.go new file mode 100644 index 000000000..072c66c80 --- /dev/null +++ b/lib/set/set_suite_test.go @@ -0,0 +1,33 @@ +// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package set_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" + + "github.com/projectcalico/libcalico-go/lib/testutils" +) + +func init() { + testutils.HookLogrusForGinkgo() +} + +func TestSet(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Set Suite") +} diff --git a/lib/set/set_test.go b/lib/set/set_test.go new file mode 100644 index 000000000..e7a8769c4 --- /dev/null +++ b/lib/set/set_test.go @@ -0,0 +1,222 @@ +// Copyright (c) 2016-2017 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package set_test + +import ( + "github.com/projectcalico/libcalico-go/lib/set" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Set", func() { + var s set.Set + BeforeEach(func() { + s = set.New() + }) + + It("should be empty", func() { + Expect(s.Len()).To(BeZero()) + }) + It("should iterate over no items", func() { + called := false + s.Iter(func(item interface{}) error { + called = true + return nil + }) + Expect(called).To(BeFalse()) + }) + It("should do nothing on clear", func() { + s.Clear() + Expect(s.Len()).To(BeZero()) + }) + + Describe("Set created by FromArray", func() { + BeforeEach(func() { + s = set.FromArray([]int{1, 2}) + }) + It("should contain 1", func() { + Expect(s.Contains(1)).To(BeTrue()) + }) + It("should contain 2", func() { + Expect(s.Contains(2)).To(BeTrue()) + }) + It("should not contain 3", func() { + Expect(s.Contains(3)).To(BeFalse()) + }) + }) + + Describe("Set created by From", func() { + BeforeEach(func() { + s = set.From(1, 2) + }) + It("should contain 1", func() { + Expect(s.Contains(1)).To(BeTrue()) + }) + It("should contain 2", func() { + Expect(s.Contains(2)).To(BeTrue()) + }) + It("should not contain 3", func() { + Expect(s.Contains(3)).To(BeFalse()) + }) + It("should contain all of {1, 2}", func() { + Expect(s.ContainsAll(set.From(1, 2))).To(BeTrue()) + }) + It("should not contain all of {1, 2, 3}", func() { + Expect(s.ContainsAll(set.From(1, 2, 3))).To(BeFalse()) + }) + }) + + Describe("after adding 1 and 2", func() { + BeforeEach(func() { + s.Add(1) + s.Add(2) + s.Add(2) // Duplicate should have no effect + }) + It("should contain 1", func() { + Expect(s.Contains(1)).To(BeTrue()) + }) + It("should contain 2", func() { + Expect(s.Contains(2)).To(BeTrue()) + }) + It("should not contain 3", func() { + Expect(s.Contains(3)).To(BeFalse()) + }) + It("should iterate over 1 and 2 in some order", func() { + seen1 := false + seen2 := false + s.Iter(func(item interface{}) error { + if item.(int) == 1 { + Expect(seen1).To(BeFalse()) + seen1 = true + } else if item.(int) == 2 { + Expect(seen2).To(BeFalse()) + seen2 = true + } else { + Fail("Unexpected item") + } + return nil + }) + Expect(seen1).To(BeTrue()) + Expect(seen2).To(BeTrue()) + }) + It("should allow remove during iteration", func() { + s.Iter(func(item interface{}) error { + if item.(int) == 1 { + return set.RemoveItem + } + return nil + }) + Expect(s.Contains(1)).To(BeFalse()) + Expect(s.Contains(2)).To(BeTrue()) + }) + It("should support stopping iteration", func() { + iterationStarted := false + s.Iter(func(item interface{}) error { + if iterationStarted { + Fail("Iteration continued after stop") + } + iterationStarted = true + return set.StopIteration + }) + Expect(s.Contains(1)).To(BeTrue()) + Expect(s.Contains(2)).To(BeTrue()) + }) + It("can copy a Set", func() { + c := s.Copy() + Expect(c.Len()).To(Equal(s.Len())) + Expect(c).NotTo(BeIdenticalTo(s)) // Check they're not the same object. + Expect(c).To(Equal(s)) // DeepEquals, will check the contents. + }) + It("should correctly determine set equality", func() { + c := s.Copy() + Expect(c.Equals(s)).To(BeTrue()) + Expect(s.Equals(c)).To(BeTrue()) + c.Add(3) + Expect(c.Equals(s)).To(BeFalse()) + Expect(s.Equals(c)).To(BeFalse()) + c.Discard(2) + Expect(c.Equals(s)).To(BeFalse()) + Expect(s.Equals(c)).To(BeFalse()) + c.Add(2) + c.Discard(3) + Expect(c.Equals(s)).To(BeTrue()) + Expect(s.Equals(c)).To(BeTrue()) + }) + + Describe("after removing 2", func() { + BeforeEach(func() { + s.Discard(2) + }) + It("should contain 1", func() { + Expect(s.Contains(1)).To(BeTrue()) + }) + It("should not contain 2", func() { + Expect(s.Contains(2)).To(BeFalse()) + }) + It("should not contain 3", func() { + Expect(s.Contains(3)).To(BeFalse()) + }) + }) + Describe("after using AddAll to add 2, 3, 4", func() { + BeforeEach(func() { + s.AddAll([]int{2, 3, 4}) + }) + It("should contain 1", func() { + Expect(s.Contains(1)).To(BeTrue()) + }) + It("should contain 2", func() { + Expect(s.Contains(2)).To(BeTrue()) + }) + It("should contain 3", func() { + Expect(s.Contains(3)).To(BeTrue()) + }) + It("should contain 4", func() { + Expect(s.Contains(4)).To(BeTrue()) + }) + }) + + Describe("after Clear()", func() { + BeforeEach(func() { + s.Clear() + }) + It("should be empty", func() { + Expect(s.Len()).To(BeZero()) + }) + }) + }) +}) + +var _ = Describe("EmptySet", func() { + var empty set.Set + BeforeEach(func() { + empty = set.Empty() + }) + It("has length 0", func() { + Expect(empty.Len()).To(Equal(0)) + }) + It("should panic on add", func() { + Expect(func() { empty.Add("foo") }).To(Panic()) + }) + It("should ignore discard", func() { + Expect(func() { empty.Discard("foo") }).NotTo(Panic()) + }) + It("should iterate 0 times", func() { + empty.Iter(func(item interface{}) error { + Fail("Iterated > 0 times") + return nil + }) + }) +}) From 94021d7d2dc1118f2a39dcc42237d0186c774f25 Mon Sep 17 00:00:00 2001 From: Neil Jerram Date: Tue, 4 Jul 2017 17:27:34 +0100 Subject: [PATCH 2/2] Update to reviewed health code --- .gitignore | 3 +- lib/health/health.go | 188 +++++++++++++++++++------------------- lib/health/health_test.go | 103 ++++++++++----------- lib/set/set_suite_test.go | 5 +- 4 files changed, 145 insertions(+), 154 deletions(-) diff --git a/.gitignore b/.gitignore index 7e1aa8a82..88ba36ed7 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ bin *.coverprofile vendor .idea -.go-pkg-cache \ No newline at end of file +.go-pkg-cache +junit.xml diff --git a/lib/health/health.go b/lib/health/health.go index 0376eb931..ff4551922 100644 --- a/lib/health/health.go +++ b/lib/health/health.go @@ -21,144 +21,142 @@ import ( "time" log "github.com/Sirupsen/logrus" - "github.com/projectcalico/libcalico-go/lib/set" ) -// Any kind of value that can be used as a map key and is unique across multiple packages. For -// example, "type myHealthSource string". -type HealthSource interface{} - -type HealthIndicator struct { - // The source of this health indicator. - Source HealthSource - - // How long the indicator is valid for. In other words, if it continues operating normally, - // the source expects to refresh this indicator before this timeout. - Timeout time.Duration +// The HealthReport struct has slots for the levels of health that we monitor and aggregate. +type HealthReport struct { + Live bool + Ready bool } -// For a component that provides health indications, return the sources that it provides to indicate -// readiness, and those that it provides to indicate liveness. -type HealthProvider interface { - ReadySources() []HealthSource - LiveSources() []HealthSource -} +type reporterState struct { + // The health indicators that this reporter reports. + reports HealthReport + + // Expiry time for this reporter's reports. + timeout time.Duration -type HealthState struct { - // Whether we are overall 'ready'. - ready bool + // The most recent report. + latest HealthReport - // Whether we are overall 'live'. - live bool + // Time of that most recent report. + timestamp time.Time +} - // Mutex used to protect against concurrently reading and writing those attributes. +// A HealthAggregator receives health reports from individual reporters (which are typically +// components of a particular daemon or application) and aggregates them into an overall health +// summary. For each monitored kind of health, all of the reporters that report that need to say +// that it is good; for example, to be 'ready' overall, all of the reporters that report readiness +// need to have recently said 'Ready: true'. +type HealthAggregator struct { + // Mutex to protect concurrent access to this health aggregator. mutex *sync.Mutex + + // Map from reporter name to corresponding state. + reporters map[string]*reporterState } -func (state *HealthState) Ready() bool { - state.mutex.Lock() - defer state.mutex.Unlock() - return state.ready +// RegisterReporter registers a reporter with a HealthAggregator. The aggregator uses NAME to +// identify the reporter. REPORTS indicates the kinds of health that this reporter will report. +// TIMEOUT is the expiry time for this reporter's reports; the implication of which is that the +// reporter should normally refresh its reports well before this time has expired. +func (aggregator *HealthAggregator) RegisterReporter(name string, reports *HealthReport, timeout time.Duration) { + aggregator.mutex.Lock() + defer aggregator.mutex.Unlock() + aggregator.reporters[name] = &reporterState{ + reports: *reports, + timeout: timeout, + latest: HealthReport{Live: true}, + timestamp: time.Now(), + } + return } -func (state *HealthState) Live() bool { - state.mutex.Lock() - defer state.mutex.Unlock() - return state.live +// Report reports current health from a reporter to a HealthAggregator. NAME is the reporter's name +// and REPORTS conveys the current status, for each kind of health that the reporter said it was +// going to report when it called RegisterReporter. +func (aggregator *HealthAggregator) Report(name string, report *HealthReport) { + aggregator.mutex.Lock() + defer aggregator.mutex.Unlock() + reporter := aggregator.reporters[name] + reporter.latest = *report + reporter.timestamp = time.Now() + return } -func NewHealthState() *HealthState { - // Start as 'live' but not 'ready'. - return &HealthState{ready: false, live: true, mutex: &sync.Mutex{}} +func NewHealthAggregator() *HealthAggregator { + return &HealthAggregator{mutex: &sync.Mutex{}, reporters: map[string]*reporterState{}} } -func MonitorHealth( - state *HealthState, - neededForReady set.Set, - neededForLive set.Set, - c <-chan HealthIndicator, -) { - currentHealth := set.New() - timer := map[HealthSource]*time.Timer{} - timeoutC := make(chan HealthSource) +// Summary calculates the current overall health for a HealthAggregator. +func (aggregator *HealthAggregator) Summary() *HealthReport { + aggregator.mutex.Lock() + defer aggregator.mutex.Unlock() - for { - select { - case indicator, ok := <-c: - if !ok { - log.Warningf("Health channel closed") - state.mutex.Lock() - state.ready = false - state.live = false - state.mutex.Unlock() - return - } - log.WithField("source", indicator.Source).Debug("Health indicator current") - if timer[indicator.Source] != nil { - timer[indicator.Source].Stop() - } - if indicator.Timeout > 0 { - currentHealth.Add(indicator.Source) - timer[indicator.Source] = time.AfterFunc(indicator.Timeout, func() { - timeoutC <- indicator.Source - }) - } else { - // Shortcut immediate timeout. A health source can use an - // indication with zero timeout to cancel a previous indication that - // might otherwise take a long time to expire. - log.WithField("source", indicator.Source).Debug("Health indicator cancelled") - currentHealth.Discard(indicator.Source) - } - case source := <-timeoutC: - log.WithField("source", source).Debug("Health indicator expired") - currentHealth.Discard(source) - } - state.mutex.Lock() - state.ready = currentHealth.ContainsAll(neededForReady) - state.live = currentHealth.ContainsAll(neededForLive) + // In the absence of any reporters, default to indicating that we are both live and ready. + summary := &HealthReport{Live: true, Ready: true} + + // Now for each reporter... + for name, reporter := range aggregator.reporters { log.WithFields(log.Fields{ - "ready": state.ready, - "live": state.live, - }).Debug("Health now") - state.mutex.Unlock() + "name": name, + "reporter-state": reporter, + }).Debug("Detailed health state") + + // Reset Live to false if that reporter is registered to report liveness and hasn't + // recently said that it is live. + if summary.Live && reporter.reports.Live && (!reporter.latest.Live || + (time.Since(reporter.timestamp) > reporter.timeout)) { + summary.Live = false + } + + // Reset Ready to false if that reporter is registered to report readiness and + // hasn't recently said that it is ready. + if summary.Ready && reporter.reports.Ready && (!reporter.latest.Ready || + (time.Since(reporter.timestamp) > reporter.timeout)) { + summary.Ready = false + } } + + log.WithField("summary", summary).Info("Overall health") + return summary } const ( // The HTTP status that we use for 'ready' or 'live'. 204 means "No Content: The server // successfully processed the request and is not returning any content." (Kubernetes // interpets any 200<=status<400 as 'good'.) - STATUS_GOOD = 204 + StatusGood = 204 // The HTTP status that we use for 'not ready' or 'not live'. 503 means "Service // Unavailable: The server is currently unavailable (because it is overloaded or down for // maintenance). Generally, this is a temporary state." (Kubernetes interpets any // status>=400 as 'bad'.) - STATUS_BAD = 503 + StatusBad = 503 ) -func ServeHealth(port int, neededForReady set.Set, neededForLive set.Set, c <-chan HealthIndicator) { - - state := NewHealthState() - - go MonitorHealth(state, neededForReady, neededForLive, c) +// ServeHTTP publishes the current overall liveness and readiness at http://*:PORT/liveness and +// http://*:PORT/readiness respectively. A GET request on those URLs returns StatusGood or +// StatusBad, according to the current overall liveness or readiness. These endpoints are designed +// for use by Kubernetes liveness and readiness probes. +func (aggregator *HealthAggregator) ServeHTTP(port int) { log.WithField("port", port).Info("Starting health endpoints") http.HandleFunc("/readiness", func(rsp http.ResponseWriter, req *http.Request) { log.Debug("GET /readiness") - status := STATUS_BAD - if state.Ready() { + status := StatusBad + if aggregator.Summary().Ready { log.Debug("Felix is ready") - status = STATUS_GOOD + status = StatusGood } rsp.WriteHeader(status) }) http.HandleFunc("/liveness", func(rsp http.ResponseWriter, req *http.Request) { log.Debug("GET /liveness") - status := STATUS_BAD - if state.Live() { + status := StatusBad + if aggregator.Summary().Live { log.Debug("Felix is live") - status = STATUS_GOOD + status = StatusGood } rsp.WriteHeader(status) }) diff --git a/lib/health/health_test.go b/lib/health/health_test.go index 8d45974b3..59475d84b 100644 --- a/lib/health/health_test.go +++ b/lib/health/health_test.go @@ -20,96 +20,85 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/projectcalico/libcalico-go/lib/health" - "github.com/projectcalico/libcalico-go/lib/set" ) -type healthSource string - -var ( - SOURCE1 = healthSource("source1") - SOURCE2 = healthSource("source2") - SOURCE3 = healthSource("source3") +const ( + SOURCE1 = "source1" + SOURCE2 = "source2" + SOURCE3 = "source3" ) var _ = Describe("Health", func() { var ( - healthChannel chan health.HealthIndicator - state *health.HealthState + aggregator *health.HealthAggregator ) - notifySource := func(source healthSource) func() { + notifySource := func(source string) func() { return func() { - healthChannel <- health.HealthIndicator{source, 1 * time.Second} + switch source { + case SOURCE1: + aggregator.Report(source, &health.HealthReport{Ready: true}) + case SOURCE2: + aggregator.Report(source, &health.HealthReport{Live: true, Ready: true}) + case SOURCE3: + aggregator.Report(source, &health.HealthReport{Live: true}) + } } } - cancelSource := func(source healthSource) func() { + cancelSource := func(source string) func() { return func() { - healthChannel <- health.HealthIndicator{source, 0} + aggregator.Report(source, &health.HealthReport{Live: false, Ready: false}) } } BeforeEach(func() { - healthChannel = make(chan health.HealthIndicator) - // Note: use a new HealthState, in each test. Otherwise what can happen is that the - // closing goroutine from the previous test changes it and confuses the test that is - // running now... - state = health.NewHealthState() - - go health.MonitorHealth( - state, - set.From(SOURCE1, SOURCE2), - set.From(SOURCE2, SOURCE3), - healthChannel, - ) - }) - - AfterEach(func() { - close(healthChannel) - Eventually(state.Ready).Should(BeFalse()) - Eventually(state.Live).Should(BeFalse()) + aggregator = health.NewHealthAggregator() + aggregator.RegisterReporter(SOURCE1, &health.HealthReport{Ready: true}, 1*time.Second) + aggregator.RegisterReporter(SOURCE2, &health.HealthReport{Live: true, Ready: true}, 1*time.Second) + aggregator.RegisterReporter(SOURCE3, &health.HealthReport{Live: true}, 1*time.Second) }) - It("initially reports live but not ready", func() { - Expect(state.Ready()).To(BeFalse()) - Expect(state.Live()).To(BeTrue()) + It("is initially live but not ready", func() { + Expect(aggregator.Summary().Ready).To(BeFalse()) + Expect(aggregator.Summary().Live).To(BeTrue()) }) - Context("with indicators for readiness sources", func() { + Context("with ready reports", func() { BeforeEach(func() { notifySource(SOURCE1)() notifySource(SOURCE2)() }) - It("is ready but not live", func() { - Eventually(state.Ready).Should(BeTrue()) - Expect(state.Live()).To(BeFalse()) + It("is ready and live", func() { + Expect(aggregator.Summary().Ready).To(BeTrue()) + Expect(aggregator.Summary().Live).To(BeTrue()) }) - Context("with liveness source also", func() { + Context("with live report", func() { BeforeEach(notifySource(SOURCE3)) It("is ready and live", func() { - Eventually(state.Ready).Should(BeTrue()) - Eventually(state.Live).Should(BeTrue()) + Expect(aggregator.Summary().Ready).To(BeTrue()) + Expect(aggregator.Summary().Live).To(BeTrue()) }) }) - Context("with a source cancelled", func() { + Context("with not-ready report", func() { BeforeEach(cancelSource(SOURCE1)) - It("is not ready and not live", func() { - Eventually(state.Ready).Should(BeFalse()) - Eventually(state.Live).Should(BeFalse()) + It("is live but not ready", func() { + Expect(aggregator.Summary().Ready).To(BeFalse()) + Expect(aggregator.Summary().Live).To(BeTrue()) }) }) }) - Context("with indicators for liveness sources", func() { + Context("with live reports", func() { BeforeEach(func() { notifySource(SOURCE3)() @@ -117,39 +106,39 @@ var _ = Describe("Health", func() { }) It("is live but not ready", func() { - Eventually(state.Live).Should(BeTrue()) - Expect(state.Ready()).To(BeFalse()) + Expect(aggregator.Summary().Live).To(BeTrue()) + Expect(aggregator.Summary().Ready).To(BeFalse()) }) - Context("with readiness source also", func() { + Context("with ready report also", func() { BeforeEach(notifySource(SOURCE1)) It("is ready and live", func() { - Eventually(state.Ready).Should(BeTrue()) - Eventually(state.Live).Should(BeTrue()) + Expect(aggregator.Summary().Ready).To(BeTrue()) + Expect(aggregator.Summary().Live).To(BeTrue()) }) - Context("with time passing so that indicators expire", func() { + Context("with time passing so that reports expire", func() { BeforeEach(func() { time.Sleep(2 * time.Second) }) It("is not ready and not live", func() { - Eventually(state.Ready).Should(BeFalse()) - Eventually(state.Live).Should(BeFalse()) + Expect(aggregator.Summary().Ready).To(BeFalse()) + Expect(aggregator.Summary().Live).To(BeFalse()) }) }) }) - Context("with a source cancelled", func() { + Context("with not-live report", func() { BeforeEach(cancelSource(SOURCE3)) It("is not ready and not live", func() { - Eventually(state.Ready).Should(BeFalse()) - Eventually(state.Live).Should(BeFalse()) + Expect(aggregator.Summary().Ready).To(BeFalse()) + Expect(aggregator.Summary().Live).To(BeFalse()) }) }) }) diff --git a/lib/set/set_suite_test.go b/lib/set/set_suite_test.go index 072c66c80..fd3be1fb8 100644 --- a/lib/set/set_suite_test.go +++ b/lib/set/set_suite_test.go @@ -20,6 +20,8 @@ import ( "testing" + "github.com/onsi/ginkgo/reporters" + "github.com/projectcalico/libcalico-go/lib/testutils" ) @@ -29,5 +31,6 @@ func init() { func TestSet(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Set Suite") + junitReporter := reporters.NewJUnitReporter("junit.xml") + RunSpecsWithDefaultAndCustomReporters(t, "Set Suite", []Reporter{junitReporter}) }