Skip to content

Commit

Permalink
GH-15054: [C++] Change s3 finalization to happen after arrow threads …
Browse files Browse the repository at this point in the history
…finished, add pyarrow exit hook (#33858)

CRITICAL FIX: When statically linking with the AWS SDK, it was possible to crash on shutdown/exit. That should no longer be possible.

BREAKING CHANGE: S3 can only be initialized and finalized once.

BREAKING CHANGE: S3 (the AWS SDK) will not be finalized until after all CPU & I/O threads are finished.
* Closes: #15054

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
westonpace authored and raulcd committed Apr 17, 2023
1 parent 89a9b2f commit ffa2ac5
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 65 deletions.
161 changes: 99 additions & 62 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2571,100 +2571,137 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(

namespace {

std::mutex aws_init_lock;
Aws::SDKOptions aws_options;
std::atomic<bool> aws_initialized(false);
struct AwsInstance : public ::arrow::internal::Executor::Resource {
AwsInstance() : is_initialized_(false), is_finalized_(false) {}
~AwsInstance() { Finalize(/*from_destructor=*/true); }

// Initializes the AWS SDK with `options` unless that has already happened.
//
// Returns true when this call performed the initialization, false when the
// instance was already initialized (in which case `options` is not applied),
// and Status::Invalid when the instance has already been finalized.
//
// NOTE(review): is_initialized_ flips to true before DoInitialize() returns, so a
// concurrent caller can observe "initialized" while initialization is still in
// progress -- confirm that callers serialize initialization.
Result<bool> EnsureInitialized(const S3GlobalOptions& options) {
  if (is_finalized_.load()) {
    return Status::Invalid("Attempt to initialize S3 after it has been finalized");
  }
  bool not_yet_initialized = false;
  const bool newly_initialized =
      is_initialized_.compare_exchange_strong(not_yet_initialized, true);
  if (newly_initialized) {
    DoInitialize(options);
  }
  return newly_initialized;
}

Status DoInitializeS3(const S3GlobalOptions& options) {
Aws::Utils::Logging::LogLevel aws_log_level;
// True only while the instance is flagged as initialized and has not been finalized.
bool IsInitialized() {
  if (is_finalized_.load()) {
    return false;
  }
  return is_initialized_.load();
}

// Marks this instance as finalized and, if the AWS SDK is currently initialized,
// shuts it down.
//
// `from_destructor` is true only when invoked from ~AwsInstance(); in that case a
// warning is logged because S3 was initialized but never finalized explicitly.
//
// Calling this more than once is harmless: the compare-exchange lets only the
// call that observes the initialized state perform the shutdown.
void Finalize(bool from_destructor = false) {
  // Set first so that EnsureInitialized() rejects any attempt made from now on.
  is_finalized_.store(true);

  bool expected = true;
  if (!is_initialized_.compare_exchange_strong(expected, false)) {
    // Never initialized, or already shut down by an earlier call.
    return;
  }

  if (from_destructor) {
    ARROW_LOG(WARNING)
        << " arrow::fs::FinalizeS3 was not called even though S3 was initialized. "
           "This could lead to a segmentation fault at exit";
  }

  // The shutdown has to run on every path. Previously it was nested inside the
  // `from_destructor` branch, so an explicit FinalizeS3() cleared is_initialized_
  // without ever calling Aws::ShutdownAPI, and the destructor then skipped it too.
  RegionResolver::ResetDefaultInstance();
  Aws::ShutdownAPI(aws_options_);
}

private:
void DoInitialize(const S3GlobalOptions& options) {
Aws::Utils::Logging::LogLevel aws_log_level;

#define LOG_LEVEL_CASE(level_name) \
case S3LogLevel::level_name: \
aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
break;

switch (options.log_level) {
LOG_LEVEL_CASE(Fatal)
LOG_LEVEL_CASE(Error)
LOG_LEVEL_CASE(Warn)
LOG_LEVEL_CASE(Info)
LOG_LEVEL_CASE(Debug)
LOG_LEVEL_CASE(Trace)
default:
aws_log_level = Aws::Utils::Logging::LogLevel::Off;
}
switch (options.log_level) {
LOG_LEVEL_CASE(Fatal)
LOG_LEVEL_CASE(Error)
LOG_LEVEL_CASE(Warn)
LOG_LEVEL_CASE(Info)
LOG_LEVEL_CASE(Debug)
LOG_LEVEL_CASE(Trace)
default:
aws_log_level = Aws::Utils::Logging::LogLevel::Off;
}

#undef LOG_LEVEL_CASE

#ifdef ARROW_S3_HAS_CRT
aws_options.ioOptions.clientBootstrap_create_fn =
[ev_threads = options.num_event_loop_threads]() {
// https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
Aws::Crt::Io::DefaultHostResolver default_host_resolver(
event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
"Aws_Init_Cleanup", event_loop_group, default_host_resolver);
client_bootstrap->EnableBlockingShutdown();
return client_bootstrap;
};
aws_options_.ioOptions.clientBootstrap_create_fn =
[ev_threads = options.num_event_loop_threads]() {
// https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
Aws::Crt::Io::DefaultHostResolver default_host_resolver(
event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
"Aws_Init_Cleanup", event_loop_group, default_host_resolver);
client_bootstrap->EnableBlockingShutdown();
return client_bootstrap;
};
#endif

aws_options.loggingOptions.logLevel = aws_log_level;
// By default the AWS SDK logs to files, log to console instead
aws_options.loggingOptions.logger_create_fn = [] {
return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
aws_options.loggingOptions.logLevel);
};
aws_options_.loggingOptions.logLevel = aws_log_level;
// By default the AWS SDK logs to files, log to console instead
aws_options_.loggingOptions.logger_create_fn = [this] {
return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
aws_options_.loggingOptions.logLevel);
};
#if (defined(AWS_SDK_VERSION_MAJOR) && \
(AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
(AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
// ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
// This configuration options is only available with AWS SDK 1.9.272 and later.
aws_options.httpOptions.compliantRfc3986Encoding = true;
// ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
// This configuration options is only available with AWS SDK 1.9.272 and later.
aws_options_.httpOptions.compliantRfc3986Encoding = true;
#endif
Aws::InitAPI(aws_options);
aws_initialized.store(true);
return Status::OK();
Aws::InitAPI(aws_options_);
}

Aws::SDKOptions aws_options_;
std::atomic<bool> is_initialized_;
std::atomic<bool> is_finalized_;
};

std::shared_ptr<AwsInstance> CreateAwsInstance() {
auto instance = std::make_shared<AwsInstance>();
// Don't let S3 be shutdown until all Arrow threads are done using it
arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
io::internal::GetIOThreadPool()->KeepAlive(instance);
return instance;
}

Status DoFinalizeS3() {
RegionResolver::ResetDefaultInstance();
Aws::ShutdownAPI(aws_options);
aws_initialized.store(false);
return Status::OK();
AwsInstance& GetAwsInstance() {
static auto instance = CreateAwsInstance();
return *instance;
}

// Forwards `options` to the shared AwsInstance and returns its result unchanged:
// true if this call initialized S3, false if it was already initialized, or an
// error status.
Result<bool> EnsureAwsInstanceInitialized(const S3GlobalOptions& options) {
  AwsInstance& aws = GetAwsInstance();
  return aws.EnsureInitialized(options);
}

} // namespace

Status InitializeS3(const S3GlobalOptions& options) {
std::lock_guard<std::mutex> lock(aws_init_lock);
return DoInitializeS3(options);
}

Status EnsureS3Initialized() {
std::lock_guard<std::mutex> lock(aws_init_lock);
if (!aws_initialized.load()) {
S3GlobalOptions options{S3LogLevel::Fatal};
return DoInitializeS3(options);
ARROW_ASSIGN_OR_RAISE(bool successfully_initialized,
EnsureAwsInstanceInitialized(options));
if (!successfully_initialized) {
return Status::Invalid(
"S3 was already initialized. It is safe to use but the options passed in this "
"call have been ignored.");
}
return Status::OK();
}

Status FinalizeS3() {
std::lock_guard<std::mutex> lock(aws_init_lock);
return DoFinalizeS3();
Status EnsureS3Initialized() {
return EnsureAwsInstanceInitialized({S3LogLevel::Fatal}).status();
}

Status EnsureS3Finalized() {
std::lock_guard<std::mutex> lock(aws_init_lock);
if (aws_initialized.load()) {
return DoFinalizeS3();
}
Status FinalizeS3() {
GetAwsInstance().Finalize();
return Status::OK();
}

bool IsS3Initialized() { return aws_initialized.load(); }
Status EnsureS3Finalized() { return FinalizeS3(); }

bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); }

// -----------------------------------------------------------------------
// Top-level utility functions
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ struct ARROW_EXPORT S3GlobalOptions {

/// Initialize the S3 APIs. It is required to call this function at least once
/// before using S3FileSystem.
///
/// Once this function is called you MUST call FinalizeS3 before the end of the
/// application in order to avoid a segmentation fault at shutdown.
ARROW_EXPORT
Status InitializeS3(const S3GlobalOptions& options);

Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class S3TestMixin : public AwsTestMixin {
Status connect_status;
int retries = kNumServerRetries;
do {
InitServerAndClient();
ASSERT_OK(InitServerAndClient());
connect_status = OutcomeToStatus("ListBuckets", client_->ListBuckets());
} while (!connect_status.ok() && --retries > 0);
ASSERT_OK(connect_status);
Expand All @@ -198,8 +198,8 @@ class S3TestMixin : public AwsTestMixin {
}

protected:
void InitServerAndClient() {
ASSERT_OK_AND_ASSIGN(minio_, GetMinioEnv()->GetOneServer());
Status InitServerAndClient() {
ARROW_ASSIGN_OR_RAISE(minio_, GetMinioEnv()->GetOneServer());
client_config_.reset(new Aws::Client::ClientConfiguration());
client_config_->endpointOverride = ToAwsString(minio_->connect_string());
client_config_->scheme = Aws::Http::Scheme::HTTP;
Expand All @@ -211,6 +211,7 @@ class S3TestMixin : public AwsTestMixin {
new Aws::S3::S3Client(credentials_, *client_config_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
use_virtual_addressing));
return Status::OK();
}

// How many times to try launching a server in a row before decreeing failure
Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
_not_imported.append("S3FileSystem")
else:
ensure_s3_initialized()
import atexit
atexit.register(finalize_s3)


def __getattr__(name):
Expand Down
14 changes: 14 additions & 0 deletions r/R/arrow-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ supported_dplyr_methods <- list(
explain = NULL
)

# Exit hook that finalizes S3. It should run at session exit and must be
# called to avoid a segmentation fault at shutdown.
# `env` is accepted only to match the finalizer callback signature; it is unused.
finalize_s3 <- function(env) FinalizeS3()

# Helper environment to register the exit hook
s3_finalizer <- new.env(parent = emptyenv())

#' @importFrom vctrs s3_register vec_size vec_cast vec_unique
.onLoad <- function(...) {
# Make sure C++ knows on which thread it is safe to call the R API
Expand Down Expand Up @@ -147,6 +156,11 @@ supported_dplyr_methods <- list(
# Register extension types that we use internally
reregister_extension_type(vctrs_extension_type(vctrs::unspecified()))

# Registers a callback to run at session exit
# This can't be done in .onUnload or .onDetach because those hooks are
# not guaranteed to run (e.g. they only run if the user unloads arrow)
reg.finalizer(s3_finalizer, finalize_s3, onexit = TRUE)

invisible()
}

Expand Down
4 changes: 4 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions r/src/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,13 @@ std::string fs___S3FileSystem__region(const std::shared_ptr<fs::S3FileSystem>& f

#endif

// [[arrow::export]]
void FinalizeS3() {
  // Without S3 support compiled in, this is a no-op so the R side can call it
  // unconditionally.
#ifdef ARROW_R_WITH_S3
  auto status = fs::FinalizeS3();
  StopIfNotOk(status);
#endif
}

#if defined(ARROW_R_WITH_GCS)

#include <arrow/filesystem/gcsfs.h>
Expand Down

0 comments on commit ffa2ac5

Please sign in to comment.