diff --git a/CMakeLists.txt b/CMakeLists.txt index 6226b04..0b2deac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,6 +4,7 @@ project( xrootd-http/s3 ) option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the scitokens-cpp unit tests" OFF ) option( XROOTD_PLUGINS_EXTERNAL_GTEST "Use an external/pre-installed copy of GTest" OFF ) +option( VALGRIND "Run select unit tests under valgrind" OFF ) set( CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake ) set( CMAKE_BUILD_TYPE Debug ) @@ -20,6 +21,10 @@ if(NOT XROOTD_PLUGIN_VERSION) set(XROOTD_PLUGIN_VERSION ${XROOTD_PLUGIN_VERSION} CACHE INTERNAL "") endif() +if(VALGRIND) + find_program(VALGRIND_BIN valgrind REQUIRED) +endif() + macro(use_cxx17) if (CMAKE_VERSION VERSION_LESS "3.1") if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") @@ -66,17 +71,29 @@ add_definitions( -D_FILE_OFFSET_BITS=64 ) include_directories(${XROOTD_INCLUDES} ${CURL_INCLUDE_DIRS} ${LIBCRYPTO_INCLUDE_DIRS}) -add_library(XrdS3 SHARED src/CurlUtil.cc src/S3File.cc src/S3Directory.cc src/S3AccessInfo.cc src/S3FileSystem.cc src/AWSv4-impl.cc src/S3Commands.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) -add_library(XrdHTTPServer SHARED src/CurlUtil.cc src/HTTPFile.cc src/HTTPFileSystem.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) +# On Linux, we hide all the symbols for the final libraries, exposing only what's needed for the XRootD +# runtime loader. So here we create the object library and will create a separate one for testing with +# the symbols exposed. +add_library(XrdS3Obj OBJECT src/CurlUtil.cc src/S3File.cc src/S3Directory.cc src/S3AccessInfo.cc src/S3FileSystem.cc src/AWSv4-impl.cc src/S3Commands.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) +target_include_directories(XrdS3Obj INTERFACE tinyxml2::tinyxml2) +set_target_properties(XrdS3Obj PROPERTIES POSITION_INDEPENDENT_CODE ON) +target_link_libraries(XrdS3Obj -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} tinyxml2::tinyxml2 Threads::Threads) + +add_library(XrdS3 MODULE "$<TARGET_OBJECTS:XrdS3Obj>") +target_link_libraries(XrdS3 XrdS3Obj) -target_link_libraries(XrdS3 -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} tinyxml2::tinyxml2 Threads::Threads) -target_link_libraries(XrdHTTPServer -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} Threads::Threads) +add_library(XrdHTTPServerObj OBJECT src/CurlUtil.cc src/HTTPFile.cc src/HTTPFileSystem.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) +set_target_properties(XrdHTTPServerObj PROPERTIES POSITION_INDEPENDENT_CODE ON) +target_link_libraries(XrdHTTPServerObj -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} Threads::Threads) + +add_library(XrdHTTPServer MODULE "$<TARGET_OBJECTS:XrdHTTPServerObj>") +target_link_libraries(XrdHTTPServer XrdHTTPServerObj) # The CMake documentation strongly advises against using these macros; instead, the pkg_check_modules # is supposed to fill out the full path to ${LIBCRYPTO_LIBRARIES}. As of cmake 3.26.1, this does not # appear to be the case on Mac OS X. Remove once no longer necessary!
-target_link_directories(XrdS3 PUBLIC ${LIBCRYPTO_LIBRARY_DIRS}) -target_link_directories(XrdHTTPServer PUBLIC ${LIBCRYPTO_LIBRARY_DIRS}) +target_link_directories(XrdS3Obj PUBLIC ${LIBCRYPTO_LIBRARY_DIRS}) +target_link_directories(XrdHTTPServerObj PUBLIC ${LIBCRYPTO_LIBRARY_DIRS}) if(NOT APPLE) set_target_properties(XrdS3 PROPERTIES OUTPUT_NAME "XrdS3-${XROOTD_PLUGIN_VERSION}" SUFFIX ".so" LINK_FLAGS "-Wl,--version-script=${CMAKE_SOURCE_DIR}/configs/export-lib-symbols") @@ -94,6 +111,11 @@ install( ) if( XROOTD_PLUGINS_BUILD_UNITTESTS ) + add_library(XrdS3Testing SHARED "$<TARGET_OBJECTS:XrdS3Obj>") + target_link_libraries(XrdS3Testing XrdS3Obj) + add_library(XrdHTTPServerTesting SHARED "$<TARGET_OBJECTS:XrdHTTPServerObj>") + target_link_libraries(XrdHTTPServerTesting XrdHTTPServerObj) + if( NOT XROOTD_PLUGINS_EXTERNAL_GTEST ) include(ExternalProject) ExternalProject_Add(gtest diff --git a/src/AWSv4-impl.cc b/src/AWSv4-impl.cc index 1306742..c7d5d7a 100644 --- a/src/AWSv4-impl.cc +++ b/src/AWSv4-impl.cc @@ -104,7 +104,7 @@ void convertMessageDigestToLowercaseHex(const unsigned char *messageDigest, free(buffer); } -bool doSha256(const std::string &payload, unsigned char *messageDigest, +bool doSha256(const std::string_view payload, unsigned char *messageDigest, unsigned int *mdLength) { EVP_MD_CTX *mdctx = EVP_MD_CTX_create(); if (mdctx == NULL) { @@ -116,7 +116,7 @@ bool doSha256(const std::string &payload, unsigned char *messageDigest, return false; } - if (!EVP_DigestUpdate(mdctx, payload.c_str(), payload.length())) { + if (!EVP_DigestUpdate(mdctx, payload.data(), payload.length())) { EVP_MD_CTX_destroy(mdctx); return false; } diff --git a/src/AWSv4-impl.hh b/src/AWSv4-impl.hh index 8f17e2f..7cd0b84 100644 --- a/src/AWSv4-impl.hh +++ b/src/AWSv4-impl.hh @@ -20,6 +20,7 @@ #include #include +#include <string_view> namespace AWSv4Impl { @@ -34,7 +35,7 @@ void convertMessageDigestToLowercaseHex(const unsigned char *messageDigest, unsigned int mdLength, std::string &hexEncoded); -bool doSha256(const std::string &payload, unsigned char *messageDigest, +bool doSha256(const std::string_view payload, unsigned char *messageDigest, unsigned int *mdLength); bool createSignature(const std::string &secretAccessKey, diff --git a/src/CurlUtil.cc b/src/CurlUtil.cc index 9e23265..66b7ee8 100644 --- a/src/CurlUtil.cc +++ b/src/CurlUtil.cc @@ -187,7 +187,8 @@ void CurlWorker::Run() { // is waiting on it is undefined behavior. auto queue_ref = m_queue; auto &queue = *queue_ref.get(); - m_logger.Log(LogMask::Debug, "CurlWorker::Run", "Started a curl worker"); + m_unpause_queue.reset(new HandlerQueue()); + m_logger.Log(LogMask::Debug, "Run", "Started a curl worker"); CURLM *multi_handle = curl_multi_init(); if (multi_handle == nullptr) { @@ -199,21 +200,40 @@ void CurlWorker::Run() { CURLMcode mres = CURLM_OK; std::vector<struct curl_waitfd> waitfds; - waitfds.resize(1); + waitfds.resize(2); + // The `curl_multi_wait` call in the event loop needs to be interrupted when + // additional work comes into one of the two queues (either the global queue + // or the per-worker unpause queue). To do this, the queue objects will + // write to a file descriptor when a new HTTP request is ready; we add these + // FDs to the list of FDs for libcurl to poll in order to trigger a wakeup. + // The `Consume`/`TryConsume` methods will have a side-effect of reading + // from the pipe if a request is available.
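The comment above describes the worker's wakeup mechanism. The following is a minimal standalone sketch of that pattern, not code from this patch: it assumes a queue that exposes the read end of a pipe (as `PollFD()` does here) and shows how an extra `curl_waitfd` entry lets newly produced work interrupt `curl_multi_wait`.

```cpp
// Sketch of the pipe-based wakeup pattern described above; names are
// illustrative. In the real patch, draining the pipe happens as a side
// effect of Consume()/TryConsume().
#include <curl/curl.h>
#include <unistd.h>

void WaitForWork(CURLM *multi, int queue_read_fd) {
    struct curl_waitfd extra;
    extra.fd = queue_read_fd;        // read end of the queue's pipe
    extra.events = CURL_WAIT_POLLIN; // wake up when a request is produced
    extra.revents = 0;

    int numfds = 0;
    curl_multi_wait(multi, &extra, 1, 1000 /* ms */, &numfds);

    if (extra.revents & CURL_WAIT_POLLIN) {
        char c;
        (void)read(queue_read_fd, &c, 1); // drain one notification byte
    }
}
```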
waitfds[0].fd = queue.PollFD(); waitfds[0].events = CURL_WAIT_POLLIN; waitfds[0].revents = 0; + waitfds[1].fd = m_unpause_queue->PollFD(); + waitfds[1].events = CURL_WAIT_POLLIN; + waitfds[1].revents = 0; while (true) { + while (running_handles < static_cast<int>(m_max_ops)) { + auto op = m_unpause_queue->TryConsume(); + if (!op) { + break; + } + op->ContinueHandle(); + } while (running_handles < static_cast<int>(m_max_ops)) { auto op = running_handles == 0 ? queue.Consume() : queue.TryConsume(); if (!op) { break; } + op->SetUnpauseQueue(m_unpause_queue); + auto curl = queue.GetHandle(); if (curl == nullptr) { - m_logger.Log(LogMask::Debug, "CurlWorker", + m_logger.Log(LogMask::Debug, "Run", "Unable to allocate a curl handle"); op->Fail("E_NOMEM", "Unable to allocate a curl handle"); continue; @@ -223,10 +243,10 @@ void CurlWorker::Run() { op->Fail(op->getErrorCode(), op->getErrorMessage()); } } catch (...) { - m_logger.Log(LogMask::Debug, "CurlWorker", - "Unable to setup the curl handle"); + m_logger.Log(LogMask::Debug, "Run", + "Unable to set up the curl handle"); op->Fail("E_NOMEM", - "Failed to setup the curl handle for the operation"); + "Failed to set up the curl handle for the operation"); continue; } m_op_map[curl] = op; @@ -236,8 +256,7 @@ void CurlWorker::Run() { std::stringstream ss; ss << "Unable to add operation to the curl multi-handle: " << curl_multi_strerror(mres); - m_logger.Log(LogMask::Debug, "CurlWorker", - ss.str().c_str()); + m_logger.Log(LogMask::Debug, "Run", ss.str().c_str()); } op->Fail("E_CURL_LIB", "Unable to add operation to the curl multi-handle"); @@ -253,7 +272,7 @@ void CurlWorker::Run() { if (m_logger.getMsgMask() & LogMask::Debug) { std::stringstream ss; ss << "Curl worker thread " << getpid() << " is running " - << running_handles << "operations"; + << running_handles << " operations"; m_logger.Log(LogMask::Debug, "CurlWorker", ss.str().c_str()); } last_marker = now; @@ -277,7 +296,8 @@ void CurlWorker::Run() { } else if (mres != CURLM_OK) { if (m_logger.getMsgMask() & LogMask::Warning) { std::stringstream ss; - ss << "Failed to perform multi-handle operation: " << mres; + ss << "Failed to perform multi-handle operation: " + << curl_multi_strerror(mres); m_logger.Log(LogMask::Warning, "CurlWorker", ss.str().c_str()); } break; @@ -298,8 +318,11 @@ void CurlWorker::Run() { } auto &op = iter->second; auto res = msg->data.result; + m_logger.Log(LogMask::Dump, "Run", + "Processing result from curl"); op->ProcessCurlResult(iter->first, res); op->ReleaseHandle(iter->first); + op->Notify(); running_handles -= 1; curl_multi_remove_handle(multi_handle, iter->first); if (res == CURLE_OK) { @@ -307,8 +330,8 @@ void CurlWorker::Run() { queue.RecycleHandle(iter->first); } else { curl_easy_cleanup(iter->first); - m_op_map.erase(iter); } + m_op_map.erase(iter); } } while (msg); } diff --git a/src/CurlUtil.hh b/src/CurlUtil.hh index d97fbe9..9457403 100644 --- a/src/CurlUtil.hh +++ b/src/CurlUtil.hh @@ -20,10 +20,10 @@ #include #include +#include #include #include #include -#include // Forward dec'ls typedef void CURL; @@ -43,25 +43,25 @@ CURL *GetHandle(bool verbose); * multi-curl driver thread is based on polling FD's */ class HandlerQueue { -public: - HandlerQueue(); + public: + HandlerQueue(); - void Produce(HTTPRequest *handler); + void Produce(HTTPRequest *handler); - HTTPRequest *Consume(); - HTTPRequest *TryConsume(); + HTTPRequest *Consume(); + HTTPRequest *TryConsume(); - int PollFD() const {return m_read_fd;} + int PollFD() const { return m_read_fd; } - CURL
*GetHandle(); - void RecycleHandle(CURL *); + CURL *GetHandle(); + void RecycleHandle(CURL *); -private: - std::deque<HTTPRequest *> m_ops; - thread_local static std::vector<CURL *> m_handles; - std::condition_variable m_cv; - std::mutex m_mutex; - const static unsigned m_max_pending_ops{20}; - int m_read_fd{-1}; - int m_write_fd{-1}; + private: + std::deque<HTTPRequest *> m_ops; + thread_local static std::vector<CURL *> m_handles; + std::condition_variable m_cv; + std::mutex m_mutex; + const static unsigned m_max_pending_ops{20}; + int m_read_fd{-1}; + int m_write_fd{-1}; }; diff --git a/src/CurlWorker.hh b/src/CurlWorker.hh index 928dd11..6e14a84 100644 --- a/src/CurlWorker.hh +++ b/src/CurlWorker.hh @@ -29,24 +29,25 @@ class HTTPRequest; class HandlerQueue; class CurlWorker { -public: - CurlWorker(std::shared_ptr<HandlerQueue> queue, XrdSysError &logger) : - m_queue(queue), - m_logger(logger) - {} - - CurlWorker(const CurlWorker &) = delete; - - void Run(); - static void RunStatic(CurlWorker *myself); - static unsigned GetPollThreads() {return m_workers;} - -private: - std::shared_ptr<HandlerQueue> m_queue; - std::unordered_map<CURL *, HTTPRequest *> m_op_map; - XrdSysError &m_logger; - - const static unsigned m_workers{5}; - const static unsigned m_max_ops{20}; - const static unsigned m_marker_period{5}; + public: + CurlWorker(std::shared_ptr<HandlerQueue> queue, XrdSysError &logger) + : m_queue(queue), m_logger(logger) {} + + CurlWorker(const CurlWorker &) = delete; + + void Run(); + static void RunStatic(CurlWorker *myself); + static unsigned GetPollThreads() { return m_workers; } + + private: + std::shared_ptr<HandlerQueue> m_queue; + std::shared_ptr<HandlerQueue> + m_unpause_queue; // Queue for notifications that a handle can be + // unpaused. + std::unordered_map<CURL *, HTTPRequest *> m_op_map; + XrdSysError &m_logger; + + const static unsigned m_workers{5}; + const static unsigned m_max_ops{20}; + const static unsigned m_marker_period{5}; }; diff --git a/src/HTTPCommands.cc b/src/HTTPCommands.cc index de6a6fa..9cdf2b5 100644 --- a/src/HTTPCommands.cc +++ b/src/HTTPCommands.cc @@ -43,6 +43,8 @@ std::shared_ptr<HandlerQueue> HTTPRequest::m_queue = std::make_unique<HandlerQueue>(); bool HTTPRequest::m_workers_initialized = false; std::vector<CurlWorker *> HTTPRequest::m_workers; +std::chrono::steady_clock::duration HTTPRequest::m_timeout_duration = + std::chrono::seconds(10); namespace { @@ -105,71 +107,76 @@ bool HTTPRequest::SendHTTPRequest(const std::string &payload) { } headers["Content-Type"] = "binary/octet-stream"; - std::string contentLength; - formatstr(contentLength, "%zu", payload.size()); - headers["Content-Length"] = contentLength; - // Another undocumented CURL feature: transfer-encoding is "chunked" - // by default for "PUT", which we really don't want.
- headers["Transfer-Encoding"] = ""; - - return sendPreparedRequest(hostUrl, payload); + + return sendPreparedRequest(hostUrl, payload, payload.size(), true); } -static void dump(const char *text, FILE *stream, unsigned char *ptr, +static void dump(XrdSysError *log, const char *text, unsigned char *ptr, size_t size) { size_t i; size_t c; unsigned int width = 0x10; + if (!log) + return; - fprintf(stream, "%s, %10.10ld bytes (0x%8.8lx)\n", text, (long)size, - (long)size); + std::stringstream ss; + std::string stream; + formatstr(stream, "%s, %10.10ld bytes (0x%8.8lx)\n", text, (long)size, + (long)size); + ss << stream; for (i = 0; i < size; i += width) { - fprintf(stream, "%4.4lx: ", (long)i); + formatstr(stream, "%4.4lx: ", (long)i); + ss << stream; /* show hex to the left */ for (c = 0; c < width; c++) { - if (i + c < size) - fprintf(stream, "%02x ", ptr[i + c]); - else - fputs(" ", stream); + if (i + c < size) { + formatstr(stream, "%02x ", ptr[i + c]); + ss << stream; + } else { + ss << " "; + } } /* show data on the right */ for (c = 0; (c < width) && (i + c < size); c++) { char x = (ptr[i + c] >= 0x20 && ptr[i + c] < 0x80) ? ptr[i + c] : '.'; - fputc(x, stream); + ss << x; } - - fputc('\n', stream); /* newline */ + ss << std::endl; } + log->Log(LogMask::Dump, "Curl", ss.str().c_str()); } -static void dump_plain(const char *text, FILE *stream, unsigned char *ptr, - size_t size) { - fprintf(stream, "%s, %10.10ld bytes (0x%8.8lx)\n", text, (long)size, - (long)size); - fwrite(ptr, 1, size, stream); - fputs("\n", stream); +static void dumpPlain(XrdSysError *log, const char *text, unsigned char *ptr, + size_t size) { + if (!log) + return; + std::string info; + formatstr(info, "%s, %10.10ld bytes (0x%8.8lx)\n", text, (long)size, + (long)size); + log->Log(LogMask::Dump, "Curl", info.c_str()); } int debugCallback(CURL *handle, curl_infotype ci, char *data, size_t size, void *clientp) { const char *text; (void)handle; /* prevent compiler warning */ - (void)clientp; + auto log = static_cast(clientp); + if (!log) + return 0; switch (ci) { case CURLINFO_TEXT: - fputs("== Info: ", stderr); - fwrite(data, size, 1, stderr); + log->Log(LogMask::Dump, "CurlInfo", std::string(data, size).c_str()); default: /* in case a new one is introduced to shock us */ return 0; case CURLINFO_HEADER_OUT: text = "=> Send header"; - dump_plain(text, stderr, (unsigned char *)data, size); + dumpPlain(log, text, (unsigned char *)data, size); break; } return 0; @@ -179,18 +186,25 @@ int debugAndDumpCallback(CURL *handle, curl_infotype ci, char *data, size_t size, void *clientp) { const char *text; (void)handle; /* prevent compiler warning */ - (void)clientp; + auto log = reinterpret_cast(clientp); + if (!log) + return 0; + std::stringstream ss; switch (ci) { case CURLINFO_TEXT: - fputs("== Info: ", stderr); - fwrite(data, size, 1, stderr); + if (size && data[size - 1] == '\n') { + ss << std::string(data, size - 1); + } else { + ss << std::string(data, size); + } + log->Log(LogMask::Dump, "CurlInfo", ss.str().c_str()); default: /* in case a new one is introduced to shock us */ return 0; case CURLINFO_HEADER_OUT: text = "=> Send header"; - dump_plain(text, stderr, (unsigned char *)data, size); + dumpPlain(log, text, (unsigned char *)data, size); break; case CURLINFO_DATA_OUT: text = "=> Send data"; @@ -208,52 +222,120 @@ int debugAndDumpCallback(CURL *handle, curl_infotype ci, char *data, text = "<= Recv SSL data"; break; } - dump(text, stderr, (unsigned char *)data, size); + dump(log, text, (unsigned char *)data, size); 
return 0; } +void HTTPRequest::Payload::NotifyPaused() { m_parent.Notify(); } + // A callback function that gets passed to curl_easy_setopt for reading data // from the payload -size_t read_callback(char *buffer, size_t size, size_t n, void *v) { +size_t HTTPRequest::ReadCallback(char *buffer, size_t size, size_t n, void *v) { // The callback gets the void pointer that we set with CURLOPT_READDATA. In // this case, it's a pointer to an HTTPRequest::Payload struct that contains // the data to be sent, along with the offset of the data that has already // been sent. HTTPRequest::Payload *payload = (HTTPRequest::Payload *)v; - if (payload->sentSoFar == payload->data->size()) { + if (payload->m_parent.Timeout()) { + payload->m_parent.errorCode = "E_TIMEOUT"; + payload->m_parent.errorMessage = "Upload operation timed out"; + return CURL_READFUNC_ABORT; + } + + if (payload->sentSoFar == static_cast<off_t>(payload->data.size())) { payload->sentSoFar = 0; - return 0; + if (payload->final) { + return 0; + } else { + payload->NotifyPaused(); + return CURL_READFUNC_PAUSE; + } } size_t request = size * n; - if (request > payload->data->size()) { - request = payload->data->size(); + if (request > payload->data.size()) { + request = payload->data.size(); } - if (payload->sentSoFar + request > payload->data->size()) { - request = payload->data->size() - payload->sentSoFar; + if (payload->sentSoFar + request > payload->data.size()) { + request = payload->data.size() - payload->sentSoFar; } - memcpy(buffer, payload->data->data() + payload->sentSoFar, request); + memcpy(buffer, payload->data.data() + payload->sentSoFar, request); payload->sentSoFar += request; return request; } bool HTTPRequest::sendPreparedRequest(const std::string &uri, - const std::string &payload) { + const std::string_view payload, + off_t payload_size, bool final) { m_uri = uri; m_payload = payload; + m_payload_size = payload_size; + if (!m_is_streaming && !final) { + m_is_streaming = true; + } + if (m_timeout) { + errorCode = "E_TIMEOUT"; + errorMessage = "Transfer has timed out due to inactivity."; + return false; + } + if (!errorCode.empty()) { + return false; + } + + m_last_request = std::chrono::steady_clock::now(); + m_final = final; + // Detect whether we were given an undersized buffer in non-streaming mode + if (!m_is_streaming && payload_size && + payload_size != static_cast<off_t>(payload.size())) { + errorCode = "E_LOGIC"; + std::stringstream ss; + ss << "Logic error: given an undersized payload (have " + << payload.size() << ", expected " << payload_size + << ") in a non-streaming mode"; + errorMessage = ss.str(); + return false; + } - m_queue->Produce(this); + m_result_ready = false; + if (m_unpause_queue) { + m_unpause_queue->Produce(this); + } else { + m_queue->Produce(this); + } std::unique_lock lk(m_mtx); m_cv.wait(lk, [&] { return m_result_ready; }); return errorCode.empty(); } +void HTTPRequest::Tick(std::chrono::steady_clock::time_point now) { + if (!m_is_streaming) { + return; + } + if (now - m_last_request <= m_timeout_duration) { + return; + } + + if (m_timeout) { + return; + } + m_timeout = true; + + if (m_unpause_queue) { + std::unique_lock lk(m_mtx); + m_result_ready = false; + m_unpause_queue->Produce(this); + m_cv.wait(lk, [&] { return m_result_ready; }); + } +} + bool HTTPRequest::ReleaseHandle(CURL *curl) { + m_curl_handle = nullptr; + if (curl == nullptr) return false; // Note: Any option that's conditionally set in `HTTPRequest::SetupHandle` @@ -281,6 +363,18 @@ bool HTTPRequest::ReleaseHandle(CURL *curl) { return
true; } +bool HTTPRequest::ContinueHandle() { + if (!m_curl_handle) { + return false; + } + + m_callback_payload->data = m_payload; + m_callback_payload->final = m_final; + m_callback_payload->sentSoFar = 0; + curl_easy_pause(m_curl_handle, CURLPAUSE_CONT); + return true; +} + bool HTTPRequest::SetupHandle(CURL *curl) { m_log.Log(XrdHTTPServer::Debug, "SetupHandle", "Sending HTTP request", m_uri.c_str()); @@ -322,13 +416,19 @@ bool HTTPRequest::SetupHandle(CURL *curl) { return false; } - rv = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, m_payload.c_str()); + rv = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, m_payload.data()); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_POSTFIELDS ) failed."; return false; } + + if (m_is_streaming) { + errorCode = "E_NOT_IMPL"; + errorMessage = + "Streaming posts not implemented in backend; internal error."; + } } if (httpVerb == "PUT") { @@ -343,7 +443,7 @@ bool HTTPRequest::SetupHandle(CURL *curl) { // and the offset of the data Here, we tell curl_easy_setopt to use the // read_callback function to read the data from the payload m_callback_payload = std::unique_ptr<HTTPRequest::Payload>( - new HTTPRequest::Payload{&m_payload, 0}); + new HTTPRequest::Payload{m_payload, 0, m_final, *this}); rv = curl_easy_setopt(curl, CURLOPT_READDATA, m_callback_payload.get()); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; return false; } - rv = curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_callback); + rv = curl_easy_setopt(curl, CURLOPT_READFUNCTION, + HTTPRequest::ReadCallback); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_READFUNCTION ) failed."; return false; } + + if (m_payload_size || !m_is_streaming) { + if (curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, + m_payload_size) != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = + "curl_easy_setopt( CURLOPT_INFILESIZE_LARGE ) failed."; + } + } } rv = curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1); @@ -467,20 +577,48 @@ bool HTTPRequest::SetupHandle(CURL *curl) { rv = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, m_header_list.get()); if (rv != CURLE_OK) { - this->errorCode = "E_CURL_LIB"; - this->errorMessage = "curl_easy_setopt( CURLOPT_HTTPHEADER ) failed."; + errorCode = "E_CURL_LIB"; + errorMessage = "curl_easy_setopt( CURLOPT_HTTPHEADER ) failed."; return false; } if (m_log.getMsgMask() & LogMask::Debug) { rv = curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, debugCallback); - rv = curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); + if (rv != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the debug function"; + return false; + } + rv = curl_easy_setopt(curl, CURLOPT_DEBUGDATA, &m_log); + if (rv != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the debug function handler data"; + return false; + } } if (m_log.getMsgMask() & LogMask::Dump) { rv = curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, debugAndDumpCallback); - rv = curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); + if (rv != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the debug function"; + return false; + } + rv = curl_easy_setopt(curl, CURLOPT_DEBUGDATA, &m_log); + if (rv != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the debug function handler data"; + return false; + } + } + if (m_log.getMsgMask() & (LogMask::Dump | LogMask::Debug)) { + if (curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L) != CURLE_OK)
{ + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to enable verbose mode for libcurl"; + return false; + } } + m_curl_handle = curl; return true; } @@ -501,15 +639,14 @@ void HTTPRequest::Notify() { HTTPRequest::CurlResult HTTPRequest::ProcessCurlResult(CURL *curl, CURLcode rv) { - auto cleaner = [&](void *) { Notify(); }; - auto unique = std::unique_ptr((void *)1, cleaner); - if (rv != 0) { - errorCode = "E_CURL_IO"; - std::ostringstream error; - error << "curl failed (" << rv << "): '" << curl_easy_strerror(rv) - << "'."; - errorMessage = error.str(); + if (errorCode.empty()) { + errorCode = "E_CURL_IO"; + std::ostringstream error; + error << "curl failed (" << rv << "): '" << curl_easy_strerror(rv) + << "'."; + errorMessage = error.str(); + } return CurlResult::Fail; } diff --git a/src/HTTPCommands.hh b/src/HTTPCommands.hh index 520c8fe..6f8f39d 100644 --- a/src/HTTPCommands.hh +++ b/src/HTTPCommands.hh @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -63,10 +64,15 @@ class HTTPRequest { const std::string &getErrorMessage() const { return errorMessage; } const std::string &getResultString() const { return m_result; } - // Currently only used in PUTS, but potentially useful elsewhere + // State of the payload upload for the curl callbacks struct Payload { - const std::string *data; - size_t sentSoFar; + std::string_view data; + off_t sentSoFar{0}; + bool final{true}; + HTTPRequest &m_parent; + + void NotifyPaused(); // Notify the parent request the curl handle has + // been paused }; // Initialize libraries for HTTP. @@ -75,12 +81,60 @@ class HTTPRequest { // context. static void Init(XrdSysError &); + // Perform maintenance of the request. + void Tick(std::chrono::steady_clock::time_point); + + // Sets the duration after which an in-progress operation may be considered + // stalled and hence timeout. + static void SetStallTimeout(std::chrono::steady_clock::duration timeout) { + m_timeout_duration = timeout; + } + + // Return the stall timeout duration currently in use. + static std::chrono::steady_clock::duration GetStallTimeout() { + return m_timeout_duration; + } + protected: - bool sendPreparedRequest(const std::string &uri, - const std::string &payload); + // Send the request to the HTTP server. + // Blocks until the request has completed. + // If `final` is set to `false`, the HTTPRequest object will start streaming + // a request and assume that `sendPreparedRequest` will be repeated until + // all data is provided (the sum total of the chunks given is the + // payload_size). If payload_size is 0 and final is false, this indicates + // the complete size of the PUT is unknown and chunked encoding will be + // used. + // + // - url: URL, including query parameters, to use. + // - payload: The payload contents when uploading. + // - payload_size: Size of the entire payload (not just the current chunk). + // - final: True if this is the last or only payload for the request. False + // otherwise. + bool sendPreparedRequest(const std::string &url, + const std::string_view payload, off_t payload_size, + bool final); + + // Called by the curl handler thread that the request has been finished. + virtual void Notify(); const std::string &getProtocol() { return m_protocol; } + // Returns true if the command is a streaming/partial request. + // A streaming request is one that requires multiple calls to + // `sendPreparedRequest` to complete. 
+ bool isStreamingRequest() const { return m_is_streaming; } + + // Record the unpause queue associated with this request. + // + // Future continuations of this request will be sent directly to this queue. + void SetUnpauseQueue(std::shared_ptr<HandlerQueue> queue) { + m_unpause_queue = queue; + } + + // Return whether or not the request has timed out since the last + // call to send more data. + bool Timeout() const { return m_timeout; } + typedef std::map<std::string, std::string> AttributeValueMap; AttributeValueMap query_parameters; AttributeValueMap headers; @@ -109,10 +163,12 @@ class HTTPRequest { private: enum class CurlResult { Ok, Fail, Retry }; - void Notify(); // Notify the main request thread the request has been - // processed by a worker virtual bool SetupHandle( CURL *curl); // Configure the curl handle to be used by a given request. + + virtual bool + ContinueHandle(); // Continue the request processing after a pause. + CurlResult ProcessCurlResult( CURL *curl, CURLcode rv); // Process a curl command that ran to completion. @@ -122,6 +178,11 @@ class HTTPRequest { // (curl request did not complete) bool ReleaseHandle( CURL *curl); // Cleanup any resources associated with the curl handle + CURL *getHandle() const { return m_curl_handle; } + + // Callback for libcurl when the library is ready to read more data from our + // buffer. + static size_t ReadCallback(char *buffer, size_t size, size_t n, void *v); const TokenFile *m_token{nullptr}; @@ -130,20 +191,40 @@ class HTTPRequest { m_workers_initialized; // The global state of the worker initialization. static std::shared_ptr<HandlerQueue> m_queue; // Global queue for all HTTP requests to be processed. + std::shared_ptr<HandlerQueue> m_unpause_queue{ + nullptr}; // Queue to notify the request can be resumed. static std::vector<CurlWorker *> m_workers; // Set of all the curl worker threads. // The following variables manage the state of the request. std::mutex m_mtx; // Mutex guarding the results from the curl worker's callback - std::condition_variable m_cv; // Condition variable to notify the curl - // worker completed the callback - bool m_result_ready{false}; // Flag indicating the results data is ready. + + // Condition variable to notify the curl worker completed the callback. + std::condition_variable m_cv; + + bool m_final{false}; // Flag indicating this is the last sendPreparedRequest + // call of the overall HTTPRequest + bool m_is_streaming{ + false}; // Flag indicating this command is a streaming request. + bool m_timeout{false}; // Flag indicating the request has timed out. + bool m_result_ready{false}; // Flag indicating the results data is ready. + off_t m_payload_size{0}; // Size of the entire upload payload; 0 if unknown. std::string m_protocol; std::string m_uri; // URL to request from libcurl - std::string m_payload; + std::string_view m_payload; + CURL *m_curl_handle{nullptr}; // The curl handle for the ongoing request char m_errorBuffer[CURL_ERROR_SIZE]; // Static error buffer for libcurl unsigned m_retry_count{0}; + + // Time when the last request was sent on this object; used to determine + // whether the operation has timed out. + std::chrono::steady_clock::time_point m_last_request{ + std::chrono::steady_clock::now()}; + + // Duration after which a partially-completed request will time out if + // no progress has been made.
+ static std::chrono::steady_clock::duration m_timeout_duration; }; class HTTPUpload : public HTTPRequest { diff --git a/src/S3Commands.cc b/src/S3Commands.cc index 587994b..a8b0848 100644 --- a/src/S3Commands.cc +++ b/src/S3Commands.cc @@ -33,6 +33,7 @@ #include #include #include +#include AmazonRequest::~AmazonRequest() {} @@ -40,8 +41,10 @@ bool AmazonRequest::SendRequest() { query_parameters.insert(std::make_pair("Version", "2012-10-01")); switch (signatureVersion) { - case 4: - return sendV4Request(canonicalizeQueryString()); + case 4: { + auto qs = canonicalizeQueryString(); + return sendV4Request(qs, qs.size(), true, true); + } default: this->errorCode = "E_INTERNAL"; this->errorMessage = "Invalid signature version."; @@ -133,7 +136,7 @@ void convertMessageDigestToLowercaseHex(const unsigned char *messageDigest, hexEncoded); } -bool doSha256(const std::string &payload, unsigned char *messageDigest, +bool doSha256(const std::string_view payload, unsigned char *messageDigest, unsigned int *mdLength) { return AWSv4Impl::doSha256(payload, messageDigest, mdLength); } @@ -142,7 +145,7 @@ std::string pathEncode(const std::string &original) { return AWSv4Impl::pathEncode(original); } -bool AmazonRequest::createV4Signature(const std::string &payload, +bool AmazonRequest::createV4Signature(const std::string_view payload, std::string &authorizationValue, bool sendContentSHA) { // If we're using temporary credentials, we need to add the token @@ -225,18 +228,21 @@ bool AmazonRequest::createV4Signature(const std::string &payload, // The canonical payload hash is the lowercase hexadecimal string of the // (SHA256) hash value of the payload. - unsigned int mdLength = 0; - unsigned char messageDigest[EVP_MAX_MD_SIZE]; - if (!doSha256(payload, messageDigest, &mdLength)) { - this->errorCode = "E_INTERNAL"; - this->errorMessage = "Unable to hash payload."; - return false; - } std::string payloadHash; - convertMessageDigestToLowercaseHex(messageDigest, mdLength, payloadHash); if (sendContentSHA) { - headers["X-Amz-Content-Sha256"] = payloadHash; + unsigned int mdLength = 0; + unsigned char messageDigest[EVP_MAX_MD_SIZE]; + if (!doSha256(payload, messageDigest, &mdLength)) { + this->errorCode = "E_INTERNAL"; + this->errorMessage = "Unable to hash payload."; + return false; + } + convertMessageDigestToLowercaseHex(messageDigest, mdLength, + payloadHash); + } else { + payloadHash = "UNSIGNED-PAYLOAD"; } + headers["X-Amz-Content-Sha256"] = payloadHash; // The canonical list of headers is a sorted list of lowercase header // names paired via ':' with the trimmed header value, each pair @@ -315,16 +321,18 @@ bool AmazonRequest::createV4Signature(const std::string &payload, // // Hash the canonical request the way we did the payload. 
+ std::string canonicalRequestHash; + unsigned int mdLength = 0; + unsigned char messageDigest[EVP_MAX_MD_SIZE]; if (!doSha256(canonicalRequest, messageDigest, &mdLength)) { - this->errorCode = "E_INTERNAL"; - this->errorMessage = "Unable to hash canonical request."; + errorCode = "E_INTERNAL"; + errorMessage = "Unable to hash canonical request."; return false; } - std::string canonicalRequestHash; convertMessageDigestToLowercaseHex(messageDigest, mdLength, canonicalRequestHash); - std::string s = this->service; + std::string s = service; if (s.empty()) { size_t i = host.find("."); if (i != std::string::npos) { @@ -408,8 +416,9 @@ bool AmazonRequest::createV4Signature(const std::string &payload, return true; } -bool AmazonRequest::sendV4Request(const std::string &payload, - bool sendContentSHA) { +bool AmazonRequest::sendV4Request(const std::string_view payload, + off_t payload_size, bool sendContentSHA, + bool final) { if ((getProtocol() != "http") && (getProtocol() != "https")) { this->errorCode = "E_INVALID_SERVICE_URL"; this->errorMessage = "Service URL not of a known protocol (http[s])."; @@ -438,41 +447,38 @@ bool AmazonRequest::sendV4Request(const std::string &payload, if (!canonicalQueryString.empty()) { url += "?" + canonicalQueryString; } - return sendPreparedRequest(url, payload); + return sendPreparedRequest(url, payload, payload_size, final); } // It's stated in the API documentation that you can upload to any region // via us-east-1, which is moderately crazy. -bool AmazonRequest::SendS3Request(const std::string &payload) { +bool AmazonRequest::SendS3Request(const std::string_view payload, + off_t payload_size, bool final) { + if (!m_streamingRequest && !final) { + if (payload_size == 0) { + errorCode = "E_INTERNAL"; + errorMessage = "S3 does not support streaming requests where the " + "payload size is unknown"; + return false; + } + m_streamingRequest = true; + } headers["Content-Type"] = "binary/octet-stream"; - std::string contentLength; - formatstr(contentLength, "%zu", payload.size()); - headers["Content-Length"] = contentLength; - // Another undocumented CURL feature: transfer-encoding is "chunked" - // by default for "PUT", which we really don't want. 
- headers["Transfer-Encoding"] = ""; + service = "s3"; if (region.empty()) { region = "us-east-1"; } - return sendV4Request(payload, true); + return sendV4Request(payload, payload_size, !m_streamingRequest, final); } // --------------------------------------------------------------------------- AmazonS3Upload::~AmazonS3Upload() {} -bool AmazonS3Upload::SendRequest(const std::string &payload, off_t offset, - size_t size) { - if (offset != 0 || size != 0) { - std::string range; - formatstr(range, "bytes=%lld-%lld", static_cast(offset), - static_cast(offset + size - 1)); - headers["Range"] = range.c_str(); - } - +bool AmazonS3Upload::SendRequest(const std::string_view &payload) { httpVerb = "PUT"; - return SendS3Request(payload); + return SendS3Request(payload, payload.size(), true); } // --------------------------------------------------------------------------- @@ -496,7 +502,7 @@ bool AmazonS3CompleteMultipartUpload::SendRequest( } payload += ""; - return SendS3Request(payload); + return SendS3Request(payload, payload.size(), true); } // --------------------------------------------------------------------------- @@ -508,17 +514,18 @@ bool AmazonS3CreateMultipartUpload::SendRequest() { query_parameters["x-id"] = "CreateMultipartUpload"; httpVerb = "POST"; - return SendS3Request(""); + return SendS3Request("", 0, true); } -bool AmazonS3SendMultipartPart::SendRequest(const std::string &payload, +bool AmazonS3SendMultipartPart::SendRequest(const std::string_view payload, const std::string &partNumber, - const std::string &uploadId) { + const std::string &uploadId, + size_t payloadSize, bool final) { query_parameters["partNumber"] = partNumber; query_parameters["uploadId"] = uploadId; includeResponseHeader = true; httpVerb = "PUT"; - return SendS3Request(payload); + return SendS3Request(payload, payloadSize, final); } // --------------------------------------------------------------------------- @@ -536,7 +543,7 @@ bool AmazonS3Download::SendRequest(off_t offset, size_t size) { httpVerb = "GET"; std::string noPayloadAllowed; - return SendS3Request(noPayloadAllowed); + return SendS3Request(noPayloadAllowed, 0, true); } // --------------------------------------------------------------------------- @@ -547,7 +554,7 @@ bool AmazonS3Head::SendRequest() { httpVerb = "HEAD"; includeResponseHeader = true; std::string noPayloadAllowed; - return SendS3Request(noPayloadAllowed); + return SendS3Request(noPayloadAllowed, 0, true); } // --------------------------------------------------------------------------- @@ -565,7 +572,7 @@ bool AmazonS3List::SendRequest(const std::string &continuationToken) { // Operation is on the bucket itself; alter the URL to remove the object hostUrl = getProtocol() + "://" + host + bucketPath; - return SendS3Request(""); + return SendS3Request("", 0, true); } bool AmazonS3CreateMultipartUpload::Results(std::string &uploadId, diff --git a/src/S3Commands.hh b/src/S3Commands.hh index 56a5a29..3e72d2e 100644 --- a/src/S3Commands.hh +++ b/src/S3Commands.hh @@ -22,8 +22,12 @@ #include "S3AccessInfo.hh" #include +#include #include +// The base class for all requests to the S3 endpoint. +// Handles common activities like signing requests and forwarding to the +// underlying HTTPRequest object. 
class AmazonRequest : public HTTPRequest { public: AmazonRequest(const S3AccessInfo &ai, const std::string objectName, @@ -81,14 +85,35 @@ class AmazonRequest : public HTTPRequest { std::string &path); virtual bool SendRequest(); - virtual bool SendS3Request(const std::string &payload); + + // Send a request to the S3 service. + // + // - payload: contents of the request itself + // - payload_size: final size of the payload for uploads; 0 if unknown. + // - final: True if this is the last (or only) payload of the request; false + // otherwise + virtual bool SendS3Request(const std::string_view payload, + off_t payload_size, bool final); static void Init(XrdSysError &log) { HTTPRequest::Init(log); } protected: - bool sendV4Request(const std::string &payload, bool sendContentSHA = false); + // Send a request to the S3 service using the V4 signing method. + // + // - payload: contents of the request (for uploads or for XML-based + // commands) + // - payload_size: final size of the payload for uploads; 0 if unknown. + // - sendContentSHA: Whether to add the header indicating the checksum of + // the final payload. Servers may verify this is what they received. + // - final: True if this is the last (or only) payload of the request; false + // otherwise. + bool sendV4Request(const std::string_view payload, off_t payload_size, + bool sendContentSHA, bool final); bool retainObject; + bool m_streamingRequest{ + false}; // Is this a streaming request? Streaming requests will not + // include a SHA-256 signature in the header std::string accessKeyFile; std::string secretKeyFile; @@ -110,14 +135,14 @@ class AmazonRequest : public HTTPRequest { std::string m_style; private: - bool createV4Signature(const std::string &payload, + bool createV4Signature(const std::string_view payload, std::string &authorizationHeader, bool sendContentSHA = false); std::string canonicalizeQueryString(); }; -class AmazonS3Upload : public AmazonRequest { +class AmazonS3Upload final : public AmazonRequest { using AmazonRequest::SendRequest; public: @@ -133,14 +158,13 @@ class AmazonS3Upload : public AmazonRequest { virtual ~AmazonS3Upload(); - virtual bool SendRequest(const std::string &payload, off_t offset, - size_t size); + bool SendRequest(const std::string_view &payload); protected: std::string path; }; -class AmazonS3CreateMultipartUpload : public AmazonRequest { +class AmazonS3CreateMultipartUpload final : public AmazonRequest { using AmazonRequest::SendRequest; public: @@ -189,7 +213,7 @@ class AmazonS3CompleteMultipartUpload : public AmazonRequest { protected: }; -class AmazonS3SendMultipartPart : public AmazonRequest { +class AmazonS3SendMultipartPart final : public AmazonRequest { using AmazonRequest::SendRequest; public: @@ -207,14 +231,22 @@ class AmazonS3SendMultipartPart : public AmazonRequest { virtual ~AmazonS3SendMultipartPart(); - virtual bool SendRequest(const std::string &payload, - const std::string &partNumber, - const std::string &uploadId); + // Send (potentially a partial) payload up to S3. + // Blocks until all the data in payload has been sent to AWS. + // + // - payload: The data corresponding to this partial upload. + // - partNumber: The portion of the multipart upload. 
+ // - uploadId: The upload ID assigned by the creation of the multipart + // upload + // - final: Set to true if this is the last chunk of the part; false + // otherwise + bool SendRequest(const std::string_view payload, + const std::string &partNumber, const std::string &uploadId, + size_t payloadSize, bool final); protected: }; -class AmazonS3Download : public AmazonRequest { +class AmazonS3Download final : public AmazonRequest { using AmazonRequest::SendRequest; public: @@ -233,7 +265,7 @@ class AmazonS3Download : public AmazonRequest { virtual bool SendRequest(off_t offset, size_t size); }; -class AmazonS3Head : public AmazonRequest { +class AmazonS3Head final : public AmazonRequest { using AmazonRequest::SendRequest; public: @@ -250,6 +282,11 @@ class AmazonS3Head : public AmazonRequest { virtual ~AmazonS3Head(); virtual bool SendRequest(); + + off_t getSize() const { return m_size; } + + private: + off_t m_size{0}; }; struct S3ObjectInfo { @@ -257,7 +294,7 @@ struct S3ObjectInfo { std::string m_key; }; -class AmazonS3List : public AmazonRequest { +class AmazonS3List final : public AmazonRequest { using AmazonRequest::SendRequest; public: diff --git a/src/S3File.cc b/src/S3File.cc index 6ad133e..fa6a56f 100644 --- a/src/S3File.cc +++ b/src/S3File.cc @@ -32,13 +32,18 @@ #include +#include +#include #include +#include #include #include #include #include #include #include +#include +#include #include using namespace XrdHTTPServer; @@ -47,16 +52,40 @@ S3FileSystem *g_s3_oss = nullptr; XrdVERSIONINFO(XrdOssGetFileSystem, S3); +std::vector<std::pair<std::weak_ptr<std::mutex>, + std::weak_ptr<AmazonS3SendMultipartPart>>> + S3File::m_pending_ops; +std::mutex S3File::m_pending_lk; +std::once_flag S3File::m_monitor_launch; + S3File::S3File(XrdSysError &log, S3FileSystem *oss) : m_log(log), m_oss(oss), content_length(0), last_modified(0), - write_buffer(""), partNumber(1) {} + partNumber(1) {} int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) { if (Oflag & O_CREAT) { - m_log.Log(LogMask::Info, "File opened for creation: ", path); + m_log.Log(LogMask::Info, "Open", "File opened for creation:", path); + m_create = true; } if (Oflag & O_APPEND) { - m_log.Log(LogMask::Info, "File opened for append: ", path); + m_log.Log(LogMask::Info, "Open", "File opened for append:", path); + } + if (Oflag & (O_RDWR | O_WRONLY)) { + m_write_lk.reset(new std::mutex); + } + + char *asize_char; + if ((asize_char = env.Get("oss.asize"))) { + off_t result{0}; + auto [ptr, ec] = std::from_chars( + asize_char, asize_char + strlen(asize_char), result); + if (ec == std::errc()) { + m_object_size = result; + } else { + m_log.Log(LogMask::Warning, + "Opened file has oss.asize set to an unparseable value: ", + asize_char); + } } if (m_log.getMsgMask() & XrdHTTPServer::Debug) { @@ -81,12 +110,13 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) { // This flag is not set when it's going to be a read operation // so we check if the file exists in order to be able to return a 404 - if (!Oflag) { + if (!Oflag || (Oflag & O_APPEND)) { AmazonS3Head head(m_ai, m_object, m_log); if (!head.SendRequest()) { return -ENOENT; } + head.getSize(); } return 0; @@ -188,66 +218,285 @@ int S3File::Fstat(struct stat *buff) { } ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) { + auto write_mutex = m_write_lk; + if (!write_mutex) { + return -EBADF; + } + std::lock_guard lk(*write_mutex); + + // Small object optimization -- if this is the full object, upload + // it immediately.
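To make the `final` flag concrete, here is a hypothetical use of the `AmazonS3SendMultipartPart::SendRequest` interface documented in S3Commands.hh above: one S3 part delivered in two chunks, with only the second chunk marked final. The header does not pin down whether `payloadSize` should be the part size or the whole-object size, so this sketch follows the part-size usage seen in `SendPartStreaming`; the setup of the request object is omitted.

```cpp
// Hypothetical two-chunk upload of multipart part #1; illustration only,
// not code from this patch.
#include <string>
#include <string_view>

bool SendPartInTwoChunks(AmazonS3SendMultipartPart &part,
                         std::string_view chunk1, std::string_view chunk2,
                         const std::string &uploadId) {
    size_t partSize = chunk1.size() + chunk2.size();
    // First chunk: final=false, so the curl handle pauses awaiting more data.
    if (!part.SendRequest(chunk1, "1", uploadId, partSize, /*final=*/false))
        return false;
    // Second chunk completes the part and yields the ETag.
    return part.SendRequest(chunk2, "1", uploadId, partSize, /*final=*/true);
}
```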
+ if (!m_write_offset && m_object_size == static_cast<off_t>(size)) { + AmazonS3Upload upload(m_ai, m_object, m_log); + m_write_lk.reset(); + if (!upload.SendRequest( + std::string_view(static_cast<const char *>(buffer), size))) { + m_log.Log(LogMask::Warning, "Write", + "Failed to create small object"); + return -EIO; + } else { + m_write_offset += size; + m_log.Log(LogMask::Debug, "Write", + "Creation of small object succeeded", + std::to_string(size).c_str()); + return size; + } + } + + if (offset != m_write_offset) { + m_log.Emsg( + "Write", + "Out-of-order write detected; S3 requires writes to be in order"); + m_write_offset = -1; + return -EIO; + } + if (m_write_offset == -1) { + // Previous I/O error has occurred. File is in bad state, immediately + // fail. + return -EIO; + } if (uploadId == "") { AmazonS3CreateMultipartUpload startUpload(m_ai, m_object, m_log); if (!startUpload.SendRequest()) { - m_log.Emsg("Open", "S3 multipart request failed"); + m_log.Emsg("Write", "S3 multipart request failed"); + m_write_offset = -1; return -ENOENT; } std::string errMsg; startUpload.Results(uploadId, errMsg); } - std::string payload((char *)buffer, size); - size_t payload_size = payload.length(); - if (payload_size != size) { - return -ENOENT; + // If we don't know the final object size, we must use the streaming + // variant. + if (m_object_size == -1) { + return WriteStreaming(buffer, offset, size); } - write_buffer += payload; - // XXX should this be configurable? 100mb gives us a TB of file size. It - // doesn't seem terribly useful to be much smaller and it's not clear the S3 - // API will work if it's much larger. - if (write_buffer.length() > 100000000) { - if (SendPart() == -ENOENT) { - return -ENOENT; + size_t written = 0; + while (written != size) { + if (m_write_op) { + auto write_size = ContinueSendPart(buffer, size); + if (write_size < 0) { + return write_size; + } + offset += write_size; + m_write_offset += write_size; + buffer = static_cast<const char *>(buffer) + write_size; + size -= write_size; + written += write_size; + if (!size) { + return written; + } + } + + m_write_op.reset(new AmazonS3SendMultipartPart(m_ai, m_object, m_log)); + { + std::lock_guard lk(m_pending_lk); + m_pending_ops.emplace_back(m_write_lk, m_write_op); + } + + // Calculate the size of the current chunk, if it's known.
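The chunk-size rule implemented in the lines that follow can be restated standalone. This sketch mirrors those branches (including treating an object size of 0 as "part size unknown") using the 100 MB part size defined later in S3File.hh; it is not the patch's code. For a 250 MB object the successive parts come out as 100 MB, 100 MB, then 50 MB.

```cpp
// Standalone restatement of the part-size calculation below.
#include <cstddef>
#include <sys/types.h>

size_t NextPartSize(off_t object_size, off_t write_offset,
                    size_t s3_part_size = 100'000'000) {
    if (!object_size)
        return 0; // mirrors the m_part_size = 0 branch (size treated as unknown)
    if (write_offset + static_cast<off_t>(s3_part_size) > object_size)
        return object_size - write_offset; // short final part
    return s3_part_size;
}
```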
+ m_part_size = m_s3_part_size; + if (!m_object_size) { + m_part_size = 0; + } else if (m_write_offset + static_cast<off_t>(m_part_size) > + m_object_size) { + m_part_size = m_object_size - m_write_offset; + } } - return size; + return written; +} + +ssize_t S3File::WriteStreaming(const void *buffer, off_t offset, size_t size) { + m_streaming_buffer.append( + std::string_view(static_cast<const char *>(buffer), size)); + m_write_offset += size; + + ssize_t rv = size; + if (m_streaming_buffer.size() > 100'000'000) { + rv = SendPartStreaming(); + } + return rv; } -int S3File::SendPart() { - int length = write_buffer.length(); +ssize_t S3File::SendPartStreaming() { + int length = m_streaming_buffer.length(); AmazonS3SendMultipartPart upload_part_request = AmazonS3SendMultipartPart(m_ai, m_object, m_log); - if (!upload_part_request.SendRequest( - write_buffer, std::to_string(partNumber), uploadId)) { - m_log.Emsg("SendPart", "upload.SendRequest() failed"); - return -ENOENT; + if (!upload_part_request.SendRequest(m_streaming_buffer, + std::to_string(partNumber), uploadId, + m_streaming_buffer.size(), true)) { + m_log.Log(LogMask::Debug, "SendPart", "upload.SendRequest() failed"); + return -EIO; } else { - m_log.Emsg("SendPart", "upload.SendRequest() succeeded"); + m_log.Log(LogMask::Debug, "SendPart", "upload.SendRequest() succeeded"); std::string resultString = upload_part_request.getResultString(); std::size_t startPos = resultString.find("ETag:"); std::size_t endPos = resultString.find("\"", startPos + 7); eTags.push_back( resultString.substr(startPos + 7, endPos - startPos - 7)); - partNumber++; - write_buffer = ""; + partNumber++; + m_streaming_buffer.clear(); } return length; } +ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) { + m_part_written += size; + auto write_size = size; + if (m_part_written > m_s3_part_size) { + write_size = size - (m_part_written - m_s3_part_size); + m_part_written = m_s3_part_size; + } + auto is_final = (m_part_size > 0 && m_part_written == m_part_size) || + m_part_written == m_s3_part_size; + if (m_log.getMsgMask() & LogMask::Dump) { + std::stringstream ss; + ss << "Sending request with buffer of size=" << write_size + << " and is_final=" << is_final; + m_log.Log(LogMask::Dump, "ContinueSendPart", ss.str().c_str()); + } + if (!m_write_op->SendRequest( + std::string_view(static_cast<const char *>(buffer), write_size), + std::to_string(partNumber), uploadId, m_object_size, is_final)) { + m_write_offset = -1; + if (m_write_op->getErrorCode() == "E_TIMEOUT") { + m_log.Emsg("Write", "Timeout when uploading to S3"); + m_write_op.reset(); + return -ETIMEDOUT; + } + m_log.Emsg("Write", "Upload to S3 failed: ", + m_write_op->getErrorMessage().c_str()); + m_write_op.reset(); + return -EIO; + } + if (is_final) { + m_part_written = 0; + m_part_size = 0; + auto &resultString = m_write_op->getResultString(); + std::size_t startPos = resultString.find("ETag:"); + if (startPos == std::string::npos) { + m_log.Emsg("Write", "Result from S3 does not include ETag:", + resultString.c_str()); + m_write_op.reset(); + m_write_offset = -1; + return -EIO; + } + std::size_t endPos = resultString.find("\"", startPos + 7); + if (endPos == std::string::npos) { + m_log.Emsg("Write", + "Result from S3 does not include ETag end-character:", + resultString.c_str()); + m_write_op.reset(); + m_write_offset = -1; + return -EIO; + } + eTags.push_back( + resultString.substr(startPos + 7, endPos - startPos - 7)); + m_write_op.reset(); + partNumber++; + } + + return write_size; +} + +void S3File::LaunchMonitorThread() { +
std::call_once(m_monitor_launch, [] { + std::thread t(S3File::CleanupTransfers); + t.detach(); + }); +} + +void S3File::CleanupTransfers() { + while (true) { + std::this_thread::sleep_for(HTTPRequest::GetStallTimeout() / 3); + try { + CleanupTransfersOnce(); + } catch (std::exception &exc) { + std::cerr << "Warning: caught unexpected exception when trying to " + "clean transfers: " + << exc.what() << std::endl; + } + } +} + +void S3File::CleanupTransfersOnce() { + // Make a list of live transfers; erase any dead ones still on the list. + std::vector<std::pair<std::shared_ptr<std::mutex>, + std::shared_ptr<AmazonS3SendMultipartPart>>> + existing_ops; + { + std::lock_guard lk(m_pending_lk); + existing_ops.reserve(m_pending_ops.size()); + m_pending_ops.erase( + std::remove_if(m_pending_ops.begin(), m_pending_ops.end(), + [&](const auto &op) -> bool { + auto op_lk = op.first.lock(); + if (!op_lk) { + // In this case, the S3File is no longer open + // for write. No need to potentially clean + // up the transfer. + return true; + } + auto op_part = op.second.lock(); + if (!op_part) { + // In this case, the S3File object is still + // open for writes but the upload has + // completed. Remove from the list. + return true; + } + // The S3File is open and upload is in-progress; + // we'll tick the transfer. + existing_ops.emplace_back(op_lk, op_part); + return false; + }), + m_pending_ops.end()); + } + // For each live transfer, call `Tick` to advance the clock and possibly + // time things out. + auto now = std::chrono::steady_clock::now(); + for (auto &info : existing_ops) { + std::lock_guard lk(*info.first); + info.second->Tick(now); + } +} + int S3File::Close(long long *retsz) { - // this is only true if a buffer exists that needs to be drained - if (write_buffer.length() > 0) { - if (SendPart() == -ENOENT) { + // If we opened the object in create mode but did not actually write + // anything, make a quick zero-length file.
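The liveness test inside `CleanupTransfersOnce()` above reduces to the following condensed sketch (not the patch's code): lock both weak references and keep the entry only when the file is still open for write and the upload is still in flight.

```cpp
// Condensed sketch of the weak_ptr liveness check used above.
#include <memory>
#include <utility>

template <typename Mutex, typename Op>
bool StillInFlight(const std::pair<std::weak_ptr<Mutex>, std::weak_ptr<Op>> &e,
                   std::shared_ptr<Mutex> &lock_out,
                   std::shared_ptr<Op> &op_out) {
    lock_out = e.first.lock(); // null => S3File no longer open for write
    if (!lock_out)
        return false;
    op_out = e.second.lock(); // null => upload already completed
    return static_cast<bool>(op_out);
}
```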
+ if (m_create && !m_write_offset) { + AmazonS3Upload upload(m_ai, m_object, m_log); + if (!upload.SendRequest("")) { + m_log.Log(LogMask::Warning, "Close", + "Failed to create zero-length object"); return -ENOENT; } else { - m_log.Emsg("Close", "Closed our S3 file"); + m_log.Log(LogMask::Debug, "Close", + "Creation of zero-length object succeeded"); + return 0; + } + } + if (m_write_lk) { + std::lock_guard lk(*m_write_lk); + if (m_object_size == -1 && !m_streaming_buffer.empty()) { + m_log.Emsg("Close", "Sending final part of length", + std::to_string(m_streaming_buffer.size()).c_str()); + auto rv = SendPartStreaming(); + if (rv < 0) { + return rv; + } + } else if (m_write_op) { + m_part_size = m_part_written; + auto written = ContinueSendPart(nullptr, 0); + if (written < 0) { + m_log.Log(LogMask::Warning, "Close", + "Failed to complete the last S3 upload"); + return -EIO; + } } } + // this is only true if some parts have been written and need to be // finalized if (partNumber > 1) { @@ -262,16 +511,6 @@ int S3File::Close(long long *retsz) { } return 0; - - /* Original write code - std::string payload((char *)buffer, size); - if (!upload.SendRequest(payload, offset, size)) { - m_log.Emsg("Open", "upload.SendRequest() failed"); - return -ENOENT; - } else { - m_log.Emsg("Open", "upload.SendRequest() succeeded"); - return 0; - } */ } extern "C" { @@ -300,6 +539,7 @@ XrdOss *XrdOssGetStorageSystem2(XrdOss *native_oss, XrdSysLogger *Logger, envP->Export("XRDXROOTD_NOPOSC", "1"); + S3File::LaunchMonitorThread(); try { AmazonRequest::Init(*log); g_s3_oss = new S3FileSystem(Logger, config_fn, envP); diff --git a/src/S3File.hh b/src/S3File.hh index 250cb77..abd69ee 100644 --- a/src/S3File.hh +++ b/src/S3File.hh @@ -27,12 +27,16 @@ #include #include +#include +#include #include int parse_path(const S3FileSystem &fs, const char *path, std::string &exposedPath, std::string &object); +class AmazonS3SendMultipartPart; + class S3File : public XrdOssDF { public: S3File(XrdSysError &log, S3FileSystem *oss); @@ -94,8 +98,32 @@ class S3File : public XrdOssDF { size_t getContentLength() { return content_length; } time_t getLastModified() { return last_modified; } + // Launch the global monitor thread associated with S3File objects. + // Currently, the monitor thread is used to cleanup in-progress file + // transfers that have been abandoned. + static void LaunchMonitorThread(); + private: - int SendPart(); + // Periodic cleanup of in-progress transfers. + // + // Iterates through the global list of pending multipart uploads + // that may be paused. For each, call `Tick` on the upload and + // see if the transfer has aborted. + static void CleanupTransfers(); + + // Single cleanup run for in-progress transfers. + static void CleanupTransfersOnce(); + + // Write data while in "streaming mode" where we don't know the + // ultimate size of the file (and hence can't start streaming + // partitions immediately). + ssize_t WriteStreaming(const void *buffer, off_t offset, size_t size); + + // Send a fully-buffered part of the file; only used while in + // "streaming" mode. + ssize_t SendPartStreaming(); + + ssize_t ContinueSendPart(const void *buffer, size_t size); XrdSysError &m_log; S3FileSystem *m_oss; @@ -105,8 +133,53 @@ class S3File : public XrdOssDF { size_t content_length; time_t last_modified; - std::string write_buffer; - std::string uploadId; + static const size_t m_s3_part_size = + 100'000'000; // The size of each S3 chunk. 
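For context on the 100 MB constant above: AWS documents a 10,000-part cap on multipart uploads (an assumption from the S3 documentation, not something stated in this diff), so this part size supports objects up to roughly 1 TB, matching the rationale in the comment removed from Write().

```cpp
// Sanity check on the part-size choice; the 10,000-part cap is the
// AWS-documented S3 limit, assumed here rather than taken from this diff.
static_assert(100'000'000ULL * 10'000ULL == 1'000'000'000'000ULL,
              "100 MB parts x 10,000 parts ~= 1 TB maximum object size");
```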
+ + bool m_create{false}; int partNumber; + size_t m_part_written{ 0}; // Number of bytes written for the current upload chunk. + size_t m_part_size{0}; // Size of the current upload chunk (0 if unknown). + off_t m_write_offset{0}; // Offset of the file pointer for writes (helps + // detect out-of-order writes). + off_t m_object_size{ + -1}; // Expected size of the completed object; -1 if unknown. std::string uploadId; // For creates, upload ID as assigned by the S3 service. std::vector<std::string> eTags; + // When using the "streaming mode", the upload part has to be completely + // buffered within the S3File object; this is the current buffer. + std::string m_streaming_buffer; + + // The mutex protecting write activities. Writes must currently be + // serialized as we aggregate them into large operations and upload them to + // the S3 endpoint. The mutex prevents corruption of internal state. + // + // The periodic cleanup thread may decide to abort the in-progress transfer; + // to do so, it'll need a reference to this lock that is independent of the + // lifetime of the open file; hence, it's a shared pointer. + std::shared_ptr<std::mutex> m_write_lk; + + // The in-progress operation for a multi-part upload; its lifetime may be + // spread across multiple write calls. + std::shared_ptr<AmazonS3SendMultipartPart> + m_write_op; // The in-progress operation for a multi-part upload. + + // The multipart uploads represent an in-progress request and the global + // cleanup thread may decide to trigger a failure if the request does not + // advance after some time period. + // + // To do so, we must be able to lock the associated write mutex and then + // call `Tick` on the upload. To avoid prolonging the lifetime of the + // objects beyond the S3File, we hold onto a reference via a weak pointer. + // Mutable operations on this vector are protected by the `m_pending_lk`. + static std::vector<std::pair<std::weak_ptr<std::mutex>, + std::weak_ptr<AmazonS3SendMultipartPart>>> + m_pending_ops; + + // Mutex protecting the m_pending_ops variable. + static std::mutex m_pending_lk; + + // Flag determining whether the monitoring thread has been launched. + static std::once_flag m_monitor_launch; }; diff --git a/src/logging.cc b/src/logging.cc index db1492b..6a398c7 100644 --- a/src/logging.cc +++ b/src/logging.cc @@ -37,7 +37,7 @@ std::string XrdHTTPServer::LogMaskToString(int mask) { has_entry = true; } if (mask & LogMask::Debug) { - ss << "debug"; + ss << (has_entry ?
", " : "") << "debug"; has_entry = true; } if (mask & LogMask::Info) { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 21d5c8a..49676d6 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,29 +1,10 @@ -add_executable( s3-gtest s3_tests.cc - ../src/AWSv4-impl.cc - ../src/CurlUtil.cc - ../src/logging.cc - ../src/S3AccessInfo.cc - ../src/S3Directory.cc - ../src/S3File.cc - ../src/S3FileSystem.cc - ../src/shortfile.cc - ../src/stl_string_utils.cc - ../src/TokenFile.cc - ../src/HTTPCommands.cc - ../src/S3Commands.cc -) +add_executable( s3-gtest s3_tests.cc ) -add_executable( http-gtest http_tests.cc - ../src/CurlUtil.cc - ../src/HTTPFile.cc - ../src/HTTPFileSystem.cc - ../src/HTTPCommands.cc - ../src/stl_string_utils.cc - ../src/TokenFile.cc - ../src/shortfile.cc - ../src/logging.cc -) +add_executable( s3-unit-test s3_unit_tests.cc ) +add_executable( http-gtest http_tests.cc ) + +include(GoogleTest) if(XROOTD_PLUGINS_EXTERNAL_GTEST) set(LIBGTEST "gtest") @@ -34,9 +15,16 @@ else() set(LIBGTEST "${CMAKE_BINARY_DIR}/external/gtest/src/gtest-build/lib/libgtest.a") endif() -target_link_libraries(s3-gtest XrdS3 "${LIBGTEST}" pthread) -target_link_libraries(http-gtest XrdHTTPServer "${LIBGTEST}" pthread) +target_link_libraries(s3-gtest XrdS3Testing "${LIBGTEST}" pthread) +target_link_libraries(s3-unit-test XrdS3Testing "${LIBGTEST}" pthread) +target_link_libraries(http-gtest XrdHTTPServerTesting "${LIBGTEST}" pthread) +gtest_add_tests(TARGET s3-unit-test TEST_LIST s3UnitTests) +set_tests_properties(${s3UnitTests} + PROPERTIES + FIXTURES_REQUIRED S3::s3_basic + ENVIRONMENT "ENV_FILE=${CMAKE_BINARY_DIR}/tests/s3_basic/setup.sh" +) add_test( NAME @@ -52,6 +40,20 @@ add_test( ${CMAKE_CURRENT_BINARY_DIR}/http-gtest "${CMAKE_BINARY_DIR}/tests/basic/setup.sh" ) +if (VALGRIND) + add_test( + NAME + valgrind-s3 + COMMAND + ${VALGRIND_BIN} ${CMAKE_CURRENT_BINARY_DIR}/s3-unit-test -R FileSystemS3Fixture.UploadLargePartAligned + ) + + set_tests_properties(valgrind-s3 + PROPERTIES + FIXTURES_REQUIRED S3::s3_basic + ) +endif() + set_tests_properties(http-unit PROPERTIES FIXTURES_REQUIRED HTTP::basic diff --git a/test/s3-setup.sh b/test/s3-setup.sh index 1665ad2..dd8e3c8 100755 --- a/test/s3-setup.sh +++ b/test/s3-setup.sh @@ -150,6 +150,8 @@ export MINIO_ROOT_USER=minioadmin export MINIO_ROOT_PASSWORD=QXDEiQxQw8qY MINIO_USER=miniouser MINIO_PASSWORD=2Z303QCzRI7s +printf "%s" "$MINIO_USER" > "$RUNDIR/access_key" +printf "%s" "$MINIO_PASSWORD" > "$RUNDIR/secret_key" # Launch minio "$MINIO_BIN" --certs-dir "$MINIO_CERTSDIR" server --address "$(hostname):0" "$MINIO_DATADIR" 0<&- >"$BINARY_DIR/tests/$TEST_NAME/server.log" 2>&1 & @@ -176,6 +178,8 @@ echo "Minio API server started on $MINIO_URL" cat > "$BINARY_DIR/tests/$TEST_NAME/setup.sh" < "$RUNDIR/hello_world.txt" #### export XROOTD_CONFIG="$XROOTD_CONFIGDIR/xrootd.cfg" +BUCKET_NAME=test-bucket cat > "$XROOTD_CONFIG" <> "$BINARY_DIR/tests/$TEST_NAME/setup.sh" < #include -#include #include class TestAmazonRequest : public AmazonRequest { @@ -65,41 +64,6 @@ TEST(TestS3URLGeneration, Test1) { ASSERT_EQ(generatedHostUrl, "https://s3-service.com:443/test-object"); } -class FileSystemFixtureBase : public testing::Test { - protected: - FileSystemFixtureBase() - : m_log(new XrdSysLogger(2, 0)) // Log to stderr, no log rotation - {} - - void SetUp() override { - setenv("XRDINSTANCE", "xrootd", 1); - char tmp_configfn[] = "/tmp/xrootd-s3-gtest.cfg.XXXXXX"; - auto result = mkstemp(tmp_configfn); - ASSERT_NE(result, -1) << "Failed to create temp file (" - 
diff --git a/test/s3_tests.cc b/test/s3_tests.cc
--- a/test/s3_tests.cc
+++ b/test/s3_tests.cc
 #include
-#include
 #include

 class TestAmazonRequest : public AmazonRequest {
@@ -65,41 +64,6 @@ TEST(TestS3URLGeneration, Test1) {
 ASSERT_EQ(generatedHostUrl, "https://s3-service.com:443/test-object");
 }

-class FileSystemFixtureBase : public testing::Test {
- protected:
- FileSystemFixtureBase()
- : m_log(new XrdSysLogger(2, 0)) // Log to stderr, no log rotation
- {}
-
- void SetUp() override {
- setenv("XRDINSTANCE", "xrootd", 1);
- char tmp_configfn[] = "/tmp/xrootd-s3-gtest.cfg.XXXXXX";
- auto result = mkstemp(tmp_configfn);
- ASSERT_NE(result, -1) << "Failed to create temp file ("
- << strerror(errno) << ", errno=" << errno << ")";
- m_configfn = std::string(tmp_configfn);
-
- auto contents = GetConfig();
- ASSERT_FALSE(contents.empty());
- ASSERT_TRUE(writeShortFile(m_configfn, contents, 0))
- << "Failed to write to temp file (" << strerror(errno)
- << ", errno=" << errno << ")";
- }
-
- void TearDown() override {
- if (!m_configfn.empty()) {
- auto rv = unlink(m_configfn.c_str());
- ASSERT_EQ(rv, 0) << "Failed to delete temp file ("
- << strerror(errno) << ", errno=" << errno << ")";
- }
- }
-
- virtual std::string GetConfig() = 0;
-
- std::string m_configfn;
- std::unique_ptr<XrdSysLogger> m_log;
-};
-
 class FileSystemS3VirtualBucket : public FileSystemFixtureBase {
 protected:
 virtual std::string GetConfig() override {
diff --git a/test/s3_tests_common.hh b/test/s3_tests_common.hh
new file mode 100644
index 0000000..d5e797c
--- /dev/null
+++ b/test/s3_tests_common.hh
@@ -0,0 +1,64 @@
+/***************************************************************
+ *
+ * Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ***************************************************************/
+
+#include "../src/shortfile.hh"
+
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+
+class FileSystemFixtureBase : public testing::Test {
+ protected:
+ FileSystemFixtureBase()
+ : m_log(new XrdSysLogger(2, 0)) // Log to stderr, no log rotation
+ {}
+
+ void SetUp() override {
+ setenv("XRDINSTANCE", "xrootd", 1);
+ char tmp_configfn[] = "/tmp/xrootd-s3-gtest.cfg.XXXXXX";
+ auto result = mkstemp(tmp_configfn);
+ ASSERT_NE(result, -1) << "Failed to create temp file ("
+ << strerror(errno) << ", errno=" << errno << ")";
+ m_configfn = std::string(tmp_configfn);
+
+ auto contents = GetConfig();
+ ASSERT_FALSE(contents.empty());
+ ASSERT_TRUE(writeShortFile(m_configfn, contents, 0))
+ << "Failed to write to temp file (" << strerror(errno)
+ << ", errno=" << errno << ")";
+ }
+
+ void TearDown() override {
+ if (!m_configfn.empty()) {
+ auto rv = unlink(m_configfn.c_str());
+ ASSERT_EQ(rv, 0) << "Failed to delete temp file ("
+ << strerror(errno) << ", errno=" << errno << ")";
+ }
+ }
+
+ virtual std::string GetConfig() = 0;
+
+ std::string m_configfn;
+ std::unique_ptr<XrdSysLogger> m_log;
+};
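With the fixture base factored into `s3_tests_common.hh`, both test binaries share the temp-config lifecycle and a new suite only has to supply its configuration text. A hypothetical derived fixture (the class name and config values are illustrative; the directives mirror the ones used elsewhere in this patch):

#include "s3_tests_common.hh"

// FileSystemFixtureBase writes this config to a temp file in SetUp() and
// removes it in TearDown(); the derived class only provides the contents.
class FileSystemS3Example : public FileSystemFixtureBase {
  protected:
    virtual std::string GetConfig() override {
        return R"(
s3.begin
s3.path_name /example
s3.bucket_name example-bucket
s3.service_name s3.example.com
s3.region us-east-1
s3.service_url https://s3.example.com:443
s3.url_style path
s3.end
)";
    }
};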
diff --git a/test/s3_unit_tests.cc b/test/s3_unit_tests.cc
new file mode 100644
index 0000000..d09bab4
--- /dev/null
+++ b/test/s3_unit_tests.cc
@@ -0,0 +1,301 @@
+/***************************************************************
+ *
+ * Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ***************************************************************/
+
+//
+// The tests in this file are meant to work with the minio-based fixture,
+// meaning no internet connectivity is needed to run them.
+//
+
+#include "../src/S3Commands.hh"
+#include "../src/S3File.hh"
+#include "../src/S3FileSystem.hh"
+#include "s3_tests_common.hh"
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+std::once_flag g_init_once;
+std::string g_ca_file;
+std::string g_minio_url;
+std::string g_bucket_name;
+std::string g_access_key_file;
+std::string g_secret_key_file;
+
+void parseEnvFile(const std::string &fname) {
+ std::ifstream fh(fname);
+ if (!fh.is_open()) {
+ std::cerr << "Failed to open env file: " << strerror(errno);
+ exit(1);
+ }
+ std::string line;
+ while (std::getline(fh, line)) {
+ auto idx = line.find("=");
+ if (idx == std::string::npos) {
+ continue;
+ }
+ auto key = line.substr(0, idx);
+ auto val = line.substr(idx + 1);
+ if (key == "X509_CA_FILE") {
+ g_ca_file = val;
+ setenv("X509_CERT_FILE", g_ca_file.c_str(), 1);
+ } else if (key == "MINIO_URL") {
+ g_minio_url = val;
+ } else if (key == "BUCKET_NAME") {
+ g_bucket_name = val;
+ } else if (key == "ACCESS_KEY_FILE") {
+ g_access_key_file = val;
+ } else if (key == "SECRET_KEY_FILE") {
+ g_secret_key_file = val;
+ }
+ }
+}
+
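`parseEnvFile` consumes plain `KEY=value` lines, matching what `s3-setup.sh` appends to the generated `setup.sh`. A hypothetical `ENV_FILE` — every path and the URL here are placeholders, but the keys are exactly the ones parsed above, and `test-bucket` and the `access_key`/`secret_key` files come from the setup-script hunks earlier in this patch:

X509_CA_FILE=/build/tests/s3_basic/tlsca.pem
MINIO_URL=https://localhost:9000
BUCKET_NAME=test-bucket
ACCESS_KEY_FILE=/build/tests/s3_basic/access_key
SECRET_KEY_FILE=/build/tests/s3_basic/secret_key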
+// Tests that run against the S3 test fixture
+class FileSystemS3Fixture : public FileSystemFixtureBase {
+ void SetUp() override {
+ std::call_once(g_init_once, [&] {
+ char *env_file = getenv("ENV_FILE");
+ ASSERT_NE(env_file, nullptr) << "$ENV_FILE environment variable "
+ "not set; required to run test";
+ parseEnvFile(env_file);
+
+ auto logger = new XrdSysLogger(2, 0);
+ auto log = new XrdSysError(logger, "curl_");
+ AmazonRequest::Init(*log);
+ });
+
+ FileSystemFixtureBase::SetUp();
+ }
+
+ virtual std::string GetConfig() override {
+ return R"(
+xrd.tlsca certfile )" +
+ g_ca_file + R"(
+#s3.trace all dump
+s3.trace all
+s3.begin
+s3.path_name /test
+s3.access_key_file )" +
+ g_access_key_file + R"(
+s3.secret_key_file )" +
+ g_secret_key_file + R"(
+s3.service_name s3.example.com
+s3.region us-east-1
+s3.bucket_name )" +
+ g_bucket_name + R"(
+s3.service_url )" +
+ g_minio_url + R"(
+s3.url_style path
+s3.end
+ )";
+ }
+
+ public:
+ void WritePattern(const std::string &name, const off_t writeSize,
+ const unsigned char chunkByte, const size_t chunkSize,
+ bool known_size) {
+ XrdSysLogger log;
+ S3FileSystem fs(&log, m_configfn.c_str(), nullptr);
+
+ std::unique_ptr<XrdOssDF> fh(fs.newFile());
+ ASSERT_TRUE(fh);
+
+ XrdOucEnv env;
+ // Only set oss.asize for test cases where we want the server to know
+ // the final size.
+ if (known_size) {
+ env.Put("oss.asize", std::to_string(writeSize).c_str());
+ }
+ auto rv = fh->Open(name.c_str(), O_CREAT | O_WRONLY, 0755, env);
+ ASSERT_EQ(rv, 0);
+
+ size_t sizeToWrite = (static_cast<off_t>(chunkSize) >= writeSize)
+ ? static_cast<size_t>(writeSize)
+ : chunkSize;
+ off_t curWriteSize = writeSize;
+ auto curChunkByte = chunkByte;
+ off_t offset = 0;
+ while (sizeToWrite) {
+ std::string writeBuffer(sizeToWrite, curChunkByte);
+
+ rv = fh->Write(writeBuffer.data(), offset, sizeToWrite);
+ ASSERT_EQ(rv, static_cast<ssize_t>(sizeToWrite));
+
+ curWriteSize -= rv;
+ offset += rv;
+ sizeToWrite = (static_cast<off_t>(chunkSize) >= curWriteSize)
+ ? static_cast<size_t>(curWriteSize)
+ : chunkSize;
+ curChunkByte += 1;
+ }
+
+ rv = fh->Close();
+ ASSERT_EQ(rv, 0);
+
+ VerifyContents(fs, name, writeSize, chunkByte, chunkSize);
+ }
+
+ private:
+ void VerifyContents(S3FileSystem &fs, const std::string &obj,
+ off_t expectedSize, unsigned char chunkByte,
+ size_t chunkSize) {
+ std::unique_ptr<XrdOssDF> fh(fs.newFile());
+ ASSERT_TRUE(fh);
+
+ XrdOucEnv env;
+ auto rv = fh->Open(obj.c_str(), O_RDONLY, 0, env);
+ ASSERT_EQ(rv, 0);
+
+ size_t sizeToRead = (static_cast<off_t>(chunkSize) >= expectedSize)
+ ? expectedSize
+ : chunkSize;
+ unsigned char curChunkByte = chunkByte;
+ off_t offset = 0;
+ while (sizeToRead) {
+ std::string readBuffer(sizeToRead, curChunkByte - 1);
+ rv = fh->Read(readBuffer.data(), offset, sizeToRead);
+ ASSERT_EQ(rv, static_cast<ssize_t>(sizeToRead));
+ readBuffer.resize(rv);
+
+ std::string correctBuffer(sizeToRead, curChunkByte);
+ ASSERT_EQ(readBuffer, correctBuffer);
+
+ expectedSize -= rv;
+ offset += rv;
+ sizeToRead = (static_cast<off_t>(chunkSize) >= expectedSize)
+ ? expectedSize
+ : chunkSize;
+ curChunkByte += 1;
+ }
+
+ rv = fh->Close();
+ ASSERT_EQ(rv, 0);
+ }
+};
+
+// Upload a single byte into S3
+TEST_F(FileSystemS3Fixture, UploadOneByte) {
+ WritePattern("/test/write_one.txt", 1, 'X', 32 * 1024, true);
+ WritePattern("/test/write_one_stream.txt", 1, 'X', 32 * 1024, false);
+}
+
+// Upload across multiple calls, single part
+TEST_F(FileSystemS3Fixture, UploadMultipleCalls) {
+ WritePattern("/test/write_alphabet.txt", 26, 'a', 1, true);
+ WritePattern("/test/write_alphabet_stream.txt", 26, 'a', 1, false);
+}
+
+// Upload a zero-byte object
+TEST_F(FileSystemS3Fixture, UploadZero) {
+ WritePattern("/test/write_zero.txt", 0, 'X', 32 * 1024, true);
+ WritePattern("/test/write_zero_stream.txt", 0, 'X', 32 * 1024, false);
+}
+
+// Upload larger - two chunks.
+TEST_F(FileSystemS3Fixture, UploadTwoChunks) {
+ WritePattern("/test/write_two_chunks.txt", 1'024 + 42, 'a', 1'024, true);
+ WritePattern("/test/write_two_chunks_stream.txt", 1'024 + 42, 'a', 1'024,
+ false);
+}
+
+// Upload larger - a few chunks.
+TEST_F(FileSystemS3Fixture, UploadMultipleChunks) {
+ WritePattern("/test/write_multi_chunks.txt", (10'000 / 1'024) * 1'024 + 42,
+ 'a', 1'024, true);
+ WritePattern("/test/write_multi_chunks_stream.txt",
+ (10'000 / 1'024) * 1'024 + 42, 'a', 1'024, false);
+}
+
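The round-trip is easier to follow with the pattern spelled out: the i-th chunk is filled with byte `chunkByte + i`, so `WritePattern(name, 5, 'a', 2, ...)` writes `aa`, `bb`, `c` and `VerifyContents` must read back `aabbc`. A standalone helper (illustrative only, not part of the patch) that constructs the expected object contents:

#include <algorithm>
#include <string>
#include <sys/types.h>

// Builds the byte pattern WritePattern produces: chunk i (chunkSize bytes,
// the last chunk possibly shorter) is filled with the byte chunkByte + i.
std::string ExpectedPattern(off_t writeSize, unsigned char chunkByte,
                            size_t chunkSize) {
    std::string expected;
    unsigned char cur = chunkByte;
    while (static_cast<off_t>(expected.size()) < writeSize) {
        auto remaining = static_cast<size_t>(writeSize) - expected.size();
        expected.append(std::min(chunkSize, remaining), cur++);
    }
    return expected; // ExpectedPattern(5, 'a', 2) == "aabbc"
}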
+// Upload across multiple parts, not aligned to partition.
+TEST_F(FileSystemS3Fixture, UploadLarge) {
+ WritePattern("/test/write_large_1.txt",
+ (100'000'000 / 1'310'720) * 1'310'720 + 42, 'a', 1'310'720,
+ true);
+ WritePattern("/test/write_large_1_stream.txt",
+ (100'000'000 / 1'310'720) * 1'310'720 + 42, 'a', 1'310'720,
+ false);
+}
+
+// Upload a file into S3 that's the same size as the partition size
+TEST_F(FileSystemS3Fixture, UploadLargePart) {
+ WritePattern("/test/write_large_2.txt", 100'000'000, 'a', 131'072, true);
+ WritePattern("/test/write_large_2_stream.txt", 100'000'000, 'a', 131'072,
+ false);
+}
+
+// Upload a small file where the partition size is aligned with the chunk size
+TEST_F(FileSystemS3Fixture, UploadSmallAligned) {
+ WritePattern("/test/write_large_3.txt", 1'000, 'a', 1'000, true);
+}
+
+// Upload a file into S3 that's the same size as the partition size, using
+// chunks that align with the partition size
+TEST_F(FileSystemS3Fixture, UploadLargePartAligned) {
+ WritePattern("/test/write_large_4.txt", 100'000'000, 'a', 1'000'000, true);
+}
+
+// Upload a file into S3 resulting in multiple partitions
+TEST_F(FileSystemS3Fixture, UploadMultiPartAligned) {
+ WritePattern("/test/write_large_5.txt", 100'000'000, 'a', 1'000'000, true);
+}
+
+// Upload a file into S3 resulting in multiple partitions, using unaligned
+// chunks
+TEST_F(FileSystemS3Fixture, UploadMultiPartUnaligned) {
+ WritePattern("/test/write_large_1.txt", 100'000'000, 'a', 32'768, true);
+ WritePattern("/test/write_large_1_stream.txt", 100'000'000, 'a', 32'768,
+ false);
+}
+
+// Ensure that uploads time out if no activity occurs.
+TEST_F(FileSystemS3Fixture, UploadStall) {
+ HTTPRequest::SetStallTimeout(std::chrono::milliseconds(200));
+ S3File::LaunchMonitorThread();
+
+ XrdSysLogger log;
+ S3FileSystem fs(&log, m_configfn.c_str(), nullptr);
+
+ std::unique_ptr<XrdOssDF> fh(fs.newFile());
+ ASSERT_TRUE(fh);
+
+ XrdOucEnv env;
+ env.Put("oss.asize", std::to_string(16'384).c_str());
+ auto rv = fh->Open("/test/write_stall.txt", O_CREAT | O_WRONLY, 0755, env);
+ ASSERT_EQ(rv, 0);
+
+ ssize_t sizeToWrite = 4'096;
+ std::string writeBuffer(sizeToWrite, 'a');
+ rv = fh->Write(writeBuffer.data(), 0, sizeToWrite);
+ ASSERT_EQ(rv, sizeToWrite);
+
+ std::this_thread::sleep_for(HTTPRequest::GetStallTimeout() * 4 / 3 +
+ std::chrono::milliseconds(10));
+ writeBuffer = std::string(sizeToWrite, 'b');
+ rv = fh->Write(writeBuffer.data(), sizeToWrite, sizeToWrite);
+ ASSERT_EQ(rv, -ETIMEDOUT);
+}
+
+int main(int argc, char **argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
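Finally, a plausible way to drive the new suite from a build directory — the option names come from the top-level CMakeLists.txt, and ctest resolves the `S3::s3_basic` fixture (starting minio) before running any unit test that requires it:

cmake -DXROOTD_PLUGINS_BUILD_UNITTESTS=ON -DVALGRIND=ON ..
make -j"$(nproc)"
ctest --output-on-failure -R 'FileSystemS3Fixture|valgrind-s3'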