Skip to content

Commit

Permalink
Merge pull request #212 from apkar/conn-fixes
Browse files Browse the repository at this point in the history
Misc fixes
  • Loading branch information
apkar authored Sep 5, 2019
2 parents ce9be68 + ebdb4c1 commit 04bb740
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 24 deletions.
1 change: 1 addition & 0 deletions src/Constants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const char* INDEX_STATUS_BUILDING = "building";
const char* INDEX_STATUS_ERROR = "error";

const char* MT_GUAGE_ACTIVE_CONNECTIONS = "dl_active_connections";
const char* MT_RATE_NEW_CONNECTIONS = "dl_new_connections";
const char* MT_GUAGE_ACTIVE_CURSORS = "dl_active_cursors";
const char* MT_HIST_MESSAGE_SZ = "dl_message_size_bytes";
const char* MT_TIME_QUERY_LATENCY_US = "dl_query_latency_useconds";
Expand Down
1 change: 1 addition & 0 deletions src/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ extern const char* INDEX_STATUS_ERROR;

// Metrics
extern const char* MT_GUAGE_ACTIVE_CONNECTIONS;
extern const char* MT_RATE_NEW_CONNECTIONS;
extern const char* MT_GUAGE_ACTIVE_CURSORS;
extern const char* MT_HIST_MESSAGE_SZ;
extern const char* MT_TIME_QUERY_LATENCY_US;
Expand Down
55 changes: 31 additions & 24 deletions src/DocLayer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,30 @@ ACTOR Future<Void> housekeeping(Reference<ExtConnection> ec) {
}
}

ACTOR Future<int32_t> processMessage(Reference<ExtConnection> ec, Promise<Void> finished) {
wait(ec->bc->onBytesAvailable(sizeof(ExtMsgHeader)));
auto headerBytes = ec->bc->peekExact(sizeof(ExtMsgHeader));

state ExtMsgHeader* header = (ExtMsgHeader*)headerBytes.begin();

wait(ec->bc->onBytesAvailable(header->messageLength));
auto messageBytes = ec->bc->peekExact(header->messageLength);

DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_MESSAGE_SZ, header->messageLength);

/* We don't use hdr in this call because the second peek may
have triggered a copy that the first did not, but it's nice
for everything at and below processRequest to assume that
body - header == sizeof(ExtMsgHeader) */
ec->updateMaxReceivedRequestID(header->requestID);
wait(
processRequest(ec, (ExtMsgHeader*)messageBytes.begin(), messageBytes.begin() + sizeof(ExtMsgHeader), finished));

ec->bc->advance(header->messageLength);

return header->messageLength;
}

ACTOR Future<Void> extServerConnection(Reference<DocumentLayer> docLayer,
Reference<BufferedConnection> bc,
int64_t connectionId) {
Expand All @@ -207,6 +231,8 @@ ACTOR Future<Void> extServerConnection(Reference<DocumentLayer> docLayer,

DocumentLayer::metricReporter->captureGauge(DocLayerConstants::MT_GUAGE_ACTIVE_CONNECTIONS,
++docLayer->nrConnections);
DocumentLayer::metricReporter->captureMeter(DocLayerConstants::MT_RATE_NEW_CONNECTIONS, 1);

try {
try {
loop {
Expand All @@ -219,29 +245,8 @@ ACTOR Future<Void> extServerConnection(Reference<DocumentLayer> docLayer,
TraceEvent("BD_serverClosedConnection").detail("connId", connectionId);
throw connection_failed();
}
when(wait(ec->bc->onBytesAvailable(sizeof(ExtMsgHeader)))) {
auto headerBytes = ec->bc->peekExact(sizeof(ExtMsgHeader));

state ExtMsgHeader* header = (ExtMsgHeader*)headerBytes.begin();

// FIXME: Check for unreasonable lengths

wait(ec->bc->onBytesAvailable(header->messageLength));
auto messageBytes = ec->bc->peekExact(header->messageLength);

DocumentLayer::metricReporter->captureHistogram(DocLayerConstants::MT_HIST_MESSAGE_SZ,
header->messageLength);

/* We don't use hdr in this call because the second peek may
have triggered a copy that the first did not, but it's nice
for everything at and below processRequest to assume that
body - header == sizeof(ExtMsgHeader) */
ec->updateMaxReceivedRequestID(header->requestID);
wait(processRequest(ec, (ExtMsgHeader*)messageBytes.begin(),
messageBytes.begin() + sizeof(ExtMsgHeader), finished));

ec->bc->advance(header->messageLength);
msg_size_inuse.send(std::make_pair(header->messageLength, finished.getFuture()));
when(int32_t messageLength = wait(processMessage(ec, finished))) {
msg_size_inuse.send(std::make_pair(messageLength, finished.getFuture()));
}
}
}
Expand Down Expand Up @@ -464,7 +469,9 @@ ACTOR void publishProcessMetrics() {
TraceEvent("BD_processMetricsPublisher");
try {
loop {
wait(delay(5.0));
// Update metrics at high priority.
wait(delay(5.0, TaskMaxPriority));

auto processMetrics = latestEventCache.get("ProcessMetrics");
double processMetricsElapsed = processMetrics.getDouble("Elapsed");
double cpuSeconds = processMetrics.getDouble("CPUSeconds");
Expand Down
5 changes: 5 additions & 0 deletions src/IMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class IMetricReporter {
IMetricReporter() = delete;
virtual ~IMetricReporter() = default;

/**
* Its very important implementation of this function is fast. For example, if the plugin
* is publishing to remote endpoint it shouldn't happen part of this call. Publishing
* should happen aynchronous to this call.
*/
virtual void captureMetric(const char* metricName, int64_t metricValue, IMetricType metricType) = 0;

void captureCount(const char* metricName);
Expand Down

0 comments on commit 04bb740

Please sign in to comment.