Skip to content

Commit 833d499

Browse files
committed
Time out in-progress transfers
If an entire part is a single libcurl operation, a client that starts writing - but then leaves for an extended period of time - will leave dangling references inside libcurl (eventually exhausting the number of allowable transfers). This commit adds a background thread for S3File that goes through all pending uploads and checks whether each one is still live; if an upload is not, the thread times out the operation.
1 parent 0d77e1e commit 833d499

File tree

5 files changed

+252
-7
lines changed

5 files changed

+252
-7
lines changed

src/HTTPCommands.cc

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ std::shared_ptr<HandlerQueue> HTTPRequest::m_queue =
4343
std::make_unique<HandlerQueue>();
4444
bool HTTPRequest::m_workers_initialized = false;
4545
std::vector<CurlWorker *> HTTPRequest::m_workers;
46+
std::chrono::steady_clock::duration HTTPRequest::m_timeout_duration =
47+
std::chrono::seconds(10);
4648

4749
namespace {
4850

@@ -235,6 +237,12 @@ size_t HTTPRequest::ReadCallback(char *buffer, size_t size, size_t n, void *v) {
235237
// been sent.
236238
HTTPRequest::Payload *payload = (HTTPRequest::Payload *)v;
237239

240+
if (payload->m_parent.Timeout()) {
241+
payload->m_parent.errorCode = "E_TIMEOUT";
242+
payload->m_parent.errorMessage = "Upload operation timed out";
243+
return CURL_READFUNC_ABORT;
244+
}
245+
238246
if (payload->sentSoFar == static_cast<off_t>(payload->data.size())) {
239247
payload->sentSoFar = 0;
240248
if (payload->final) {
@@ -269,6 +277,16 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri,
269277
if (!m_is_streaming && !final) {
270278
m_is_streaming = true;
271279
}
280+
if (m_timeout) {
281+
errorCode = "E_TIMEOUT";
282+
errorMessage = "Transfer has timed out due to inactivity.";
283+
return false;
284+
}
285+
if (!errorCode.empty()) {
286+
return false;
287+
}
288+
289+
m_last_request = std::chrono::steady_clock::now();
272290
m_final = final;
273291
// Detect whether we were given an undersized buffer in non-streaming mode
274292
if (!m_is_streaming && payload_size &&
@@ -294,6 +312,27 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri,
294312
return errorCode.empty();
295313
}
296314

315+
void HTTPRequest::Tick(std::chrono::steady_clock::time_point now) {
316+
if (!m_is_streaming) {
317+
return;
318+
}
319+
if (now - m_last_request <= m_timeout_duration) {
320+
return;
321+
}
322+
323+
if (m_timeout) {
324+
return;
325+
}
326+
m_timeout = true;
327+
328+
if (m_unpause_queue) {
329+
std::unique_lock<std::mutex> lk(m_mtx);
330+
m_result_ready = false;
331+
m_unpause_queue->Produce(this);
332+
m_cv.wait(lk, [&] { return m_result_ready; });
333+
}
334+
}
335+
297336
bool HTTPRequest::ReleaseHandle(CURL *curl) {
298337
m_curl_handle = nullptr;
299338

@@ -604,11 +643,13 @@ HTTPRequest::CurlResult HTTPRequest::ProcessCurlResult(CURL *curl,
604643
auto unique = std::unique_ptr<void, decltype(cleaner)>((void *)1, cleaner);
605644

606645
if (rv != 0) {
607-
errorCode = "E_CURL_IO";
608-
std::ostringstream error;
609-
error << "curl failed (" << rv << "): '" << curl_easy_strerror(rv)
610-
<< "'.";
611-
errorMessage = error.str();
646+
if (errorCode.empty()) {
647+
errorCode = "E_CURL_IO";
648+
std::ostringstream error;
649+
error << "curl failed (" << rv << "): '" << curl_easy_strerror(rv)
650+
<< "'.";
651+
errorMessage = error.str();
652+
}
612653

613654
return CurlResult::Fail;
614655
}

src/HTTPCommands.hh

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,20 @@ class HTTPRequest {
8181
// context.
8282
static void Init(XrdSysError &);
8383

84+
// Perform maintenance of the request.
85+
void Tick(std::chrono::steady_clock::time_point);
86+
87+
// Sets the duration after which an in-progress operation may be considered
88+
// stalled and hence timed out.
89+
static void SetStallTimeout(std::chrono::steady_clock::duration timeout) {
90+
m_timeout_duration = timeout;
91+
}
92+
93+
// Return the stall timeout duration currently in use.
94+
static std::chrono::steady_clock::duration GetStallTimeout() {
95+
return m_timeout_duration;
96+
}
97+
8498
protected:
8599
// Send the request to the HTTP server.
86100
// Blocks until the request has completed.
@@ -100,7 +114,6 @@ class HTTPRequest {
100114
const std::string_view payload, off_t payload_size,
101115
bool final);
102116

103-
104117
// Called by the curl handler thread when the request has finished.
105118
virtual void Notify();
106119

@@ -118,6 +131,10 @@ class HTTPRequest {
118131
m_unpause_queue = queue;
119132
}
120133

134+
// Return whether or not the request has timed out since the last
135+
// call to send more data.
136+
bool Timeout() const { return m_timeout; }
137+
121138
typedef std::map<std::string, std::string> AttributeValueMap;
122139
AttributeValueMap query_parameters;
123140
AttributeValueMap headers;
@@ -190,6 +207,7 @@ class HTTPRequest {
190207
// call of the overall HTTPRequest
191208
bool m_is_streaming{
192209
false}; // Flag indicating this command is a streaming request.
210+
bool m_timeout{false}; // Flag indicating the request has timed out.
193211
bool m_result_ready{false}; // Flag indicating the results data is ready.
194212
off_t m_payload_size{0}; // Size of the entire upload payload; 0 if unknown.
195213
std::string m_protocol;
@@ -198,6 +216,15 @@ class HTTPRequest {
198216
CURL *m_curl_handle{nullptr}; // The curl handle for the ongoing request
199217
char m_errorBuffer[CURL_ERROR_SIZE]; // Static error buffer for libcurl
200218
unsigned m_retry_count{0};
219+
220+
// Time when the last request was sent on this object; used to determine
221+
// whether the operation has timed out.
222+
std::chrono::steady_clock::time_point m_last_request{
223+
std::chrono::steady_clock::now()};
224+
225+
// Duration after which a partially-completed request will time out if
226+
// no progress has been made.
227+
static std::chrono::steady_clock::duration m_timeout_duration;
201228
};
202229

203230
class HTTPUpload : public HTTPRequest {

src/S3File.cc

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,18 @@
3232

3333
#include <curl/curl.h>
3434

35+
#include <algorithm>
3536
#include <charconv>
3637
#include <filesystem>
38+
#include <iostream>
3739
#include <map>
3840
#include <memory>
3941
#include <mutex>
4042
#include <sstream>
4143
#include <stdlib.h>
4244
#include <string>
4345
#include <string_view>
46+
#include <thread>
4447
#include <vector>
4548

4649
using namespace XrdHTTPServer;
@@ -49,6 +52,12 @@ S3FileSystem *g_s3_oss = nullptr;
4952

5053
XrdVERSIONINFO(XrdOssGetFileSystem, S3);
5154

55+
std::vector<std::pair<std::weak_ptr<std::mutex>,
56+
std::weak_ptr<AmazonS3SendMultipartPart>>>
57+
S3File::m_pending_ops;
58+
std::mutex S3File::m_pending_lk;
59+
std::once_flag S3File::m_monitor_launch;
60+
5261
S3File::S3File(XrdSysError &log, S3FileSystem *oss)
5362
: m_log(log), m_oss(oss), content_length(0), last_modified(0),
5463
partNumber(1) {}
@@ -61,6 +70,9 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
6170
if (Oflag & O_APPEND) {
6271
m_log.Log(LogMask::Info, "Open", "File opened for append:", path);
6372
}
73+
if (Oflag & (O_RDWR | O_WRONLY)) {
74+
m_write_lk.reset(new std::mutex);
75+
}
6476

6577
char *asize_char;
6678
if ((asize_char = env.Get("oss.asize"))) {
@@ -206,16 +218,29 @@ int S3File::Fstat(struct stat *buff) {
206218
}
207219

208220
ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) {
221+
auto write_mutex = m_write_lk;
222+
if (!write_mutex) {
223+
return -EBADF;
224+
}
225+
std::lock_guard lk(*write_mutex);
226+
209227
if (offset != m_write_offset) {
210228
m_log.Emsg(
211229
"Write",
212230
"Out-of-order write detected; S3 requires writes to be in order");
231+
m_write_offset = -1;
232+
return -EIO;
233+
}
234+
if (m_write_offset == -1) {
235+
// Previous I/O error has occurred. File is in bad state, immediately
236+
// fail.
213237
return -EIO;
214238
}
215239
if (uploadId == "") {
216240
AmazonS3CreateMultipartUpload startUpload(m_ai, m_object, m_log);
217241
if (!startUpload.SendRequest()) {
218242
m_log.Emsg("Write", "S3 multipart request failed");
243+
m_write_offset = -1;
219244
return -ENOENT;
220245
}
221246
std::string errMsg;
@@ -240,6 +265,10 @@ ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) {
240265
}
241266

242267
m_write_op.reset(new AmazonS3SendMultipartPart(m_ai, m_object, m_log));
268+
{
269+
std::lock_guard lk(m_pending_lk);
270+
m_pending_ops.emplace_back(m_write_lk, m_write_op);
271+
}
243272

244273
// Calculate the size of the current chunk, if it's known.
245274
m_part_size = m_s3_part_size;
@@ -271,8 +300,15 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) {
271300
if (!m_write_op->SendRequest(
272301
std::string_view(static_cast<const char *>(buffer), write_size),
273302
std::to_string(partNumber), uploadId, m_object_size, is_final)) {
303+
m_write_offset = -1;
304+
if (m_write_op->getErrorCode() == "E_TIMEOUT") {
305+
m_log.Emsg("Write", "Timeout when uploading to S3");
306+
m_write_op.reset();
307+
return -ETIMEDOUT;
308+
}
274309
m_log.Emsg("Write", "Upload to S3 failed: ",
275310
m_write_op->getErrorMessage().c_str());
311+
m_write_op.reset();
276312
return -EIO;
277313
}
278314
if (is_final) {
@@ -283,13 +319,17 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) {
283319
if (startPos == std::string::npos) {
284320
m_log.Emsg("Write", "Result from S3 does not include ETag:",
285321
resultString.c_str());
322+
m_write_op.reset();
323+
m_write_offset = -1;
286324
return -EIO;
287325
}
288326
std::size_t endPos = resultString.find("\"", startPos + 7);
289327
if (startPos == std::string::npos) {
290328
m_log.Emsg("Write",
291329
"Result from S3 does not include ETag end-character:",
292330
resultString.c_str());
331+
m_write_op.reset();
332+
m_write_offset = -1;
293333
return -EIO;
294334
}
295335
eTags.push_back(
@@ -301,6 +341,67 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) {
301341
return write_size;
302342
}
303343

344+
void S3File::LaunchMonitorThread() {
345+
std::call_once(m_monitor_launch, [] {
346+
std::thread t(S3File::CleanupTransfers);
347+
t.detach();
348+
});
349+
}
350+
351+
void S3File::CleanupTransfers() {
352+
while (true) {
353+
std::this_thread::sleep_for(HTTPRequest::GetStallTimeout() / 3);
354+
try {
355+
CleanupTransfersOnce();
356+
} catch (std::exception &exc) {
357+
std::cerr << "Warning: caught unexpected exception when trying to "
358+
"clean transfers: "
359+
<< exc.what() << std::endl;
360+
}
361+
}
362+
}
363+
364+
void S3File::CleanupTransfersOnce() {
365+
// Make a list of live transfers; erase any dead ones still on the list.
366+
std::vector<std::pair<std::shared_ptr<std::mutex>,
367+
std::shared_ptr<AmazonS3SendMultipartPart>>>
368+
existing_ops;
369+
{
370+
std::lock_guard lk(m_pending_lk);
371+
existing_ops.reserve(m_pending_ops.size());
372+
m_pending_ops.erase(
373+
std::remove_if(m_pending_ops.begin(), m_pending_ops.end(),
374+
[&](const auto &op) -> bool {
375+
auto op_lk = op.first.lock();
376+
if (!op_lk) {
377+
// In this case, the S3File is no longer open
378+
// for write. No need to potentially clean
379+
// up the transfer.
380+
return true;
381+
}
382+
auto op_part = op.second.lock();
383+
if (!op_part) {
384+
// In this case, the S3File object is still
385+
// open for writes but the upload has
386+
// completed. Remove from the list.
387+
return true;
388+
}
389+
// The S3File is open and upload is in-progress;
390+
// we'll tick the transfer.
391+
existing_ops.emplace_back(op_lk, op_part);
392+
return false;
393+
}),
394+
m_pending_ops.end());
395+
}
396+
// For each live transfer, call `Tick` to advance the clock and possibly
397+
// time things out.
398+
auto now = std::chrono::steady_clock::now();
399+
for (auto &info : existing_ops) {
400+
std::lock_guard lk(*info.first);
401+
info.second->Tick(now);
402+
}
403+
}
404+
304405
int S3File::Close(long long *retsz) {
305406
// If we opened the object in create mode but did not actually write
306407
// anything, make a quick zero-length file.
@@ -315,6 +416,7 @@ int S3File::Close(long long *retsz) {
315416
}
316417
}
317418
if (m_write_op) {
419+
std::lock_guard lk(*m_write_lk);
318420
m_part_size = m_part_written;
319421
auto written = ContinueSendPart(nullptr, 0);
320422
if (written < 0) {
@@ -375,6 +477,7 @@ XrdOss *XrdOssGetStorageSystem2(XrdOss *native_oss, XrdSysLogger *Logger,
375477

376478
envP->Export("XRDXROOTD_NOPOSC", "1");
377479

480+
S3File::LaunchMonitorThread();
378481
try {
379482
AmazonRequest::Init(*log);
380483
g_s3_oss = new S3FileSystem(Logger, config_fn, envP);

0 commit comments

Comments
 (0)