Skip to content

Commit

Permalink
Add health check endpoint
Browse files Browse the repository at this point in the history
For now the only source of 'health' is a source, e.g. in this draft for kinesis:

* We keep track of events currently being processed downstream. If there is any message in memory that hasn't been acked for a while (exceeding certain configured threshold) == unhealthy.
* We check if underlying kinsumer client attempts to fetch records from Kinesis. Even we have no records on input - it's fine, we just have to know if kinsumer is not stuck for some unknown reason. If there is no fetch coming from kinsumer in a while (exceeding certain configured threshold) == unhealthy.

Health is exposed through `/health` HTTP endpoint.
  • Loading branch information
pondzix committed Jan 29, 2025
1 parent 4bbe884 commit 5108ac9
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 4 deletions.
17 changes: 17 additions & 0 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, sup
}
o.Start()

runHealthServer(s)

stopTelemetry := telemetry.InitTelemetryWithCollector(cfg)

// Handle SIGTERM
Expand Down Expand Up @@ -340,3 +342,18 @@ func exitWithError(err error, flushSentry bool) {
}
os.Exit(1)
}

func runHealthServer(source sourceiface.Source) {
healthServer := http.NewServeMux()
healthServer.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
status := source.Health()
if status.IsHealthy {
w.Write([]byte("ok"))
} else {
log.Warnf("Service is unhealthy, reson: %s\n", status.Message)
w.WriteHeader(http.StatusServiceUnavailable)
}
})

go http.ListenAndServe(":9000", healthServer)
}
4 changes: 4 additions & 0 deletions pkg/source/inmemory/in_memory_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,7 @@ func (ss *inMemorySource) Stop() {
func (ss *inMemorySource) GetID() string {
return "inMemory"
}

func (ss *inMemorySource) Health() sourceiface.HealthStatus {
return sourceiface.HealthStatus{ IsHealthy: true}
}
4 changes: 4 additions & 0 deletions pkg/source/kafka/kafka_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,7 @@ func newKafkaSourceWithInterfaces(client sarama.ConsumerGroup, s *kafkaSource) (
func (ks *kafkaSource) GetID() string {
return fmt.Sprintf("brokers:%s:topic:%s", ks.brokers, ks.topic)
}

func (ks *kafkaSource) Health() sourceiface.HealthStatus {
return sourceiface.HealthStatus{ IsHealthy: true}
}
75 changes: 71 additions & 4 deletions pkg/source/kinesis/kinesis_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ package kinesissource

import (
"fmt"
"maps"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -56,8 +58,11 @@ type kinesisSource struct {
concurrentWrites int
region string
accountID string

log *log.Entry
statsReceiver *kinsumerActivityRecorder
unackedMsgs map[string]int64
maxLatency time.Duration
mutex sync.Mutex
log *log.Entry
}

// -- Config
Expand Down Expand Up @@ -180,13 +185,15 @@ func newKinesisSourceWithInterfaces(
leaderActionFreq int,
clientName string) (*kinesisSource, error) {

statsReceiver := &kinsumerActivityRecorder{}
config := kinsumer.NewConfig().
WithShardCheckFrequency(time.Duration(shardCheckFreq) * time.Second).
WithLeaderActionFrequency(time.Duration(leaderActionFreq) * time.Second).
WithManualCheckpointing(true).
WithLogger(&KinsumerLogrus{}).
WithIteratorStartTimestamp(startTimestamp).
WithThrottleDelay(time.Duration(readThrottleDelay) * time.Millisecond)
WithThrottleDelay(time.Duration(readThrottleDelay) * time.Millisecond).
WithStats(statsReceiver) // to record kinsumer activity and check it's not stuc, see `EventsFromKinesis` function implementation.

k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, clientName, config)
if err != nil {
Expand All @@ -200,6 +207,10 @@ func newKinesisSourceWithInterfaces(
region: region,
accountID: awsAccountID,
log: log.WithFields(log.Fields{"source": "kinesis", "cloud": "AWS", "region": region, "stream": streamName}),
statsReceiver: statsReceiver,
unackedMsgs: make(map[string]int64, concurrentWrites),
maxLatency: time.Duration(5) * time.Minute, //make configurable
mutex: sync.Mutex{}, //to protect our map of unacked messages in case of concurrent access
}, nil
}

Expand All @@ -224,10 +235,12 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error {
}

timePulled := time.Now().UTC()
randomUUID := uuid.New().String()

ackFunc := func() {
ks.log.Debugf("Ack'ing record with SequenceNumber: %s", *record.SequenceNumber)
checkpointer()
ks.removeUnacked(randomUUID)
}

if record != nil {
Expand All @@ -236,13 +249,14 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error {
messages := []*models.Message{
{
Data: record.Data,
PartitionKey: uuid.New().String(),
PartitionKey: randomUUID,
AckFunc: ackFunc,
TimeCreated: timeCreated,
TimePulled: timePulled,
},
}

ks.addUnacked(randomUUID, timePulled)
throttle <- struct{}{}
wg.Add(1)
go func() {
Expand Down Expand Up @@ -296,3 +310,56 @@ func (ks *kinesisSource) Stop() {
func (ks *kinesisSource) GetID() string {
return fmt.Sprintf("arn:aws:kinesis:%s:%s:stream/%s", ks.region, ks.accountID, ks.streamName)
}

type kinsumerActivityRecorder struct {
lastLiveness *time.Time
}

func (ks *kinsumerActivityRecorder) Checkpoint() {}
func (ks *kinsumerActivityRecorder) EventToClient(inserted, retrieved time.Time) {}

// Called every time after successful fetch executed by kinsumer, even if it a number of records is zero
func (ks *kinsumerActivityRecorder) EventsFromKinesis(num int, shardID string, lag time.Duration) {
now := time.Now().UTC()
ks.lastLiveness = &now
}

func (ks *kinesisSource) Health() sourceiface.HealthStatus {
ks.mutex.Lock()
defer ks.mutex.Unlock()

oldestAllowedTimestamp := time.Now().UTC().Add(-ks.maxLatency).UnixMilli()

// first check if there is anything pending in memory unacked...
unackedTimestamps := slices.Collect(maps.Values(ks.unackedMsgs))
if len(unackedTimestamps) > 0 {
oldestUnacked := slices.Min(unackedTimestamps)
if oldestAllowedTimestamp > oldestUnacked {
return sourceiface.Unhealthy("There is some stuck message being processed now for a while....")
}
}

// if there is nothing left unacked, let's check if kinsumer is healthy and not stuck...
if ks.statsReceiver.lastLiveness == nil {
return sourceiface.Unhealthy("We never recorded any activity from kinsumer...")
}

// There's been some activity, but it's been quite for a while since now.
if oldestAllowedTimestamp > ks.statsReceiver.lastLiveness.UnixMilli() {
return sourceiface.Unhealthy("We haven't recorded any activity from kinsumer for a while...")
}

return sourceiface.Healthy()
}

func (ks *kinesisSource) addUnacked(id string, timestamp time.Time) {
ks.mutex.Lock()
ks.unackedMsgs[id] = timestamp.UnixMilli()
ks.mutex.Unlock()
}

func (ks *kinesisSource) removeUnacked(id string) {
ks.mutex.Lock()
delete(ks.unackedMsgs, id)
ks.mutex.Unlock()
}
4 changes: 4 additions & 0 deletions pkg/source/pubsub/pubsub_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,7 @@ func (ps *pubSubSource) Stop() {
func (ps *pubSubSource) GetID() string {
return fmt.Sprintf("projects/%s/subscriptions/%s", ps.projectID, ps.subscriptionID)
}

func (ps *pubSubSource) Health() sourceiface.HealthStatus {
return sourceiface.HealthStatus{ IsHealthy: true}
}
17 changes: 17 additions & 0 deletions pkg/source/sourceiface/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,26 @@

package sourceiface

type HealthStatus struct {
IsHealthy bool
Message string
}

func Unhealthy(message string) HealthStatus {
return HealthStatus {
IsHealthy: false,
Message: message,
}
}

func Healthy() HealthStatus {
return HealthStatus{IsHealthy: true}
}

// Source describes the interface for how to read the data pulled from the source
type Source interface {
Read(sf *SourceFunctions) error
Stop()
GetID() string
Health() HealthStatus
}
5 changes: 5 additions & 0 deletions pkg/source/sqs/sqs_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,8 @@ func (ss *sqsSource) Stop() {
func (ss *sqsSource) GetID() string {
return fmt.Sprintf("arn:aws:sqs:%s:%s:%s", ss.region, ss.accountID, ss.queueName)
}

func (ss *sqsSource) Health() sourceiface.HealthStatus {
return sourceiface.HealthStatus{ IsHealthy: true}
}

4 changes: 4 additions & 0 deletions pkg/source/stdin/stdin_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,7 @@ func (ss *stdinSource) Stop() {
func (ss *stdinSource) GetID() string {
return "stdin"
}

func (ss *stdinSource) Health() sourceiface.HealthStatus {
return sourceiface.HealthStatus{ IsHealthy: true}
}

0 comments on commit 5108ac9

Please sign in to comment.