diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go index fbd47047..bd710448 100644 --- a/cmd/cli/cli.go +++ b/cmd/cli/cli.go @@ -130,6 +130,8 @@ func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, sup } o.Start() + runHealthServer(s) + stopTelemetry := telemetry.InitTelemetryWithCollector(cfg) // Handle SIGTERM @@ -340,3 +342,18 @@ func exitWithError(err error, flushSentry bool) { } os.Exit(1) } + +func runHealthServer(source sourceiface.Source) { + healthServer := http.NewServeMux() + healthServer.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + status := source.Health() + if status.IsHealthy { + w.Write([]byte("ok")) + } else { + log.Warnf("Service is unhealthy, reson: %s\n", status.Message) + w.WriteHeader(http.StatusServiceUnavailable) + } + }) + + go http.ListenAndServe(":9000", healthServer) +} diff --git a/pkg/source/inmemory/in_memory_source.go b/pkg/source/inmemory/in_memory_source.go index 206a4a9a..d8328dac 100644 --- a/pkg/source/inmemory/in_memory_source.go +++ b/pkg/source/inmemory/in_memory_source.go @@ -115,3 +115,7 @@ func (ss *inMemorySource) Stop() { func (ss *inMemorySource) GetID() string { return "inMemory" } + +func (ss *inMemorySource) Health() sourceiface.HealthStatus { + return sourceiface.HealthStatus{ IsHealthy: true} +} diff --git a/pkg/source/kafka/kafka_source.go b/pkg/source/kafka/kafka_source.go index 757a949b..5c7373f5 100644 --- a/pkg/source/kafka/kafka_source.go +++ b/pkg/source/kafka/kafka_source.go @@ -295,3 +295,7 @@ func newKafkaSourceWithInterfaces(client sarama.ConsumerGroup, s *kafkaSource) ( func (ks *kafkaSource) GetID() string { return fmt.Sprintf("brokers:%s:topic:%s", ks.brokers, ks.topic) } + +func (ks *kafkaSource) Health() sourceiface.HealthStatus { + return sourceiface.HealthStatus{ IsHealthy: true} +} diff --git a/pkg/source/kinesis/kinesis_source.go b/pkg/source/kinesis/kinesis_source.go index 4ce1e7de..0e1d73cc 100644 --- a/pkg/source/kinesis/kinesis_source.go +++ 
b/pkg/source/kinesis/kinesis_source.go @@ -13,6 +13,8 @@ package kinesissource import ( "fmt" + "maps" + "slices" "sync" "time" @@ -56,8 +58,11 @@ type kinesisSource struct { concurrentWrites int region string accountID string - - log *log.Entry + statsReceiver *kinsumerActivityRecorder + unackedMsgs map[string]int64 + maxLatency time.Duration + mutex sync.Mutex + log *log.Entry } // -- Config @@ -180,13 +185,15 @@ func newKinesisSourceWithInterfaces( leaderActionFreq int, clientName string) (*kinesisSource, error) { + statsReceiver := &kinsumerActivityRecorder{} config := kinsumer.NewConfig(). WithShardCheckFrequency(time.Duration(shardCheckFreq) * time.Second). WithLeaderActionFrequency(time.Duration(leaderActionFreq) * time.Second). WithManualCheckpointing(true). WithLogger(&KinsumerLogrus{}). WithIteratorStartTimestamp(startTimestamp). - WithThrottleDelay(time.Duration(readThrottleDelay) * time.Millisecond) + WithThrottleDelay(time.Duration(readThrottleDelay) * time.Millisecond). + WithStats(statsReceiver) // to record kinsumer activity and check it's not stuc, see `EventsFromKinesis` function implementation. 
k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, clientName, config) if err != nil { @@ -200,6 +207,10 @@ func newKinesisSourceWithInterfaces( region: region, accountID: awsAccountID, log: log.WithFields(log.Fields{"source": "kinesis", "cloud": "AWS", "region": region, "stream": streamName}), + statsReceiver: statsReceiver, + unackedMsgs: make(map[string]int64, concurrentWrites), + maxLatency: time.Duration(5) * time.Minute, //make configurable + mutex: sync.Mutex{}, //to protect our map of unacked messages in case of concurrent access }, nil } @@ -224,10 +235,12 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error { } timePulled := time.Now().UTC() + randomUUID := uuid.New().String() ackFunc := func() { ks.log.Debugf("Ack'ing record with SequenceNumber: %s", *record.SequenceNumber) checkpointer() + ks.removeUnacked(randomUUID) } if record != nil { @@ -236,13 +249,14 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error { messages := []*models.Message{ { Data: record.Data, - PartitionKey: uuid.New().String(), + PartitionKey: randomUUID, AckFunc: ackFunc, TimeCreated: timeCreated, TimePulled: timePulled, }, } + ks.addUnacked(randomUUID, timePulled) throttle <- struct{}{} wg.Add(1) go func() { @@ -296,3 +310,56 @@ func (ks *kinesisSource) Stop() { func (ks *kinesisSource) GetID() string { return fmt.Sprintf("arn:aws:kinesis:%s:%s:stream/%s", ks.region, ks.accountID, ks.streamName) } + +type kinsumerActivityRecorder struct { + lastLiveness *time.Time +} + +func (ks *kinsumerActivityRecorder) Checkpoint() {} +func (ks *kinsumerActivityRecorder) EventToClient(inserted, retrieved time.Time) {} + +// Called every time after successful fetch executed by kinsumer, even if it a number of records is zero +func (ks *kinsumerActivityRecorder) EventsFromKinesis(num int, shardID string, lag time.Duration) { + now := time.Now().UTC() + ks.lastLiveness = &now +} + +func (ks *kinesisSource) Health() 
sourceiface.HealthStatus { + ks.mutex.Lock() + defer ks.mutex.Unlock() + + oldestAllowedTimestamp := time.Now().UTC().Add(-ks.maxLatency).UnixMilli() + + // first check if there is anything pending in memory unacked... + unackedTimestamps := slices.Collect(maps.Values(ks.unackedMsgs)) + if len(unackedTimestamps) > 0 { + oldestUnacked := slices.Min(unackedTimestamps) + if oldestAllowedTimestamp > oldestUnacked { + return sourceiface.Unhealthy("There is some stuck message being processed now for a while....") + } + } + + // if there is nothing left unacked, let's check if kinsumer is healthy and not stuck... + if ks.statsReceiver.lastLiveness == nil { + return sourceiface.Unhealthy("We never recorded any activity from kinsumer...") + } + + // There's been some activity, but it's been quite for a while since now. + if oldestAllowedTimestamp > ks.statsReceiver.lastLiveness.UnixMilli() { + return sourceiface.Unhealthy("We haven't recorded any activity from kinsumer for a while...") + } + + return sourceiface.Healthy() +} + +func (ks *kinesisSource) addUnacked(id string, timestamp time.Time) { + ks.mutex.Lock() + ks.unackedMsgs[id] = timestamp.UnixMilli() + ks.mutex.Unlock() +} + +func (ks *kinesisSource) removeUnacked(id string) { + ks.mutex.Lock() + delete(ks.unackedMsgs, id) + ks.mutex.Unlock() +} diff --git a/pkg/source/pubsub/pubsub_source.go b/pkg/source/pubsub/pubsub_source.go index 73d0e29e..39ce64a2 100644 --- a/pkg/source/pubsub/pubsub_source.go +++ b/pkg/source/pubsub/pubsub_source.go @@ -222,3 +222,7 @@ func (ps *pubSubSource) Stop() { func (ps *pubSubSource) GetID() string { return fmt.Sprintf("projects/%s/subscriptions/%s", ps.projectID, ps.subscriptionID) } + +func (ps *pubSubSource) Health() sourceiface.HealthStatus { + return sourceiface.HealthStatus{ IsHealthy: true} +} diff --git a/pkg/source/sourceiface/source.go b/pkg/source/sourceiface/source.go index ea1c0baf..67ab6249 100644 --- a/pkg/source/sourceiface/source.go +++ 
// HealthStatus is the outcome of a source health check.
type HealthStatus struct {
	// IsHealthy is true when the source considers itself healthy.
	IsHealthy bool
	// Message carries the reason passed to Unhealthy; Healthy leaves it empty.
	Message string
}

// Unhealthy returns a HealthStatus with IsHealthy set to false and the given
// message attached.
func Unhealthy(message string) HealthStatus {
	// IsHealthy is deliberately left at its zero value (false).
	return HealthStatus{Message: message}
}

// Healthy returns a HealthStatus with IsHealthy set to true and no message.
func Healthy() HealthStatus {
	var status HealthStatus
	status.IsHealthy = true
	return status
}