-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add health check endpoint #395
base: develop
Are you sure you want to change the base?
Conversation
For now the only source of 'health' is a source, e.g. in this draft for kinesis: * We keep track of events currently being processed downstream. If there is any message in memory that hasn't been acked for a while (exceeding certain configured threshold) == unhealthy. * We check if underlying kinsumer client attempts to fetch records from Kinesis. Even we have no records on input - it's fine, we just have to know if kinsumer is not stuck for some unknown reason. If there is no fetch coming from kinsumer in a while (exceeding certain configured threshold) == unhealthy. Health is exposed through `/health` HTTP endpoint.
5108ac9
to
77c8e17
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In your implementation, two different things contribute to bad health:
- Problems receiving events from the external source
- Events stuck in the app, i.e. a processing problem or sink problem.
For 1, it completely makes sense to implement it in the source.
But for 2... isn't this a helpful health check for all sources? You could implement 2 exactly the same in the pubsub source and kafka source. But then you would be repeating the same code in all sources.
In other words, is there any part of this that can be moved out of the source?
I appreciate this is a difficult problem! Because I have struggled with these same questions in common-streams.
@@ -340,3 +342,18 @@ func exitWithError(err error, flushSentry bool) { | |||
} | |||
os.Exit(1) | |||
} | |||
|
|||
func runHealthServer(source sourceiface.Source) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sharing this just for interest:
In common-streams the health probe actually responds to all requests with a ok
. Not just requests to the /health
endpoint.
That might be a mistake in common-streams: time will tell! I did it because it felt strange to hard-code the completely arbitrary string /health
.
|
||
log *log.Entry | ||
statsReceiver *kinsumerActivityRecorder | ||
unackedMsgs map[string]int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you consider making this map[uuid.UUID]int64
? It looks like the keys are always stringified UUIDs.
checkpointer() | ||
ks.removeUnacked(randomUUID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How deeply to you understand what checkpointer()
does? Does it block until this record is actually checkpointed to dynamodb? Or does it return immediately, so kinsumer can checkpoint it later?
From conversations I've had with others, I think it might be the latter.
If it fails to checkpoint later, then what does kinsumer do next? Does it stop calling the EventsFromKinesis
function?
Does any of this matter? I think possibly no.... because I think your health checkpoint endpoint works correctly anyway. But it's worth thinking about.
PDP-1557
For now the only source of 'health' is a source, e.g. in this draft for kinesis:
Health is exposed through
/health
HTTP endpoint.