Skip to content

Commit

Permalink
Add finalizeS3FileSystem (facebookincubator#6398)
Browse files Browse the repository at this point in the history
Summary:
Add finalizeS3FileSystem to explicitly teardown the AWS SDK C++.
Velox users need to manually invoke this before exiting an application.
This is because Velox uses a static object to hold the S3 FileSystem instance.
AWS C++ SDK library also uses static global objects in its code.
The order of static object destruction is not defined by the C++ standard.
This could lead to a segmentation fault during the program exit.
Reference: aws/aws-sdk-cpp#1550 (comment)

Remove intializeClient() as this can be done during S3FileSystem construction.

Clarify AWS SDK log level is set during the S3 initialization. Fix test.

Refactor S3FileSystem tests to separate filesystem tests and registration tests.

Pull Request resolved: facebookincubator#6398

Reviewed By: kgpai

Differential Revision: D49114381

Pulled By: pedroerp

fbshipit-source-id: e124d5d5e74ac60f73df729c2deff2392eef6efe
  • Loading branch information
majetideepak authored and ericyuliu committed Oct 12, 2023
1 parent 4afe77b commit 51be7e7
Show file tree
Hide file tree
Showing 9 changed files with 374 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,32 @@
#include "velox/core/Config.h"
#endif

#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"

namespace facebook::velox::filesystems {

#ifdef VELOX_ENABLE_S3
folly::once_flag S3FSInstantiationFlag;

// Only one instance of S3FileSystem is supported for now.
// TODO: Support multiple S3FileSystem instances using a cache
static std::shared_ptr<S3FileSystem> s3fs = nullptr;

std::function<std::shared_ptr<
FileSystem>(std::shared_ptr<const Config>, std::string_view)>
fileSystemGenerator() {
static auto filesystemGenerator = [](std::shared_ptr<const Config> properties,
std::string_view filePath) {
// Only one instance of S3FileSystem is supported for now.
// TODO: Support multiple S3FileSystem instances using a cache
// Initialize on first access and reuse after that.
static std::shared_ptr<FileSystem> s3fs;
folly::call_once(S3FSInstantiationFlag, [&properties]() {
std::shared_ptr<S3FileSystem> fs;
if (properties != nullptr) {
initializeS3(properties.get());
fs = std::make_shared<S3FileSystem>(properties);
} else {
fs =
std::make_shared<S3FileSystem>(std::make_shared<core::MemConfig>());
auto config = std::make_shared<core::MemConfig>();
initializeS3(config.get());
fs = std::make_shared<S3FileSystem>(config);
}
fs->initializeClient();
s3fs = fs;
});
return s3fs;
Expand All @@ -53,7 +56,19 @@ fileSystemGenerator() {

void registerS3FileSystem() {
#ifdef VELOX_ENABLE_S3
registerFileSystem(isS3File, fileSystemGenerator());
if (!s3fs) {
registerFileSystem(isS3File, fileSystemGenerator());
}
#endif
}

void finalizeS3FileSystem() {
#ifdef VELOX_ENABLE_S3
VELOX_CHECK(
!s3fs || (s3fs && s3fs.use_count() == 1),
"Cannot finalize S3FileSystem while in use");
s3fs.reset();
finalizeS3();
#endif
}

Expand Down
10 changes: 10 additions & 0 deletions velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,14 @@ namespace facebook::velox::filesystems {
// Register the S3 filesystem.
void registerS3FileSystem();

/// Teardown the AWS SDK C++.
/// Velox users need to manually invoke this before exiting an application.
/// This is because Velox uses a static object to hold the S3 FileSystem
/// instance. AWS C++ SDK library also uses static global objects in its code.
/// The order of static object destruction is not determined by the C++
/// standard.
/// This could lead to a segmentation fault during the program exit.
/// Ref https://github.com/aws/aws-sdk-cpp/issues/1550#issuecomment-1412601061
void finalizeS3FileSystem();

} // namespace facebook::velox::filesystems
157 changes: 106 additions & 51 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,35 +199,118 @@ Aws::Utils::Logging::LogLevel inferS3LogLevel(std::string level) {

namespace filesystems {
using namespace connector::hive;

// Initialize and Finalize the AWS SDK C++ library.
// Initialization must be done before creating a S3FileSystem.
// Finalization must be done after all S3FileSystem instances have been deleted.
// After Finalize, no new S3FileSystem can be created.
struct AwsInstance {
AwsInstance() : isInitialized_(false), isFinalized_(false) {}
~AwsInstance() {
finalize(/*from_destructor=*/true);
}

// Returns true iff the instance was newly initialized with config.
bool initialize(const Config* config) {
if (isFinalized_.load()) {
VELOX_FAIL("Attempt to initialize S3 after it has been finalized.");
}
if (!isInitialized_.exchange(true)) {
// Not already initialized.
doInitialize(config);
return true;
}
return false;
}

bool isInitialized() {
return !isFinalized_ && isInitialized_;
}

void finalize(bool fromDestructor = false) {
if (isFinalized_.exchange(true)) {
// Already finalized.
return;
}
if (isInitialized_.exchange(false)) {
// Was initialized.
if (fromDestructor) {
VLOG(0)
<< "finalizeS3FileSystem() was not called even though S3 was initialized."
"This could lead to a segmentation fault at exit";
}
Aws::ShutdownAPI(awsOptions_);
}
}

std::string getLogLevelName() {
return Aws::Utils::Logging::GetLogLevelName(
awsOptions_.loggingOptions.logLevel);
}

private:
void doInitialize(const Config* config) {
awsOptions_.loggingOptions.logLevel =
inferS3LogLevel(HiveConfig::s3GetLogLevel(config));
// In some situations, curl triggers a SIGPIPE signal causing the entire
// process to be terminated without any notification.
// This behavior is seen via Prestissimo on AmazonLinux2 on AWS EC2.
// Relevant documentation in AWS SDK C++
// https://github.com/aws/aws-sdk-cpp/blob/276ee83080fcc521d41d456dbbe61d49392ddf77/src/aws-cpp-sdk-core/include/aws/core/Aws.h#L96
// This option allows the AWS SDK C++ to catch the SIGPIPE signal and
// log a message.
awsOptions_.httpOptions.installSigPipeHandler = true;
Aws::InitAPI(awsOptions_);
}

Aws::SDKOptions awsOptions_;
std::atomic<bool> isInitialized_;
std::atomic<bool> isFinalized_;
};

// Singleton to initialize AWS S3.
AwsInstance* getAwsInstance() {
static auto instance = std::make_unique<AwsInstance>();
return instance.get();
}

bool initializeS3(const Config* config) {
return getAwsInstance()->initialize(config);
}

static std::atomic<int> fileSystemCount = 0;

void finalizeS3() {
VELOX_CHECK((fileSystemCount == 0), "Cannot finalize S3 while in use");
getAwsInstance()->finalize();
}

class S3FileSystem::Impl {
public:
Impl(const Config* config) : config_(config) {
const size_t origCount = initCounter_++;
if (origCount == 0) {
Aws::SDKOptions awsOptions;
awsOptions.loggingOptions.logLevel =
inferS3LogLevel(HiveConfig::s3GetLogLevel(config_));
// In some situations, curl triggers a SIGPIPE signal causing the entire
// process to be terminated without any notification.
// This behavior is seen via Prestissimo on AmazonLinux2 on AWS EC2.
// Relevant documentation in AWS SDK C++
// https://github.com/aws/aws-sdk-cpp/blob/276ee83080fcc521d41d456dbbe61d49392ddf77/src/aws-cpp-sdk-core/include/aws/core/Aws.h#L96
// This option allows the AWS SDK C++ to catch the SIGPIPE signal and
// log a message.
awsOptions.httpOptions.installSigPipeHandler = true;
Aws::InitAPI(awsOptions);
VELOX_CHECK(getAwsInstance()->isInitialized(), "S3 is not initialized");
Aws::Client::ClientConfiguration clientConfig;
clientConfig.endpointOverride = HiveConfig::s3Endpoint(config_);

if (HiveConfig::s3UseSSL(config_)) {
clientConfig.scheme = Aws::Http::Scheme::HTTPS;
} else {
clientConfig.scheme = Aws::Http::Scheme::HTTP;
}

auto credentialsProvider = getCredentialsProvider();

client_ = std::make_shared<Aws::S3::S3Client>(
credentialsProvider,
clientConfig,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
HiveConfig::s3UseVirtualAddressing(config_));
++fileSystemCount;
}

~Impl() {
const size_t newCount = --initCounter_;
if (newCount == 0) {
client_.reset();
Aws::SDKOptions awsOptions;
awsOptions.loggingOptions.logLevel =
inferS3LogLevel(HiveConfig::s3GetLogLevel(config_));
Aws::ShutdownAPI(awsOptions);
}
client_.reset();
--fileSystemCount;
}

// Configure and return an AWSCredentialsProvider with access key and secret
Expand Down Expand Up @@ -293,27 +376,6 @@ class S3FileSystem::Impl {
return getDefaultCredentialsProvider();
}

// Use the input Config parameters and initialize the S3Client.
void initializeClient() {
Aws::Client::ClientConfiguration clientConfig;

clientConfig.endpointOverride = HiveConfig::s3Endpoint(config_);

if (HiveConfig::s3UseSSL(config_)) {
clientConfig.scheme = Aws::Http::Scheme::HTTPS;
} else {
clientConfig.scheme = Aws::Http::Scheme::HTTP;
}

auto credentialsProvider = getCredentialsProvider();

client_ = std::make_shared<Aws::S3::S3Client>(
credentialsProvider,
clientConfig,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
HiveConfig::s3UseVirtualAddressing(config_));
}

// Make it clear that the S3FileSystem instance owns the S3Client.
// Once the S3FileSystem is destroyed, the S3Client fails to work
// due to the Aws::ShutdownAPI invocation in the destructor.
Expand All @@ -322,26 +384,19 @@ class S3FileSystem::Impl {
}

std::string getLogLevelName() const {
return GetLogLevelName(inferS3LogLevel(HiveConfig::s3GetLogLevel(config_)));
return getAwsInstance()->getLogLevelName();
}

private:
const Config* config_;
std::shared_ptr<Aws::S3::S3Client> client_;
static std::atomic<size_t> initCounter_;
};

std::atomic<size_t> S3FileSystem::Impl::initCounter_(0);

S3FileSystem::S3FileSystem(std::shared_ptr<const Config> config)
: FileSystem(config) {
impl_ = std::make_shared<Impl>(config.get());
}

void S3FileSystem::initializeClient() {
impl_->initializeClient();
}

std::string S3FileSystem::getLogLevelName() const {
return impl_->getLogLevelName();
}
Expand Down
14 changes: 7 additions & 7 deletions velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@

namespace facebook::velox::filesystems {

// Implementation of S3 filesystem and file interface.
// We provide a registration method for read and write files so the appropriate
// type of file can be constructed based on a filename. See the
// (register|generate)ReadFile and (register|generate)WriteFile functions.
bool initializeS3(const Config* config);

void finalizeS3();

/// Implementation of S3 filesystem and file interface.
/// We provide a registration method for read and write files so the appropriate
/// type of file can be constructed based on a filename.
class S3FileSystem : public FileSystem {
public:
explicit S3FileSystem(std::shared_ptr<const Config> config);

// Initialize the Aws::S3::S3Client from the input Config parameters.
void initializeClient();

std::string name() const override;

std::unique_ptr<ReadFile> openFileForRead(
Expand Down
14 changes: 13 additions & 1 deletion velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,19 @@ target_link_libraries(
velox_s3fs
velox_core
velox_exec_test_lib
velox_hive_connector
velox_dwio_common_exception
velox_exec
gtest
gtest_main)

add_executable(velox_s3registration_test S3FileSystemRegistrationTest.cpp)
add_test(velox_s3registration_test velox_s3registration_test)
target_link_libraries(
velox_s3registration_test
velox_file
velox_s3fs
velox_core
velox_exec_test_lib
velox_dwio_common_exception
velox_exec
gtest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ namespace {
constexpr char const* kMinioExecutableName{"minio"};
constexpr char const* kMinioAccessKey{"minio"};
constexpr char const* kMinioSecretKey{"miniopass"};
constexpr char const* kMinioConnectionString{"127.0.0.1:9000"};
} // namespace

// A minio server, managed as a child process.
// Adapted from the Apache Arrow library.
class MinioServer {
public:
MinioServer() : tempPath_(::exec::test::TempDirectoryPath::create()) {}
MinioServer(const std::string_view& connectionString)
: tempPath_(::exec::test::TempDirectoryPath::create()),
connectionString_(connectionString) {}

void start();

Expand Down Expand Up @@ -68,7 +69,7 @@ class MinioServer {

private:
const std::shared_ptr<exec::test::TempDirectoryPath> tempPath_;
const std::string connectionString_ = kMinioConnectionString;
const std::string connectionString_;
const std::string accessKey_ = kMinioAccessKey;
const std::string secretKey_ = kMinioSecretKey;
std::shared_ptr<::boost::process::child> serverProcess_;
Expand Down
Loading

0 comments on commit 51be7e7

Please sign in to comment.