Skip to content

Commit

Permalink
apacheGH-36346: [C++][S3] Shutdown aws-sdk-cpp related resources on f…
Browse files Browse the repository at this point in the history
…inalize

All S3 related operations are failed after we call
arrow::fs::FinalizeS3().
  • Loading branch information
kou committed Jul 3, 2023
1 parent 0b7bd74 commit 214a561
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 6 deletions.
79 changes: 73 additions & 6 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,12 +398,19 @@ namespace {
Status CheckS3Initialized() {
if (!IsS3Initialized()) {
return Status::Invalid(
"S3 subsystem not initialized; please call InitializeS3() "
"S3 subsystem is not initialized; please call InitializeS3() "
"before carrying out any S3-related operation");
}
return Status::OK();
}

Status CheckS3Finalized() {
if (IsS3Finalized()) {
return Status::Invalid("S3 subsystem is finalized");
}
return Status::OK();
}

// XXX Sanitize paths by removing leading slash?

struct S3Path {
Expand Down Expand Up @@ -1008,6 +1015,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
content_length_(size) {}

Status Init() {
RETURN_NOT_OK(CheckS3Finalized());

// Issue a HEAD Object to get the content-length and ensure any
// errors (e.g. file not found) don't wait until the first Read() call.
if (content_length_ != kNoSize) {
Expand Down Expand Up @@ -1099,6 +1108,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
return 0;
}

RETURN_NOT_OK(CheckS3Finalized());

// Read the desired range of bytes
ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
GetObjectRange(client_.get(), path_, position, nbytes, out));
Expand Down Expand Up @@ -1182,6 +1193,8 @@ class ObjectOutputStream final : public io::OutputStream {
}

Status Init() {
RETURN_NOT_OK(CheckS3Finalized());

// Initiate the multi-part upload
S3Model::CreateMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
Expand Down Expand Up @@ -1217,6 +1230,8 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::OK();
}

RETURN_NOT_OK(CheckS3Finalized());

S3Model::AbortMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
Expand Down Expand Up @@ -1245,6 +1260,8 @@ class ObjectOutputStream final : public io::OutputStream {
Future<> CloseAsync() override {
if (closed_) return Status::OK();

RETURN_NOT_OK(CheckS3Finalized());

if (current_part_) {
// Upload last part
RETURN_NOT_OK(CommitCurrentPart());
Expand Down Expand Up @@ -1307,6 +1324,8 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::Invalid("Operation on closed stream");
}

RETURN_NOT_OK(CheckS3Finalized());

const int8_t* data_ptr = reinterpret_cast<const int8_t*>(data);
auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) {
data_ptr += offset;
Expand Down Expand Up @@ -1359,6 +1378,7 @@ class ObjectOutputStream final : public io::OutputStream {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
RETURN_NOT_OK(CheckS3Finalized());
// Wait for background writes to finish
std::unique_lock<std::mutex> lock(upload_state_->mutex);
return upload_state_->pending_parts_completed;
Expand All @@ -1367,6 +1387,7 @@ class ObjectOutputStream final : public io::OutputStream {
// Upload-related helpers

Status CommitCurrentPart() {
RETURN_NOT_OK(CheckS3Finalized());
ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
current_part_.reset();
current_part_size_ = 0;
Expand All @@ -1379,6 +1400,8 @@ class ObjectOutputStream final : public io::OutputStream {

Status UploadPart(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
RETURN_NOT_OK(CheckS3Finalized());

S3Model::UploadPartRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
Expand Down Expand Up @@ -1574,6 +1597,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
S3Model::ListObjectsV2Request req;

Status operator()(const Result<S3Model::ListObjectsV2Outcome>& result) {
RETURN_NOT_OK(CheckS3Finalized());

// Serialize calls to operation-specific handlers
if (!walker->ok()) {
// Early exit: avoid executing handlers if DoWalk() returned
Expand Down Expand Up @@ -1692,6 +1717,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

// Tests to see if a bucket exists
Result<bool> BucketExists(const std::string& bucket) {
RETURN_NOT_OK(CheckS3Finalized());

S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(bucket));

Expand All @@ -1709,6 +1736,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

// Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
RETURN_NOT_OK(CheckS3Finalized());

// Check bucket exists first.
{
S3Model::HeadBucketRequest req;
Expand Down Expand Up @@ -1753,6 +1782,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

// Create an object with empty contents. Successful if object already exists.
Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
RETURN_NOT_OK(CheckS3Finalized());

S3Model::PutObjectRequest req;
req.SetBucket(ToAwsString(bucket));
req.SetKey(ToAwsString(key));
Expand All @@ -1768,6 +1799,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}

Status DeleteObject(const std::string& bucket, const std::string& key) {
RETURN_NOT_OK(CheckS3Finalized());
S3Model::DeleteObjectRequest req;
req.SetBucket(ToAwsString(bucket));
req.SetKey(ToAwsString(key));
Expand All @@ -1777,6 +1809,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}

Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
RETURN_NOT_OK(CheckS3Finalized());
S3Model::CopyObjectRequest req;
req.SetBucket(ToAwsString(dest_path.bucket));
req.SetKey(ToAwsString(dest_path.key));
Expand All @@ -1799,6 +1832,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
Result<bool> IsEmptyDirectory(
const std::string& bucket, const std::string& key,
const S3Model::HeadObjectOutcome* previous_outcome = nullptr) {
RETURN_NOT_OK(CheckS3Finalized());

if (previous_outcome) {
// Fetch the backend from the previous error
DCHECK(!previous_outcome->IsSuccess());
Expand Down Expand Up @@ -1850,6 +1885,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}

Result<bool> IsNonEmptyDirectory(const S3Path& path) {
RETURN_NOT_OK(CheckS3Finalized());
S3Model::ListObjectsV2Request req;
req.SetBucket(ToAwsString(path.bucket));
req.SetPrefix(ToAwsString(path.key) + kSep);
Expand Down Expand Up @@ -1939,6 +1975,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Workhorse for GetFileInfo(FileSelector...)
Status Walk(const FileSelector& select, const std::string& bucket,
const std::string& key, std::vector<FileInfo>* out) {
RETURN_NOT_OK(CheckS3Finalized());

FileInfoCollector collector(bucket, key, select);

auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
Expand Down Expand Up @@ -2027,6 +2065,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
};
Future<std::shared_ptr<WalkResult>> WalkForDeleteDirAsync(const std::string& bucket,
const std::string& key) {
RETURN_NOT_OK(CheckS3Finalized());

auto state = std::make_shared<WalkResult>();

auto handle_results = [state](const std::string& prefix,
Expand Down Expand Up @@ -2064,6 +2104,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Delete multiple objects at once
Future<> DeleteObjectsAsync(const std::string& bucket,
const std::vector<std::string>& keys) {
RETURN_NOT_OK(CheckS3Finalized());

struct DeleteCallback {
const std::string bucket;

Expand Down Expand Up @@ -2156,6 +2198,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

static Result<std::vector<std::string>> ProcessListBuckets(
const Aws::S3::Model::ListBucketsOutcome& outcome) {
RETURN_NOT_OK(CheckS3Finalized());
if (!outcome.IsSuccess()) {
return ErrorToStatus(std::forward_as_tuple("When listing buckets: "), "ListBuckets",
outcome.GetError());
Expand All @@ -2169,11 +2212,13 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}

Result<std::vector<std::string>> ListBuckets() {
RETURN_NOT_OK(CheckS3Finalized());
auto outcome = client_->ListBuckets();
return ProcessListBuckets(outcome);
}

Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
RETURN_NOT_OK(CheckS3Finalized());
auto self = shared_from_this();
return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
// TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
Expand All @@ -2187,6 +2232,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s));
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
RETURN_NOT_OK(CheckS3Finalized());

auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path);
RETURN_NOT_OK(ptr->Init());
Expand All @@ -2205,6 +2251,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
RETURN_NOT_OK(ValidateFilePath(path));
RETURN_NOT_OK(CheckS3Finalized());

auto ptr =
std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size());
Expand All @@ -2223,6 +2270,7 @@ S3FileSystem::~S3FileSystem() {}
Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(
const S3Options& options, const io::IOContext& io_context) {
RETURN_NOT_OK(CheckS3Initialized());
RETURN_NOT_OK(CheckS3Finalized());

std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options, io_context));
RETURN_NOT_OK(ptr->impl_->Init());
Expand Down Expand Up @@ -2250,6 +2298,8 @@ S3Options S3FileSystem::options() const { return impl_->options(); }
std::string S3FileSystem::region() const { return impl_->region(); }

Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
FileInfo info;
info.set_path(s);
Expand Down Expand Up @@ -2313,6 +2363,8 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
}

Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));

FileInfoVector results;
Expand Down Expand Up @@ -2383,6 +2435,8 @@ FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select)
}

Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));

if (path.key.empty()) {
Expand Down Expand Up @@ -2426,6 +2480,8 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
}

Status S3FileSystem::DeleteDir(const std::string& s) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));

if (path.empty()) {
Expand Down Expand Up @@ -2455,6 +2511,8 @@ Status S3FileSystem::DeleteDirContents(const std::string& s, bool missing_dir_ok
}

Future<> S3FileSystem::DeleteDirContentsAsync(const std::string& s, bool missing_dir_ok) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));

if (path.empty()) {
Expand All @@ -2480,6 +2538,8 @@ Status S3FileSystem::DeleteRootDirContents() {
}

Status S3FileSystem::DeleteFile(const std::string& s) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));

Expand All @@ -2506,6 +2566,8 @@ Status S3FileSystem::DeleteFile(const std::string& s) {
}

Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
RETURN_NOT_OK(CheckS3Finalized());

// XXX We don't implement moving directories as it would be too expensive:
// one must copy all directory contents one by one (including object data),
// then delete the original contents.
Expand All @@ -2525,6 +2587,8 @@ Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
}

Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
RETURN_NOT_OK(CheckS3Finalized());

ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
RETURN_NOT_OK(ValidateFilePath(src_path));
ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
Expand Down Expand Up @@ -2562,6 +2626,8 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));

RETURN_NOT_OK(CheckS3Finalized());

auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path,
impl_->options(), metadata);
RETURN_NOT_OK(ptr->Init());
Expand Down Expand Up @@ -2600,6 +2666,8 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {

bool IsInitialized() { return !is_finalized_ && is_initialized_; }

bool IsFinalized() { return is_finalized_; }

void Finalize(bool from_destructor = false) {
bool expected = true;
is_finalized_.store(true);
Expand All @@ -2608,9 +2676,9 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
ARROW_LOG(WARNING)
<< " arrow::fs::FinalizeS3 was not called even though S3 was initialized. "
"This could lead to a segmentation fault at exit";
RegionResolver::ResetDefaultInstance();
Aws::ShutdownAPI(aws_options_);
}
RegionResolver::ResetDefaultInstance();
Aws::ShutdownAPI(aws_options_);
}
}

Expand Down Expand Up @@ -2672,9 +2740,6 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {

std::shared_ptr<AwsInstance> CreateAwsInstance() {
auto instance = std::make_shared<AwsInstance>();
// Don't let S3 be shutdown until all Arrow threads are done using it
arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
io::internal::GetIOThreadPool()->KeepAlive(instance);
return instance;
}

Expand Down Expand Up @@ -2713,6 +2778,8 @@ Status EnsureS3Finalized() { return FinalizeS3(); }

bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); }

bool IsS3Finalized() { return GetAwsInstance().IsFinalized(); }

// -----------------------------------------------------------------------
// Top-level utility functions

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ Status EnsureS3Initialized();
ARROW_EXPORT
bool IsS3Initialized();

/// Whether S3 was finalized.
ARROW_EXPORT
bool IsS3Finalized();

/// Shutdown the S3 APIs.
ARROW_EXPORT
Status FinalizeS3();
Expand Down

0 comments on commit 214a561

Please sign in to comment.