Skip to content

Commit d9a7249

Browse files
committed
Time out in-progress transfers
If an entire part is a single libcurl operation, a client that starts writing - but then leaves for an extended period of time - will leave dangling references inside libcurl (eventually exhausting the number of allowable transfers). This commit adds a background thread for S3File that will go through all pending uploads and check to see if they are still live; if not, then it'll timeout the operation.
1 parent 0d77e1e commit d9a7249

File tree

5 files changed

+235
-6
lines changed

5 files changed

+235
-6
lines changed

src/HTTPCommands.cc

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ std::shared_ptr<HandlerQueue> HTTPRequest::m_queue =
4343
std::make_unique<HandlerQueue>();
4444
bool HTTPRequest::m_workers_initialized = false;
4545
std::vector<CurlWorker *> HTTPRequest::m_workers;
46+
std::chrono::steady_clock::duration HTTPRequest::m_timeout_duration = std::chrono::seconds(10);
4647

4748
namespace {
4849

@@ -235,6 +236,12 @@ size_t HTTPRequest::ReadCallback(char *buffer, size_t size, size_t n, void *v) {
235236
// been sent.
236237
HTTPRequest::Payload *payload = (HTTPRequest::Payload *)v;
237238

239+
if (payload->m_parent.Timeout()) {
240+
payload->m_parent.errorCode = "E_TIMEOUT";
241+
payload->m_parent.errorMessage = "Upload operation timed out";
242+
return CURL_READFUNC_ABORT;
243+
}
244+
238245
if (payload->sentSoFar == static_cast<off_t>(payload->data.size())) {
239246
payload->sentSoFar = 0;
240247
if (payload->final) {
@@ -269,6 +276,16 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri,
269276
if (!m_is_streaming && !final) {
270277
m_is_streaming = true;
271278
}
279+
if (m_timeout) {
280+
errorCode = "E_TIMEOUT";
281+
errorMessage = "Transfer has timed out due to inactivity.";
282+
return false;
283+
}
284+
if (!errorCode.empty()) {
285+
return false;
286+
}
287+
288+
m_last_request = std::chrono::steady_clock::now();
272289
m_final = final;
273290
// Detect whether we were given an undersized buffer in non-streaming mode
274291
if (!m_is_streaming && payload_size &&
@@ -294,6 +311,27 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri,
294311
return errorCode.empty();
295312
}
296313

314+
void HTTPRequest::Tick(std::chrono::steady_clock::time_point now) {
315+
if (!m_is_streaming) {
316+
return;
317+
}
318+
if (now - m_last_request <= m_timeout_duration) {
319+
return;
320+
}
321+
322+
if (m_timeout) {
323+
return;
324+
}
325+
m_timeout = true;
326+
327+
if (m_unpause_queue) {
328+
std::unique_lock<std::mutex> lk(m_mtx);
329+
m_result_ready = false;
330+
m_unpause_queue->Produce(this);
331+
m_cv.wait(lk, [&]{return m_result_ready;});
332+
}
333+
}
334+
297335
bool HTTPRequest::ReleaseHandle(CURL *curl) {
298336
m_curl_handle = nullptr;
299337

@@ -604,11 +642,13 @@ HTTPRequest::CurlResult HTTPRequest::ProcessCurlResult(CURL *curl,
604642
auto unique = std::unique_ptr<void, decltype(cleaner)>((void *)1, cleaner);
605643

606644
if (rv != 0) {
607-
errorCode = "E_CURL_IO";
608-
std::ostringstream error;
609-
error << "curl failed (" << rv << "): '" << curl_easy_strerror(rv)
610-
<< "'.";
611-
errorMessage = error.str();
645+
if (errorCode.empty()) {
646+
errorCode = "E_CURL_IO";
647+
std::ostringstream error;
648+
error << "curl failed (" << rv << "): '" << curl_easy_strerror(rv)
649+
<< "'.";
650+
errorMessage = error.str();
651+
}
612652

613653
return CurlResult::Fail;
614654
}

src/HTTPCommands.hh

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,20 @@ class HTTPRequest {
8181
// context.
8282
static void Init(XrdSysError &);
8383

84+
// Perform maintenance of the request.
85+
void Tick(std::chrono::steady_clock::time_point);
86+
87+
// Sets the duration after which an in-progress operation may be considered
88+
// stalled and hence timeout.
89+
static void SetStallTimeout(std::chrono::steady_clock::duration timeout) {
90+
m_timeout_duration = timeout;
91+
}
92+
93+
// Return the stall timeout duration currently in use.
94+
static std::chrono::steady_clock::duration GetStallTimeout() {
95+
return m_timeout_duration;
96+
}
97+
8498
protected:
8599
// Send the request to the HTTP server.
86100
// Blocks until the request has completed.
@@ -118,6 +132,10 @@ class HTTPRequest {
118132
m_unpause_queue = queue;
119133
}
120134

135+
// Return whether or not the request has timed out since the last
136+
// call to send more data.
137+
bool Timeout() const {return m_timeout;}
138+
121139
typedef std::map<std::string, std::string> AttributeValueMap;
122140
AttributeValueMap query_parameters;
123141
AttributeValueMap headers;
@@ -190,6 +208,7 @@ class HTTPRequest {
190208
// call of the overall HTTPRequest
191209
bool m_is_streaming{
192210
false}; // Flag indicating this command is a streaming request.
211+
bool m_timeout{false}; // Flag indicating the request has timed out.
193212
bool m_result_ready{false}; // Flag indicating the results data is ready.
194213
off_t m_payload_size{0}; // Size of the entire upload payload; 0 if unknown.
195214
std::string m_protocol;
@@ -198,6 +217,14 @@ class HTTPRequest {
198217
CURL *m_curl_handle{nullptr}; // The curl handle for the ongoing request
199218
char m_errorBuffer[CURL_ERROR_SIZE]; // Static error buffer for libcurl
200219
unsigned m_retry_count{0};
220+
221+
// Time when the last request was sent on this object; used to determine
222+
// whether the operation has timed out.
223+
std::chrono::steady_clock::time_point m_last_request{std::chrono::steady_clock::now()};
224+
225+
// Duration after which a partially-completed request will timeout if
226+
// no progress has been made.
227+
static std::chrono::steady_clock::duration m_timeout_duration;
201228
};
202229

203230
class HTTPUpload : public HTTPRequest {

src/S3File.cc

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232

3333
#include <curl/curl.h>
3434

35+
#include <algorithm>
3536
#include <charconv>
37+
#include <iostream>
3638
#include <filesystem>
3739
#include <map>
3840
#include <memory>
@@ -41,6 +43,7 @@
4143
#include <stdlib.h>
4244
#include <string>
4345
#include <string_view>
46+
#include <thread>
4447
#include <vector>
4548

4649
using namespace XrdHTTPServer;
@@ -49,6 +52,10 @@ S3FileSystem *g_s3_oss = nullptr;
4952

5053
XrdVERSIONINFO(XrdOssGetFileSystem, S3);
5154

55+
std::vector<std::pair<std::weak_ptr<std::mutex>, std::weak_ptr<AmazonS3SendMultipartPart>>> S3File::m_pending_ops;
56+
std::mutex S3File::m_pending_lk;
57+
std::once_flag S3File::m_monitor_launch;
58+
5259
S3File::S3File(XrdSysError &log, S3FileSystem *oss)
5360
: m_log(log), m_oss(oss), content_length(0), last_modified(0),
5461
partNumber(1) {}
@@ -61,6 +68,9 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
6168
if (Oflag & O_APPEND) {
6269
m_log.Log(LogMask::Info, "Open", "File opened for append:", path);
6370
}
71+
if (Oflag & (O_RDWR | O_WRONLY)) {
72+
m_write_lk.reset(new std::mutex);
73+
}
6474

6575
char *asize_char;
6676
if ((asize_char = env.Get("oss.asize"))) {
@@ -206,16 +216,28 @@ int S3File::Fstat(struct stat *buff) {
206216
}
207217

208218
ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) {
219+
auto write_mutex = m_write_lk;
220+
if (!write_mutex) {
221+
return -EBADF;
222+
}
223+
std::lock_guard lk(*write_mutex);
224+
209225
if (offset != m_write_offset) {
210226
m_log.Emsg(
211227
"Write",
212228
"Out-of-order write detected; S3 requires writes to be in order");
229+
m_write_offset = -1;
230+
return -EIO;
231+
}
232+
if (m_write_offset == -1) {
233+
// Previous I/O error has occurred. File is in bad state, immediately fail.
213234
return -EIO;
214235
}
215236
if (uploadId == "") {
216237
AmazonS3CreateMultipartUpload startUpload(m_ai, m_object, m_log);
217238
if (!startUpload.SendRequest()) {
218239
m_log.Emsg("Write", "S3 multipart request failed");
240+
m_write_offset = -1;
219241
return -ENOENT;
220242
}
221243
std::string errMsg;
@@ -240,6 +262,10 @@ ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) {
240262
}
241263

242264
m_write_op.reset(new AmazonS3SendMultipartPart(m_ai, m_object, m_log));
265+
{
266+
std::lock_guard lk(m_pending_lk);
267+
m_pending_ops.emplace_back(m_write_lk, m_write_op);
268+
}
243269

244270
// Calculate the size of the current chunk, if it's known.
245271
m_part_size = m_s3_part_size;
@@ -271,8 +297,15 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) {
271297
if (!m_write_op->SendRequest(
272298
std::string_view(static_cast<const char *>(buffer), write_size),
273299
std::to_string(partNumber), uploadId, m_object_size, is_final)) {
300+
m_write_offset = -1;
301+
if (m_write_op->getErrorCode() == "E_TIMEOUT") {
302+
m_log.Emsg("Write", "Timeout when uploading to S3");
303+
m_write_op.reset();
304+
return -ETIMEDOUT;
305+
}
274306
m_log.Emsg("Write", "Upload to S3 failed: ",
275307
m_write_op->getErrorMessage().c_str());
308+
m_write_op.reset();
276309
return -EIO;
277310
}
278311
if (is_final) {
@@ -283,13 +316,17 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) {
283316
if (startPos == std::string::npos) {
284317
m_log.Emsg("Write", "Result from S3 does not include ETag:",
285318
resultString.c_str());
319+
m_write_op.reset();
320+
m_write_offset = -1;
286321
return -EIO;
287322
}
288323
std::size_t endPos = resultString.find("\"", startPos + 7);
289324
if (startPos == std::string::npos) {
290325
m_log.Emsg("Write",
291326
"Result from S3 does not include ETag end-character:",
292327
resultString.c_str());
328+
m_write_op.reset();
329+
m_write_offset = -1;
293330
return -EIO;
294331
}
295332
eTags.push_back(
@@ -301,6 +338,58 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) {
301338
return write_size;
302339
}
303340

341+
void S3File::LaunchMonitorThread() {
342+
std::call_once(m_monitor_launch, [] {
343+
std::thread t(S3File::CleanupTransfers);
344+
t.detach();
345+
});
346+
}
347+
348+
void S3File::CleanupTransfers() {
349+
while (true) {
350+
std::this_thread::sleep_for(HTTPRequest::GetStallTimeout()/3);
351+
try {
352+
CleanupTransfersOnce();
353+
} catch (std::exception &exc) {
354+
std::cerr << "Warning: caught unexpected exception when trying to clean transfers: " << exc.what() << std::endl;
355+
}
356+
}
357+
}
358+
359+
void S3File::CleanupTransfersOnce() {
360+
// Make a list of live transfers; erase any dead ones still on the list.
361+
std::vector<std::pair<std::shared_ptr<std::mutex>, std::shared_ptr<AmazonS3SendMultipartPart>>> existing_ops;
362+
{
363+
std::lock_guard lk(m_pending_lk);
364+
existing_ops.reserve(m_pending_ops.size());
365+
m_pending_ops.erase(std::remove_if(m_pending_ops.begin(), m_pending_ops.end(),
366+
[&](const auto &op) -> bool {
367+
auto op_lk = op.first.lock();
368+
if (!op_lk) {
369+
// In this case, the S3File is no longer open for write. No need to potentially
370+
// clean up the transfer.
371+
return true;
372+
}
373+
auto op_part = op.second.lock();
374+
if (!op_part) {
375+
// In this case, the S3File object is still open for writes but the upload has completed.
376+
// Remove from the list.
377+
return true;
378+
}
379+
// The S3File is open and upload is in-progress; we'll tick the transfer.
380+
existing_ops.emplace_back(op_lk, op_part);
381+
return false;
382+
}
383+
), m_pending_ops.end());
384+
}
385+
// For each live transfer, call `Tick` to advance the clock and possibly time things out.
386+
auto now = std::chrono::steady_clock::now();
387+
for (auto &info : existing_ops) {
388+
std::lock_guard lk(*info.first);
389+
info.second->Tick(now);
390+
}
391+
}
392+
304393
int S3File::Close(long long *retsz) {
305394
// If we opened the object in create mode but did not actually write
306395
// anything, make a quick zero-length file.
@@ -315,6 +404,7 @@ int S3File::Close(long long *retsz) {
315404
}
316405
}
317406
if (m_write_op) {
407+
std::lock_guard lk(*m_write_lk);
318408
m_part_size = m_part_written;
319409
auto written = ContinueSendPart(nullptr, 0);
320410
if (written < 0) {
@@ -375,6 +465,7 @@ XrdOss *XrdOssGetStorageSystem2(XrdOss *native_oss, XrdSysLogger *Logger,
375465

376466
envP->Export("XRDXROOTD_NOPOSC", "1");
377467

468+
S3File::LaunchMonitorThread();
378469
try {
379470
AmazonRequest::Init(*log);
380471
g_s3_oss = new S3FileSystem(Logger, config_fn, envP);

src/S3File.hh

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <XrdVersion.hh>
2828

2929
#include <memory>
30+
#include <thread>
3031

3132
#include <fcntl.h>
3233

@@ -96,7 +97,21 @@ class S3File : public XrdOssDF {
9697
size_t getContentLength() { return content_length; }
9798
time_t getLastModified() { return last_modified; }
9899

100+
// Launch the global monitor thread associated with S3File objects.
101+
// Currently, the monitor thread is used to cleanup in-progress file
102+
// transfers that have been abandoned.
103+
static void LaunchMonitorThread();
104+
99105
private:
106+
// Periodic cleanup of in-progress transfers.
107+
//
108+
// Iterates through the global list of pending multipart uploads
109+
// that may be paused. For each, call `Tick` on the upload and
110+
// see if the transfer has aborted.
111+
static void CleanupTransfers();
112+
113+
static void CleanupTransfersOnce();
114+
100115
ssize_t ContinueSendPart(const void *buffer, size_t size);
101116
XrdSysError &m_log;
102117
S3FileSystem *m_oss;
@@ -121,6 +136,34 @@ class S3File : public XrdOssDF {
121136
-1}; // Expected size of the completed object; -1 if unknown.
122137
std::string uploadId; // For creates, upload ID as assigned by t
123138
std::vector<std::string> eTags;
124-
std::unique_ptr<AmazonS3SendMultipartPart>
139+
140+
// The mutex protecting write activities. Writes must currently be serialized
141+
// as we aggregate them into large operations and upload them to the S3 endpoint.
142+
// The mutex prevents corruption of internal state.
143+
//
144+
// The periodic cleanup thread may decide to abort the in-progress transfer; to
145+
// do so, it'll need a reference to this lock that is independent of the lifetime of
146+
// the open file; hence, it's a shared pointer.
147+
std::shared_ptr<std::mutex> m_write_lk;
148+
149+
// The in-progress operation for a multi-part upload; its lifetime may be spread
150+
// across multiple write calls.
151+
std::shared_ptr<AmazonS3SendMultipartPart>
125152
m_write_op; // The in-progress operation for a multi-part upload.
153+
154+
// The multipart uploads represent an in-progress request and the global cleanup
155+
// thread may decide to trigger a failure if the request does not advance after
156+
// some time period.
157+
//
158+
// To do so, we must be able to lock the associated write mutex and then call `Tick`
159+
// on the upload. To avoid prolonging the lifetime of the objects beyond the S3File,
160+
// we hold onto a reference via a weak pointer. Mutable operations on this vector are
161+
// protected by the `m_pending_lk`.
162+
static std::vector<std::pair<std::weak_ptr<std::mutex>, std::weak_ptr<AmazonS3SendMultipartPart>>> m_pending_ops;
163+
164+
// Mutex protecting the m_pending_ops variable.
165+
static std::mutex m_pending_lk;
166+
167+
// Flag determining whether the monitoring thread has been launched.
168+
static std::once_flag m_monitor_launch;
126169
};

0 commit comments

Comments
 (0)