-
Notifications
You must be signed in to change notification settings - Fork 23
Proxy heartbeat causes warning log message #147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few comments
proxy/pkg/zdmproxy/clusterconn.go
Outdated
@@ -403,10 +420,13 @@ func (cc *ClusterConnector) handleAsyncResponse(response *frame.RawFrame) *frame | |||
return nil | |||
} | |||
|
|||
func (cc *ClusterConnector) sendRequestToCluster(frame *frame.RawFrame) error { | |||
func (cc *ClusterConnector) sendRequestToCluster(frame *frame.RawFrame, source RequestSource) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it make more sense to just have a asssignStreamId bool
parameter instead? Same for func (sip *streamIdProcessor) AssignUniqueId(rawFrame *frame.RawFrame, source RequestSource)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would not func (sip *streamIdProcessor) AssignUniqueId(rawFrame *frame.RawFrame, assignStreamId bool)
look strange? If we would like boolean parameter, what do you think about AssignUniqueId(rawFrame *frame.RawFrame, internalRequest bool)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry assignStreamId
is the wrong name for this bool
, internalRequest
looks good to me.
proxy/pkg/zdmproxy/clusterconn.go
Outdated
@@ -257,7 +266,15 @@ func (cc *ClusterConnector) runResponseListeningLoop() { | |||
for { | |||
response, err := readRawFrame(bufferedReader, connectionAddr, cc.clusterConnContext) | |||
protocolErrResponseFrame, err, errCode := checkProtocolError(response, cc.ccProtoVer, err, protocolErrOccurred, string(cc.connectorType)) | |||
|
|||
if response != nil { | |||
if _, ok := cc.internalRequests[response.Header.StreamId]; ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit confused here, the response stream id for "internal requests" will always be -1 here so you can't have more than 1 internal request at the same time.
If you change the stream id so this is the stream id that is assigned then you will have the problem of not knowing whether a response is for an internal request or not.
Do we even need to track these internal requests? Can't we just check that the stream id of the frame is -1 after the call to ReleaseId()
in clusterconn.runResponseListeningLoop()
and if it is then we just don't send it to the response channel? In this case we could just delete internalRequests
map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit confused here, the response stream id for "internal requests" will always be -1 here so you can't have more than 1 internal request at the same time.
It will actually be the original stream ID from the cluster. Mapping to -1 would happen later at: response, releaseErr = cc.frameProcessor.ReleaseId(response)
.
Do we even need to track these internal requests? Can't we just check that the stream id of the frame is -1 after the call to ReleaseId() in clusterconn.runResponseListeningLoop() and if it is then we just don't send it to the response channel? In this case we could just delete internalRequests map
I think we can do that. All comes to how defensive code should be. Currently, we log a warning for all unrecognised stream IDs:
if reqCtx == nil {
if ch.clientHandlerContext.Err() == nil {
log.Warnf("Could not find request context for stream id %d received from %v. "+
"It either timed out or a protocol error occurred.", streamId, response.connectorType)
}
return
}
After the change, we would ignore all stream IDs with -1. But true, it might be a good alternative, because we will not need to store stream IDs for internal requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think it would reduce complexity to just discard the responses of internal requests, in the future if we need to we can add proper internal request tracking
Updated code as per review comments. |
// below logs would indicate we have issues sending heartbeat messages or handling response | ||
require.NotContains(t, allLogs, "negative stream id") | ||
require.NotContains(t, allLogs, "Could not find request context for stream id") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since you're using simulacron it would be better long term to assert using the query logs API from simulacron instead of relying on log messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm! just added 1 NIT about the assert on the new integration test but it's not a big deal if you want to keep it this way and go ahead with the merge
Fixes #109.