Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add health check endpoint #395

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, sup
}
o.Start()

runHealthServer(s)

stopTelemetry := telemetry.InitTelemetryWithCollector(cfg)

// Handle SIGTERM
Expand Down Expand Up @@ -340,3 +342,18 @@ func exitWithError(err error, flushSentry bool) {
}
os.Exit(1)
}

func runHealthServer(source sourceiface.Source) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sharing this just for interest:

In common-streams the health probe actually responds to all requests with a ok. Not just requests to the /health endpoint.

That might be a mistake in common-streams: time will tell! I did it because it felt strange to hard-code the completely arbitrary string /health.

healthServer := http.NewServeMux()
healthServer.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
status := source.Health()
if status.IsHealthy {
w.Write([]byte("ok"))
} else {
log.Warnf("Service is unhealthy, reson: %s\n", status.Message)
w.WriteHeader(http.StatusServiceUnavailable)
}
})

go http.ListenAndServe(":9000", healthServer)
}
4 changes: 4 additions & 0 deletions pkg/source/inmemory/in_memory_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,7 @@ func (ss *inMemorySource) Stop() {
func (ss *inMemorySource) GetID() string {
return "inMemory"
}

func (ss *inMemorySource) Health() sourceiface.HealthStatus {
return sourceiface.HealthStatus{ IsHealthy: true}
}
4 changes: 4 additions & 0 deletions pkg/source/kafka/kafka_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,7 @@ func newKafkaSourceWithInterfaces(client sarama.ConsumerGroup, s *kafkaSource) (
func (ks *kafkaSource) GetID() string {
return fmt.Sprintf("brokers:%s:topic:%s", ks.brokers, ks.topic)
}

func (ks *kafkaSource) Health() sourceiface.HealthStatus {
return sourceiface.HealthStatus{ IsHealthy: true}
}
75 changes: 71 additions & 4 deletions pkg/source/kinesis/kinesis_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ package kinesissource

import (
"fmt"
"maps"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -56,8 +58,11 @@ type kinesisSource struct {
concurrentWrites int
region string
accountID string

log *log.Entry
statsReceiver *kinsumerActivityRecorder
unackedMsgs map[string]int64

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider making this map[uuid.UUID]int64? It looks like the keys are always stringified UUIDs.

maxLatency time.Duration
mutex sync.Mutex
log *log.Entry
}

// -- Config
Expand Down Expand Up @@ -180,13 +185,15 @@ func newKinesisSourceWithInterfaces(
leaderActionFreq int,
clientName string) (*kinesisSource, error) {

statsReceiver := &kinsumerActivityRecorder{}
config := kinsumer.NewConfig().
WithShardCheckFrequency(time.Duration(shardCheckFreq) * time.Second).
WithLeaderActionFrequency(time.Duration(leaderActionFreq) * time.Second).
WithManualCheckpointing(true).
WithLogger(&KinsumerLogrus{}).
WithIteratorStartTimestamp(startTimestamp).
WithThrottleDelay(time.Duration(readThrottleDelay) * time.Millisecond)
WithThrottleDelay(time.Duration(readThrottleDelay) * time.Millisecond).
WithStats(statsReceiver) // to record kinsumer activity and check it's not stuck, see `EventsFromKinesis` function implementation.

k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, clientName, config)
if err != nil {
Expand All @@ -200,6 +207,10 @@ func newKinesisSourceWithInterfaces(
region: region,
accountID: awsAccountID,
log: log.WithFields(log.Fields{"source": "kinesis", "cloud": "AWS", "region": region, "stream": streamName}),
statsReceiver: statsReceiver,
unackedMsgs: make(map[string]int64, concurrentWrites),
maxLatency: time.Duration(5) * time.Minute, //make configurable
mutex: sync.Mutex{}, //to protect our map of unacked messages in case of concurrent access
}, nil
}

Expand All @@ -224,10 +235,12 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error {
}

timePulled := time.Now().UTC()
randomUUID := uuid.New().String()

ackFunc := func() {
ks.log.Debugf("Ack'ing record with SequenceNumber: %s", *record.SequenceNumber)
checkpointer()
ks.removeUnacked(randomUUID)
Comment on lines 242 to +243

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How deeply to you understand what checkpointer() does? Does it block until this record is actually checkpointed to dynamodb? Or does it return immediately, so kinsumer can checkpoint it later?

From conversations I've had with others, I think it might be the latter.

If it fails to checkpoint later, then what does kinsumer do next? Does it stop calling the EventsFromKinesis function?

Does any of this matter? I think possibly no.... because I think your health checkpoint endpoint works correctly anyway. But it's worth thinking about.

}

if record != nil {
Expand All @@ -236,13 +249,14 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error {
messages := []*models.Message{
{
Data: record.Data,
PartitionKey: uuid.New().String(),
PartitionKey: randomUUID,
AckFunc: ackFunc,
TimeCreated: timeCreated,
TimePulled: timePulled,
},
}

ks.addUnacked(randomUUID, timePulled)
throttle <- struct{}{}
wg.Add(1)
go func() {
Expand Down Expand Up @@ -296,3 +310,56 @@ func (ks *kinesisSource) Stop() {
func (ks *kinesisSource) GetID() string {
return fmt.Sprintf("arn:aws:kinesis:%s:%s:stream/%s", ks.region, ks.accountID, ks.streamName)
}

type kinsumerActivityRecorder struct {
lastLiveness *time.Time
}

func (ks *kinsumerActivityRecorder) Checkpoint() {}
func (ks *kinsumerActivityRecorder) EventToClient(inserted, retrieved time.Time) {}

// Called every time after successful fetch executed by kinsumer, even if it a number of records is zero
func (ks *kinsumerActivityRecorder) EventsFromKinesis(num int, shardID string, lag time.Duration) {
now := time.Now().UTC()
ks.lastLiveness = &now
}

func (ks *kinesisSource) Health() sourceiface.HealthStatus {
ks.mutex.Lock()
defer ks.mutex.Unlock()

oldestAllowedTimestamp := time.Now().UTC().Add(-ks.maxLatency).UnixMilli()

// first check if there is anything pending in memory unacked...
unackedTimestamps := slices.Collect(maps.Values(ks.unackedMsgs))
if len(unackedTimestamps) > 0 {
oldestUnacked := slices.Min(unackedTimestamps)
if oldestAllowedTimestamp > oldestUnacked {
return sourceiface.Unhealthy("There is some stuck message being processed now for a while....")
}
}

// if there is nothing left unacked, let's check if kinsumer is healthy and not stuck...
if ks.statsReceiver.lastLiveness == nil {
return sourceiface.Unhealthy("We never recorded any activity from kinsumer...")
}

// There's been some activity, but it's been quite for a while since now.
if oldestAllowedTimestamp > ks.statsReceiver.lastLiveness.UnixMilli() {
return sourceiface.Unhealthy("We haven't recorded any activity from kinsumer for a while...")
}

return sourceiface.Healthy()
}

func (ks *kinesisSource) addUnacked(id string, timestamp time.Time) {
ks.mutex.Lock()
ks.unackedMsgs[id] = timestamp.UnixMilli()
ks.mutex.Unlock()
}

func (ks *kinesisSource) removeUnacked(id string) {
ks.mutex.Lock()
delete(ks.unackedMsgs, id)
ks.mutex.Unlock()
}
4 changes: 4 additions & 0 deletions pkg/source/pubsub/pubsub_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,7 @@ func (ps *pubSubSource) Stop() {
func (ps *pubSubSource) GetID() string {
return fmt.Sprintf("projects/%s/subscriptions/%s", ps.projectID, ps.subscriptionID)
}

func (ps *pubSubSource) Health() sourceiface.HealthStatus {
return sourceiface.HealthStatus{ IsHealthy: true}
}
17 changes: 17 additions & 0 deletions pkg/source/sourceiface/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,26 @@

package sourceiface

type HealthStatus struct {
IsHealthy bool
Message string
}

func Unhealthy(message string) HealthStatus {
return HealthStatus {
IsHealthy: false,
Message: message,
}
}

func Healthy() HealthStatus {
return HealthStatus{IsHealthy: true}
}

// Source describes the interface for how to read the data pulled from the source
type Source interface {
Read(sf *SourceFunctions) error
Stop()
GetID() string
Health() HealthStatus
}
5 changes: 5 additions & 0 deletions pkg/source/sqs/sqs_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,8 @@ func (ss *sqsSource) Stop() {
func (ss *sqsSource) GetID() string {
return fmt.Sprintf("arn:aws:sqs:%s:%s:%s", ss.region, ss.accountID, ss.queueName)
}

func (ss *sqsSource) Health() sourceiface.HealthStatus {
return sourceiface.HealthStatus{ IsHealthy: true}
}

4 changes: 4 additions & 0 deletions pkg/source/stdin/stdin_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,7 @@ func (ss *stdinSource) Stop() {
func (ss *stdinSource) GetID() string {
return "stdin"
}

func (ss *stdinSource) Health() sourceiface.HealthStatus {
return sourceiface.HealthStatus{ IsHealthy: true}
}
Loading