Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch S3 writes to use streaming I/O #51

Merged
merged 11 commits into from
Dec 5, 2024
34 changes: 28 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ project( xrootd-http/s3 )

option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the scitokens-cpp unit tests" OFF )
option( XROOTD_PLUGINS_EXTERNAL_GTEST "Use an external/pre-installed copy of GTest" OFF )
option( VALGRIND "Run select unit tests under valgrind" OFF )

set( CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake )
set( CMAKE_BUILD_TYPE Debug )
Expand All @@ -20,6 +21,10 @@ if(NOT XROOTD_PLUGIN_VERSION)
set(XROOTD_PLUGIN_VERSION ${XROOTD_PLUGIN_VERSION} CACHE INTERNAL "")
endif()

if(VALGRIND)
find_program(VALGRIND_BIN valgrind REQUIRED)
endif()

macro(use_cxx17)
if (CMAKE_VERSION VERSION_LESS "3.1")
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
Expand Down Expand Up @@ -66,17 +71,29 @@ add_definitions( -D_FILE_OFFSET_BITS=64 )

include_directories(${XROOTD_INCLUDES} ${CURL_INCLUDE_DIRS} ${LIBCRYPTO_INCLUDE_DIRS})

add_library(XrdS3 SHARED src/CurlUtil.cc src/S3File.cc src/S3Directory.cc src/S3AccessInfo.cc src/S3FileSystem.cc src/AWSv4-impl.cc src/S3Commands.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc)
add_library(XrdHTTPServer SHARED src/CurlUtil.cc src/HTTPFile.cc src/HTTPFileSystem.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc)
# On Linux, we hide all the symbols for the final libraries, exposing only what's needed for the XRootD
# runtime loader. So here we create the object library and will create a separate one for testing with
# the symbols exposed.
add_library(XrdS3Obj OBJECT src/CurlUtil.cc src/S3File.cc src/S3Directory.cc src/S3AccessInfo.cc src/S3FileSystem.cc src/AWSv4-impl.cc src/S3Commands.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc)
target_include_directories(XrdS3Obj INTERFACE tinyxml2::tinyxml2)
set_target_properties(XrdS3Obj PROPERTIES POSITION_INDEPENDENT_CODE ON)
target_link_libraries(XrdS3Obj -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} tinyxml2::tinyxml2 Threads::Threads)

add_library(XrdS3 MODULE "$<TARGET_OBJECTS:XrdS3Obj>")
target_link_libraries(XrdS3 XrdS3Obj)

target_link_libraries(XrdS3 -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} tinyxml2::tinyxml2 Threads::Threads)
target_link_libraries(XrdHTTPServer -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} Threads::Threads)
add_library(XrdHTTPServerObj OBJECT src/CurlUtil.cc src/HTTPFile.cc src/HTTPFileSystem.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc)
set_target_properties(XrdHTTPServerObj PROPERTIES POSITION_INDEPENDENT_CODE ON)
target_link_libraries(XrdHTTPServerObj -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} Threads::Threads)

add_library(XrdHTTPServer MODULE "$<TARGET_OBJECTS:XrdHTTPServerObj>")
target_link_libraries(XrdHTTPServer XrdHTTPServerObj)

# The CMake documentation strongly advises against using these macros; instead, the pkg_check_modules
# is supposed to fill out the full path to ${LIBCRYPTO_LIBRARIES}. As of cmake 3.26.1, this does not
# appear to be the case on Mac OS X. Remove once no longer necessary!
target_link_directories(XrdS3 PUBLIC ${LIBCRYPTO_LIBRARY_DIRS})
target_link_directories(XrdHTTPServer PUBLIC ${LIBCRYPTO_LIBRARY_DIRS})
target_link_directories(XrdS3Obj PUBLIC ${LIBCRYPTO_LIBRARY_DIRS})
target_link_directories(XrdHTTPServerObj PUBLIC ${LIBCRYPTO_LIBRARY_DIRS})

if(NOT APPLE)
set_target_properties(XrdS3 PROPERTIES OUTPUT_NAME "XrdS3-${XROOTD_PLUGIN_VERSION}" SUFFIX ".so" LINK_FLAGS "-Wl,--version-script=${CMAKE_SOURCE_DIR}/configs/export-lib-symbols")
Expand All @@ -94,6 +111,11 @@ install(
)

if( XROOTD_PLUGINS_BUILD_UNITTESTS )
add_library(XrdS3Testing SHARED "$<TARGET_OBJECTS:XrdS3Obj>")
target_link_libraries(XrdS3Testing XrdS3Obj)
add_library(XrdHTTPServerTesting SHARED "$<TARGET_OBJECTS:XrdHTTPServerObj>")
target_link_libraries(XrdHTTPServerTesting XrdHTTPServerObj)

if( NOT XROOTD_PLUGINS_EXTERNAL_GTEST )
include(ExternalProject)
ExternalProject_Add(gtest
Expand Down
4 changes: 2 additions & 2 deletions src/AWSv4-impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void convertMessageDigestToLowercaseHex(const unsigned char *messageDigest,
free(buffer);
}

bool doSha256(const std::string &payload, unsigned char *messageDigest,
bool doSha256(const std::string_view payload, unsigned char *messageDigest,
unsigned int *mdLength) {
EVP_MD_CTX *mdctx = EVP_MD_CTX_create();
if (mdctx == NULL) {
Expand All @@ -116,7 +116,7 @@ bool doSha256(const std::string &payload, unsigned char *messageDigest,
return false;
}

if (!EVP_DigestUpdate(mdctx, payload.c_str(), payload.length())) {
if (!EVP_DigestUpdate(mdctx, payload.data(), payload.length())) {
EVP_MD_CTX_destroy(mdctx);
return false;
}
Expand Down
3 changes: 2 additions & 1 deletion src/AWSv4-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <map>
#include <string>
#include <string_view>

namespace AWSv4Impl {

Expand All @@ -34,7 +35,7 @@ void convertMessageDigestToLowercaseHex(const unsigned char *messageDigest,
unsigned int mdLength,
std::string &hexEncoded);

bool doSha256(const std::string &payload, unsigned char *messageDigest,
bool doSha256(const std::string_view payload, unsigned char *messageDigest,
unsigned int *mdLength);

bool createSignature(const std::string &secretAccessKey,
Expand Down
45 changes: 34 additions & 11 deletions src/CurlUtil.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ void CurlWorker::Run() {
// is waiting on it is undefined behavior.
auto queue_ref = m_queue;
auto &queue = *queue_ref.get();
m_logger.Log(LogMask::Debug, "CurlWorker::Run", "Started a curl worker");
m_unpause_queue.reset(new HandlerQueue());
m_logger.Log(LogMask::Debug, "Run", "Started a curl worker");

CURLM *multi_handle = curl_multi_init();
if (multi_handle == nullptr) {
Expand All @@ -199,21 +200,40 @@ void CurlWorker::Run() {
CURLMcode mres = CURLM_OK;

std::vector<struct curl_waitfd> waitfds;
waitfds.resize(1);
waitfds.resize(2);
jhiemstrawisc marked this conversation as resolved.
Show resolved Hide resolved
// The `curl_multi_wait` call in the event loop needs to be interrupted when
// additional work comes into one of the two queues (either the global queue
// or the per-worker unpause queue). To do this, the queue objects will
// write to a file descriptor when a new HTTP request is ready; we add these
// FDs to the list of FDs for libcurl to poll in order to trigger a wakeup.
// The `Consume`/`TryConsume` methods will have a side-effect of reading
// from the pipe if a request is available.
waitfds[0].fd = queue.PollFD();
waitfds[0].events = CURL_WAIT_POLLIN;
waitfds[0].revents = 0;
waitfds[1].fd = m_unpause_queue->PollFD();
waitfds[1].events = CURL_WAIT_POLLIN;
waitfds[1].revents = 0;

while (true) {
while (running_handles < static_cast<int>(m_max_ops)) {
auto op = m_unpause_queue->TryConsume();
if (!op) {
break;
}
op->ContinueHandle();
}
while (running_handles < static_cast<int>(m_max_ops)) {
auto op =
running_handles == 0 ? queue.Consume() : queue.TryConsume();
if (!op) {
break;
}
op->SetUnpauseQueue(m_unpause_queue);

auto curl = queue.GetHandle();
if (curl == nullptr) {
m_logger.Log(LogMask::Debug, "CurlWorker",
m_logger.Log(LogMask::Debug, "Run",
"Unable to allocate a curl handle");
op->Fail("E_NOMEM", "Unable to get allocate a curl handle");
continue;
Expand All @@ -223,10 +243,10 @@ void CurlWorker::Run() {
op->Fail(op->getErrorCode(), op->getErrorMessage());
}
} catch (...) {
m_logger.Log(LogMask::Debug, "CurlWorker",
"Unable to setup the curl handle");
m_logger.Log(LogMask::Debug, "Run",
"Unable to set up the curl handle");
op->Fail("E_NOMEM",
"Failed to setup the curl handle for the operation");
"Failed to set up the curl handle for the operation");
continue;
}
m_op_map[curl] = op;
Expand All @@ -236,8 +256,7 @@ void CurlWorker::Run() {
std::stringstream ss;
ss << "Unable to add operation to the curl multi-handle: "
<< curl_multi_strerror(mres);
m_logger.Log(LogMask::Debug, "CurlWorker",
ss.str().c_str());
m_logger.Log(LogMask::Debug, "Run", ss.str().c_str());
}
op->Fail("E_CURL_LIB",
"Unable to add operation to the curl multi-handle");
Expand All @@ -253,7 +272,7 @@ void CurlWorker::Run() {
if (m_logger.getMsgMask() & LogMask::Debug) {
std::stringstream ss;
ss << "Curl worker thread " << getpid() << " is running "
<< running_handles << "operations";
<< running_handles << " operations";
m_logger.Log(LogMask::Debug, "CurlWorker", ss.str().c_str());
}
last_marker = now;
Expand All @@ -277,7 +296,8 @@ void CurlWorker::Run() {
} else if (mres != CURLM_OK) {
if (m_logger.getMsgMask() & LogMask::Warning) {
std::stringstream ss;
ss << "Failed to perform multi-handle operation: " << mres;
ss << "Failed to perform multi-handle operation: "
<< curl_multi_strerror(mres);
m_logger.Log(LogMask::Warning, "CurlWorker", ss.str().c_str());
}
break;
Expand All @@ -298,17 +318,20 @@ void CurlWorker::Run() {
}
auto &op = iter->second;
auto res = msg->data.result;
m_logger.Log(LogMask::Dump, "Run",
"Processing result from curl");
op->ProcessCurlResult(iter->first, res);
op->ReleaseHandle(iter->first);
op->Notify();
running_handles -= 1;
curl_multi_remove_handle(multi_handle, iter->first);
if (res == CURLE_OK) {
// If the handle was successful, then we can recycle it.
queue.RecycleHandle(iter->first);
} else {
curl_easy_cleanup(iter->first);
m_op_map.erase(iter);
}
m_op_map.erase(iter);
}
} while (msg);
}
Expand Down
34 changes: 17 additions & 17 deletions src/CurlUtil.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>
#include <memory>

// Forward dec'ls
typedef void CURL;
Expand All @@ -43,25 +43,25 @@ CURL *GetHandle(bool verbose);
* multi-curl driver thread is based on polling FD's
*/
class HandlerQueue {
public:
HandlerQueue();
public:
HandlerQueue();

void Produce(HTTPRequest *handler);
void Produce(HTTPRequest *handler);

HTTPRequest *Consume();
HTTPRequest *TryConsume();
HTTPRequest *Consume();
HTTPRequest *TryConsume();

int PollFD() const {return m_read_fd;}
int PollFD() const { return m_read_fd; }

CURL *GetHandle();
void RecycleHandle(CURL *);
CURL *GetHandle();
void RecycleHandle(CURL *);

private:
std::deque<HTTPRequest*> m_ops;
thread_local static std::vector<CURL*> m_handles;
std::condition_variable m_cv;
std::mutex m_mutex;
const static unsigned m_max_pending_ops{20};
int m_read_fd{-1};
int m_write_fd{-1};
private:
std::deque<HTTPRequest *> m_ops;
thread_local static std::vector<CURL *> m_handles;
std::condition_variable m_cv;
std::mutex m_mutex;
const static unsigned m_max_pending_ops{20};
int m_read_fd{-1};
int m_write_fd{-1};
};
41 changes: 21 additions & 20 deletions src/CurlWorker.hh
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,25 @@ class HTTPRequest;
class HandlerQueue;

class CurlWorker {
public:
CurlWorker(std::shared_ptr<HandlerQueue> queue, XrdSysError &logger) :
m_queue(queue),
m_logger(logger)
{}

CurlWorker(const CurlWorker &) = delete;

void Run();
static void RunStatic(CurlWorker *myself);
static unsigned GetPollThreads() {return m_workers;}

private:
std::shared_ptr<HandlerQueue> m_queue;
std::unordered_map<CURL*, HTTPRequest *> m_op_map;
XrdSysError &m_logger;

const static unsigned m_workers{5};
const static unsigned m_max_ops{20};
const static unsigned m_marker_period{5};
public:
CurlWorker(std::shared_ptr<HandlerQueue> queue, XrdSysError &logger)
: m_queue(queue), m_logger(logger) {}

CurlWorker(const CurlWorker &) = delete;

void Run();
static void RunStatic(CurlWorker *myself);
static unsigned GetPollThreads() { return m_workers; }

private:
std::shared_ptr<HandlerQueue> m_queue;
std::shared_ptr<HandlerQueue>
m_unpause_queue; // Queue for notifications that a handle can be
// unpaused.
std::unordered_map<CURL *, HTTPRequest *> m_op_map;
XrdSysError &m_logger;

const static unsigned m_workers{5};
const static unsigned m_max_ops{20};
const static unsigned m_marker_period{5};
};
Loading
Loading