-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add health check endpoint #395
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,8 @@ package kinesissource | |
|
||
import ( | ||
"fmt" | ||
"maps" | ||
"slices" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -56,8 +58,11 @@ type kinesisSource struct { | |
concurrentWrites int | ||
region string | ||
accountID string | ||
|
||
log *log.Entry | ||
statsReceiver *kinsumerActivityRecorder | ||
unackedMsgs map[string]int64 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you consider making this |
||
maxLatency time.Duration | ||
mutex sync.Mutex | ||
log *log.Entry | ||
} | ||
|
||
// -- Config | ||
|
@@ -180,13 +185,15 @@ func newKinesisSourceWithInterfaces( | |
leaderActionFreq int, | ||
clientName string) (*kinesisSource, error) { | ||
|
||
statsReceiver := &kinsumerActivityRecorder{} | ||
config := kinsumer.NewConfig(). | ||
WithShardCheckFrequency(time.Duration(shardCheckFreq) * time.Second). | ||
WithLeaderActionFrequency(time.Duration(leaderActionFreq) * time.Second). | ||
WithManualCheckpointing(true). | ||
WithLogger(&KinsumerLogrus{}). | ||
WithIteratorStartTimestamp(startTimestamp). | ||
WithThrottleDelay(time.Duration(readThrottleDelay) * time.Millisecond) | ||
WithThrottleDelay(time.Duration(readThrottleDelay) * time.Millisecond). | ||
WithStats(statsReceiver) // to record kinsumer activity and check it's not stuck, see `EventsFromKinesis` function implementation. | ||
|
||
k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, clientName, config) | ||
if err != nil { | ||
|
@@ -200,6 +207,10 @@ func newKinesisSourceWithInterfaces( | |
region: region, | ||
accountID: awsAccountID, | ||
log: log.WithFields(log.Fields{"source": "kinesis", "cloud": "AWS", "region": region, "stream": streamName}), | ||
statsReceiver: statsReceiver, | ||
unackedMsgs: make(map[string]int64, concurrentWrites), | ||
maxLatency: time.Duration(5) * time.Minute, //make configurable | ||
mutex: sync.Mutex{}, //to protect our map of unacked messages in case of concurrent access | ||
}, nil | ||
} | ||
|
||
|
@@ -224,10 +235,12 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error { | |
} | ||
|
||
timePulled := time.Now().UTC() | ||
randomUUID := uuid.New().String() | ||
|
||
ackFunc := func() { | ||
ks.log.Debugf("Ack'ing record with SequenceNumber: %s", *record.SequenceNumber) | ||
checkpointer() | ||
ks.removeUnacked(randomUUID) | ||
Comment on lines
242
to
+243
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How deeply to you understand what From conversations I've had with others, I think it might be the latter. If it fails to checkpoint later, then what does kinsumer do next? Does it stop calling the Does any of this matter? I think possibly no.... because I think your health checkpoint endpoint works correctly anyway. But it's worth thinking about. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand it quite well but did have to refresh my memory! You are correct, it's the latter. When So for example if you checkpoint a record with sequence number 5, it'll wait for sequence number 1-4 before it commits to DDB. This can cause blocking something like how you describe, but that's a slightly different case - if Snowbridge never calls There is a separate process to commit the checkpoint to DDB on regular intervals. If that goes wrong, kinsumer will return an error. This will in turn cause Snowbridge to encounter a source error and the app will crash. On reboot it will re-start and the new instance of kinsumer will start at the last sequence number that did get committed to DDB. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Imagine it didn't crash. For some unexpected reason. Bearing in mind... the whole reason you are working on this feature is to allow for unexpected scenarios. What I'm getting at is.... would the health endpoint become unhealthy in that scenario? |
||
} | ||
|
||
if record != nil { | ||
|
@@ -236,13 +249,14 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error { | |
messages := []*models.Message{ | ||
{ | ||
Data: record.Data, | ||
PartitionKey: uuid.New().String(), | ||
PartitionKey: randomUUID, | ||
AckFunc: ackFunc, | ||
TimeCreated: timeCreated, | ||
TimePulled: timePulled, | ||
}, | ||
} | ||
|
||
ks.addUnacked(randomUUID, timePulled) | ||
throttle <- struct{}{} | ||
wg.Add(1) | ||
go func() { | ||
|
@@ -296,3 +310,56 @@ func (ks *kinesisSource) Stop() { | |
func (ks *kinesisSource) GetID() string { | ||
return fmt.Sprintf("arn:aws:kinesis:%s:%s:stream/%s", ks.region, ks.accountID, ks.streamName) | ||
} | ||
|
||
type kinsumerActivityRecorder struct { | ||
lastLiveness *time.Time | ||
} | ||
|
||
func (ks *kinsumerActivityRecorder) Checkpoint() {} | ||
func (ks *kinsumerActivityRecorder) EventToClient(inserted, retrieved time.Time) {} | ||
|
||
// Called every time after successful fetch executed by kinsumer, even if it a number of records is zero | ||
func (ks *kinsumerActivityRecorder) EventsFromKinesis(num int, shardID string, lag time.Duration) { | ||
now := time.Now().UTC() | ||
ks.lastLiveness = &now | ||
} | ||
|
||
func (ks *kinesisSource) Health() sourceiface.HealthStatus { | ||
ks.mutex.Lock() | ||
defer ks.mutex.Unlock() | ||
|
||
oldestAllowedTimestamp := time.Now().UTC().Add(-ks.maxLatency).UnixMilli() | ||
|
||
// first check if there is anything pending in memory unacked... | ||
unackedTimestamps := slices.Collect(maps.Values(ks.unackedMsgs)) | ||
if len(unackedTimestamps) > 0 { | ||
oldestUnacked := slices.Min(unackedTimestamps) | ||
if oldestAllowedTimestamp > oldestUnacked { | ||
return sourceiface.Unhealthy("There is some stuck message being processed now for a while....") | ||
} | ||
} | ||
|
||
// if there is nothing left unacked, let's check if kinsumer is healthy and not stuck... | ||
if ks.statsReceiver.lastLiveness == nil { | ||
return sourceiface.Unhealthy("We never recorded any activity from kinsumer...") | ||
} | ||
|
||
// There's been some activity, but it's been quite for a while since now. | ||
if oldestAllowedTimestamp > ks.statsReceiver.lastLiveness.UnixMilli() { | ||
return sourceiface.Unhealthy("We haven't recorded any activity from kinsumer for a while...") | ||
} | ||
|
||
return sourceiface.Healthy() | ||
} | ||
|
||
func (ks *kinesisSource) addUnacked(id string, timestamp time.Time) { | ||
ks.mutex.Lock() | ||
ks.unackedMsgs[id] = timestamp.UnixMilli() | ||
ks.mutex.Unlock() | ||
} | ||
|
||
func (ks *kinesisSource) removeUnacked(id string) { | ||
ks.mutex.Lock() | ||
delete(ks.unackedMsgs, id) | ||
ks.mutex.Unlock() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sharing this just for interest:
In common-streams the health probe actually responds to all requests with a
ok
. Not just requests to the/health
endpoint.That might be a mistake in common-streams: time will tell! I did it because it felt strange to hard-code the completely arbitrary string
/health
.