Skip to content

Commit 7038b02

Browse files
dmehaladgoffredo
andauthored
Set default timeout on curl handles (#66)
Co-authored-by: David Goffredo <david.goffredo@datadoghq.com>
1 parent f9e4217 commit 7038b02

20 files changed

+291
-166
lines changed

src/datadog/curl.cpp

Lines changed: 80 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <unordered_map>
1414
#include <unordered_set>
1515

16+
#include "clock.h"
1617
#include "dict_reader.h"
1718
#include "dict_writer.h"
1819
#include "http_client.h"
@@ -95,6 +96,10 @@ CURLcode CurlLibrary::easy_setopt_writefunction(CURL *handle,
9596
return curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, on_write);
9697
}
9798

99+
CURLcode CurlLibrary::easy_setopt_timeout_ms(CURL *handle, long timeout_ms) {
100+
return curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, timeout_ms);
101+
}
102+
98103
const char *CurlLibrary::easy_strerror(CURLcode error) {
99104
return curl_easy_strerror(error);
100105
}
@@ -162,6 +167,7 @@ class CurlImpl {
162167
std::mutex mutex_;
163168
CurlLibrary &curl_;
164169
const std::shared_ptr<Logger> logger_;
170+
Clock clock_;
165171
CURLM *multi_handle_;
166172
std::unordered_set<CURL *> request_handles_;
167173
std::list<CURL *> new_handles_;
@@ -179,6 +185,7 @@ class CurlImpl {
179185
char error_buffer[CURL_ERROR_SIZE] = "";
180186
std::unordered_map<std::string, std::string> response_headers_lower;
181187
std::string response_body;
188+
std::chrono::steady_clock::time_point deadline;
182189

183190
~Request();
184191
};
@@ -221,13 +228,14 @@ class CurlImpl {
221228
static StringView trim(StringView);
222229

223230
public:
224-
explicit CurlImpl(const std::shared_ptr<Logger> &, CurlLibrary &,
225-
const Curl::ThreadGenerator &);
231+
explicit CurlImpl(const std::shared_ptr<Logger> &, const Clock &,
232+
CurlLibrary &, const Curl::ThreadGenerator &);
226233
~CurlImpl();
227234

228235
Expected<void> post(const URL &url, HeadersSetter set_headers,
229236
std::string body, ResponseHandler on_response,
230-
ErrorHandler on_error);
237+
ErrorHandler on_error,
238+
std::chrono::steady_clock::time_point deadline);
231239

232240
void drain(std::chrono::steady_clock::time_point deadline);
233241
};
@@ -242,22 +250,25 @@ void throw_on_error(CURLcode result) {
242250

243251
} // namespace
244252

245-
Curl::Curl(const std::shared_ptr<Logger> &logger) : Curl(logger, libcurl) {}
253+
Curl::Curl(const std::shared_ptr<Logger> &logger, const Clock &clock)
254+
: Curl(logger, clock, libcurl) {}
246255

247-
Curl::Curl(const std::shared_ptr<Logger> &logger, CurlLibrary &curl)
248-
: Curl(logger, curl,
256+
Curl::Curl(const std::shared_ptr<Logger> &logger, const Clock &clock,
257+
CurlLibrary &curl)
258+
: Curl(logger, clock, curl,
249259
[](auto &&func) { return std::thread(std::move(func)); }) {}
250260

251-
Curl::Curl(const std::shared_ptr<Logger> &logger, CurlLibrary &curl,
252-
const Curl::ThreadGenerator &make_thread)
253-
: impl_(new CurlImpl{logger, curl, make_thread}) {}
261+
Curl::Curl(const std::shared_ptr<Logger> &logger, const Clock &clock,
262+
CurlLibrary &curl, const Curl::ThreadGenerator &make_thread)
263+
: impl_(new CurlImpl{logger, clock, curl, make_thread}) {}
254264

255265
Curl::~Curl() { delete impl_; }
256266

257267
Expected<void> Curl::post(const URL &url, HeadersSetter set_headers,
258268
std::string body, ResponseHandler on_response,
259-
ErrorHandler on_error) {
260-
return impl_->post(url, set_headers, body, on_response, on_error);
269+
ErrorHandler on_error,
270+
std::chrono::steady_clock::time_point deadline) {
271+
return impl_->post(url, set_headers, body, on_response, on_error, deadline);
261272
}
262273

263274
void Curl::drain(std::chrono::steady_clock::time_point deadline) {
@@ -268,10 +279,11 @@ nlohmann::json Curl::config_json() const {
268279
return nlohmann::json::object({{"type", "datadog::tracing::Curl"}});
269280
}
270281

271-
CurlImpl::CurlImpl(const std::shared_ptr<Logger> &logger, CurlLibrary &curl,
272-
const Curl::ThreadGenerator &make_thread)
282+
CurlImpl::CurlImpl(const std::shared_ptr<Logger> &logger, const Clock &clock,
283+
CurlLibrary &curl, const Curl::ThreadGenerator &make_thread)
273284
: curl_(curl),
274285
logger_(logger),
286+
clock_(clock),
275287
shutting_down_(false),
276288
num_active_handles_(0) {
277289
curl_.global_init(CURL_GLOBAL_ALL);
@@ -311,24 +323,35 @@ CurlImpl::~CurlImpl() {
311323
}
312324
log_on_error(curl_.multi_wakeup(multi_handle_));
313325
event_loop_.join();
326+
327+
log_on_error(curl_.multi_cleanup(multi_handle_));
328+
curl_.global_cleanup();
314329
}
315330

316-
Expected<void> CurlImpl::post(const HTTPClient::URL &url,
317-
HeadersSetter set_headers, std::string body,
318-
ResponseHandler on_response,
319-
ErrorHandler on_error) try {
331+
Expected<void> CurlImpl::post(
332+
const HTTPClient::URL &url, HeadersSetter set_headers, std::string body,
333+
ResponseHandler on_response, ErrorHandler on_error,
334+
std::chrono::steady_clock::time_point deadline) try {
320335
if (multi_handle_ == nullptr) {
321336
return Error{Error::CURL_HTTP_CLIENT_NOT_RUNNING,
322337
"Unable to send request via libcurl because the HTTP client "
323338
"failed to start."};
324339
}
325340

341+
HeaderWriter writer{curl_};
342+
set_headers(writer);
343+
auto cleanup_list = [&](auto list) { curl_.slist_free_all(list); };
344+
std::unique_ptr<curl_slist, decltype(cleanup_list)> headers{
345+
writer.release(), std::move(cleanup_list)};
346+
326347
auto request = std::make_unique<Request>();
327348

328349
request->curl = &curl_;
350+
request->request_headers = headers.get();
329351
request->request_body = std::move(body);
330352
request->on_response = std::move(on_response);
331353
request->on_error = std::move(on_error);
354+
request->deadline = std::move(deadline);
332355

333356
auto cleanup_handle = [&](auto handle) { curl_.easy_cleanup(handle); };
334357
std::unique_ptr<CURL, decltype(cleanup_handle)> handle{
@@ -339,6 +362,8 @@ Expected<void> CurlImpl::post(const HTTPClient::URL &url,
339362
"unable to initialize a curl handle for request sending"};
340363
}
341364

365+
throw_on_error(
366+
curl_.easy_setopt_httpheader(handle.get(), request->request_headers));
342367
throw_on_error(curl_.easy_setopt_private(handle.get(), request.get()));
343368
throw_on_error(
344369
curl_.easy_setopt_errorbuffer(handle.get(), request->error_buffer));
@@ -365,25 +390,17 @@ Expected<void> CurlImpl::post(const HTTPClient::URL &url,
365390
handle.get(), (url.scheme + "://" + url.authority + url.path).c_str()));
366391
}
367392

368-
HeaderWriter writer{curl_};
369-
set_headers(writer);
370-
auto cleanup_list = [&](auto list) { curl_.slist_free_all(list); };
371-
std::unique_ptr<curl_slist, decltype(cleanup_list)> headers{
372-
writer.release(), std::move(cleanup_list)};
373-
request->request_headers = headers.get();
374-
throw_on_error(
375-
curl_.easy_setopt_httpheader(handle.get(), request->request_headers));
376-
377393
std::list<CURL *> node;
378394
node.push_back(handle.get());
379395
{
380396
std::lock_guard<std::mutex> lock(mutex_);
381397
new_handles_.splice(new_handles_.end(), node);
382398

383-
headers.release();
384-
handle.release();
385-
request.release();
399+
(void)headers.release();
400+
(void)handle.release();
401+
(void)request.release();
386402
}
403+
387404
log_on_error(curl_.multi_wakeup(multi_handle_));
388405

389406
return nullopt;
@@ -464,6 +481,7 @@ CURLMcode CurlImpl::log_on_error(CURLMcode result) {
464481
void CurlImpl::run() {
465482
int num_messages_remaining;
466483
CURLMsg *message;
484+
const int max_wait_milliseconds = 10000;
467485
std::unique_lock<std::mutex> lock(mutex_);
468486

469487
for (;;) {
@@ -478,16 +496,46 @@ void CurlImpl::run() {
478496
&num_messages_remaining))) {
479497
handle_message(*message, lock);
480498
}
481-
482-
const int max_wait_milliseconds = 10 * 1000;
483499
lock.unlock();
484500
log_on_error(curl_.multi_poll(multi_handle_, nullptr, 0,
485501
max_wait_milliseconds, nullptr));
486502
lock.lock();
487503

488504
// New requests might have been added while we were sleeping.
489505
for (; !new_handles_.empty(); new_handles_.pop_front()) {
490-
CURL *const handle = new_handles_.front();
506+
CURL *handle = new_handles_.front();
507+
char *user_data;
508+
if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) !=
509+
CURLE_OK) {
510+
curl_.easy_cleanup(handle);
511+
continue;
512+
}
513+
514+
auto *request = reinterpret_cast<Request *>(user_data);
515+
const auto timeout = request->deadline - clock_().tick;
516+
if (timeout <= std::chrono::steady_clock::time_point::duration::zero()) {
517+
std::string message;
518+
message +=
519+
"Request deadline exceeded before request was even added to "
520+
"libcurl "
521+
"event loop. Deadline was ";
522+
message += std::to_string(
523+
-std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
524+
.count());
525+
message += " nanoseconds ago.";
526+
request->on_error(
527+
Error{Error::CURL_DEADLINE_EXCEEDED_BEFORE_REQUEST_START,
528+
std::move(message)});
529+
530+
curl_.easy_cleanup(handle);
531+
delete request;
532+
533+
continue;
534+
}
535+
536+
log_on_error(curl_.easy_setopt_timeout_ms(
537+
handle, std::chrono::duration_cast<std::chrono::milliseconds>(timeout)
538+
.count()));
491539
log_on_error(curl_.multi_add_handle(multi_handle_, handle));
492540
request_handles_.insert(handle);
493541
}
@@ -510,8 +558,6 @@ void CurlImpl::run() {
510558
}
511559

512560
request_handles_.clear();
513-
log_on_error(curl_.multi_cleanup(multi_handle_));
514-
curl_.global_cleanup();
515561
}
516562

517563
void CurlImpl::handle_message(const CURLMsg &message,

src/datadog/curl.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <string>
1818
#include <thread>
1919

20+
#include "clock.h"
2021
#include "http_client.h"
2122
#include "json_fwd.hpp"
2223

@@ -59,6 +60,7 @@ class CurlLibrary {
5960
virtual CURLcode easy_setopt_url(CURL *handle, const char *url);
6061
virtual CURLcode easy_setopt_writedata(CURL *handle, void *data);
6162
virtual CURLcode easy_setopt_writefunction(CURL *handle, WriteCallback);
63+
virtual CURLcode easy_setopt_timeout_ms(CURL *handle, long timeout_ms);
6264
virtual const char *easy_strerror(CURLcode error);
6365
virtual void global_cleanup();
6466
virtual CURLcode global_init(long flags);
@@ -86,16 +88,18 @@ class Curl : public HTTPClient {
8688
public:
8789
using ThreadGenerator = std::function<std::thread(std::function<void()> &&)>;
8890

89-
explicit Curl(const std::shared_ptr<Logger> &);
90-
Curl(const std::shared_ptr<Logger> &, CurlLibrary &);
91-
Curl(const std::shared_ptr<Logger> &, CurlLibrary &, const ThreadGenerator &);
91+
explicit Curl(const std::shared_ptr<Logger> &, const Clock &);
92+
Curl(const std::shared_ptr<Logger> &, const Clock &, CurlLibrary &);
93+
Curl(const std::shared_ptr<Logger> &, const Clock &, CurlLibrary &,
94+
const ThreadGenerator &);
9295
~Curl();
9396

9497
Curl(const Curl &) = delete;
9598

9699
Expected<void> post(const URL &url, HeadersSetter set_headers,
97100
std::string body, ResponseHandler on_response,
98-
ErrorHandler on_error) override;
101+
ErrorHandler on_error,
102+
std::chrono::steady_clock::time_point deadline) override;
99103

100104
void drain(std::chrono::steady_clock::time_point deadline) override;
101105

src/datadog/datadog_agent.cpp

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -134,17 +134,19 @@ std::variant<CollectorResponse, std::string> parse_agent_traces_response(
134134
DatadogAgent::DatadogAgent(
135135
const FinalizedDatadogAgentConfig& config,
136136
const std::shared_ptr<TracerTelemetry>& tracer_telemetry,
137-
const Clock& clock, const std::shared_ptr<Logger>& logger)
137+
const std::shared_ptr<Logger>& logger)
138138
: tracer_telemetry_(tracer_telemetry),
139-
clock_(clock),
139+
clock_(config.clock),
140140
logger_(logger),
141141
traces_endpoint_(traces_endpoint(config.url)),
142142
telemetry_endpoint_(telemetry_endpoint(config.url)),
143143
http_client_(config.http_client),
144144
event_scheduler_(config.event_scheduler),
145145
cancel_scheduled_flush_(event_scheduler_->schedule_recurring_event(
146146
config.flush_interval, [this]() { flush(); })),
147-
flush_interval_(config.flush_interval) {
147+
flush_interval_(config.flush_interval),
148+
request_timeout_(config.request_timeout),
149+
shutdown_timeout_(config.shutdown_timeout) {
148150
assert(logger_);
149151
assert(tracer_telemetry_);
150152
if (tracer_telemetry_->enabled()) {
@@ -185,7 +187,7 @@ DatadogAgent::DatadogAgent(
185187
}
186188

187189
DatadogAgent::~DatadogAgent() {
188-
const auto deadline = clock_().tick + std::chrono::seconds(2);
190+
const auto deadline = clock_().tick + shutdown_timeout_;
189191
cancel_scheduled_flush_();
190192
flush();
191193
if (tracer_telemetry_->enabled()) {
@@ -208,17 +210,15 @@ Expected<void> DatadogAgent::send(
208210
}
209211

210212
nlohmann::json DatadogAgent::config_json() const {
211-
const auto flush_interval_milliseconds =
212-
std::chrono::duration_cast<std::chrono::milliseconds>(flush_interval_)
213-
.count();
214-
215213
// clang-format off
216214
return nlohmann::json::object({
217215
{"type", "datadog::tracing::DatadogAgent"},
218216
{"config", nlohmann::json::object({
219217
{"traces_url", (traces_endpoint_.scheme + "://" + traces_endpoint_.authority + traces_endpoint_.path)},
220218
{"telemetry_url", (telemetry_endpoint_.scheme + "://" + telemetry_endpoint_.authority + telemetry_endpoint_.path)},
221-
{"flush_interval_milliseconds", flush_interval_milliseconds},
219+
{"flush_interval_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(flush_interval_).count() },
220+
{"request_timeout_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(request_timeout_).count() },
221+
{"shutdown_timeout_milliseconds", std::chrono::duration_cast<std::chrono::milliseconds>(shutdown_timeout_).count() },
222222
{"http_client", http_client_->config_json()},
223223
{"event_scheduler", event_scheduler_->config_json()},
224224
})},
@@ -324,9 +324,10 @@ void DatadogAgent::flush() {
324324
};
325325

326326
tracer_telemetry_->metrics().trace_api.requests.inc();
327-
auto post_result = http_client_->post(
328-
traces_endpoint_, std::move(set_request_headers), std::move(body),
329-
std::move(on_response), std::move(on_error));
327+
auto post_result =
328+
http_client_->post(traces_endpoint_, std::move(set_request_headers),
329+
std::move(body), std::move(on_response),
330+
std::move(on_error), clock_().tick + request_timeout_);
330331
if (auto* error = post_result.if_error()) {
331332
logger_->log_error(
332333
error->with_prefix("Unexpected error submitting traces: "));
@@ -335,9 +336,10 @@ void DatadogAgent::flush() {
335336

336337
void DatadogAgent::send_app_started() {
337338
auto payload = tracer_telemetry_->app_started();
338-
auto post_result = http_client_->post(
339-
telemetry_endpoint_, telemetry_set_request_headers_, std::move(payload),
340-
telemetry_on_response_, telemetry_on_error_);
339+
auto post_result =
340+
http_client_->post(telemetry_endpoint_, telemetry_set_request_headers_,
341+
std::move(payload), telemetry_on_response_,
342+
telemetry_on_error_, clock_().tick + request_timeout_);
341343
if (auto* error = post_result.if_error()) {
342344
logger_->log_error(error->with_prefix(
343345
"Unexpected error submitting telemetry app-started event: "));
@@ -346,9 +348,10 @@ void DatadogAgent::send_app_started() {
346348

347349
void DatadogAgent::send_heartbeat_and_telemetry() {
348350
auto payload = tracer_telemetry_->heartbeat_and_telemetry();
349-
auto post_result = http_client_->post(
350-
telemetry_endpoint_, telemetry_set_request_headers_, std::move(payload),
351-
telemetry_on_response_, telemetry_on_error_);
351+
auto post_result =
352+
http_client_->post(telemetry_endpoint_, telemetry_set_request_headers_,
353+
std::move(payload), telemetry_on_response_,
354+
telemetry_on_error_, clock_().tick + request_timeout_);
352355
if (auto* error = post_result.if_error()) {
353356
logger_->log_error(error->with_prefix(
354357
"Unexpected error submitting telemetry app-heartbeat event: "));
@@ -357,9 +360,10 @@ void DatadogAgent::send_heartbeat_and_telemetry() {
357360

358361
void DatadogAgent::send_app_closing() {
359362
auto payload = tracer_telemetry_->app_closing();
360-
auto post_result = http_client_->post(
361-
telemetry_endpoint_, telemetry_set_request_headers_, std::move(payload),
362-
telemetry_on_response_, telemetry_on_error_);
363+
auto post_result =
364+
http_client_->post(telemetry_endpoint_, telemetry_set_request_headers_,
365+
std::move(payload), telemetry_on_response_,
366+
telemetry_on_error_, clock_().tick + request_timeout_);
363367
if (auto* error = post_result.if_error()) {
364368
logger_->log_error(error->with_prefix(
365369
"Unexpected error submitting telemetry app-closing event: "));

0 commit comments

Comments
 (0)