From 2cd18fe3c5ba4515752a6f32e3fd8adc4148ae0d Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Thu, 19 Dec 2024 22:45:23 +0100 Subject: [PATCH] fix(S3): store issues --- src/fdb5/s3/S3Common.cc | 67 +---------------- src/fdb5/s3/S3Common.h | 24 ++---- src/fdb5/s3/S3Store.cc | 160 +++------------------------------------- 3 files changed, 20 insertions(+), 231 deletions(-) diff --git a/src/fdb5/s3/S3Common.cc b/src/fdb5/s3/S3Common.cc index 131285e1b..818a2fe9e 100644 --- a/src/fdb5/s3/S3Common.cc +++ b/src/fdb5/s3/S3Common.cc @@ -13,9 +13,9 @@ #include "eckit/config/LocalConfiguration.h" #include "eckit/exception/Exceptions.h" #include "eckit/filesystem/URI.h" +#include "eckit/io/s3/S3BucketName.h" #include "eckit/io/s3/S3Config.h" #include "eckit/io/s3/S3Session.h" -#include "eckit/utils/Tokenizer.h" #include "fdb5/config/Config.h" #include "fdb5/database/Key.h" @@ -26,80 +26,22 @@ namespace fdb5 { //---------------------------------------------------------------------------------------------------------------------- -S3Common::S3Common(const fdb5::Config& config, const std::string& component, const fdb5::Key& key) { +S3Common::S3Common(const fdb5::Config& config, const std::string& /*component*/, const fdb5::Key& key) { parseConfig(config); - /// @note: code for bucket per DB - std::string keyStr = key.valuesToString(); std::replace(keyStr.begin(), keyStr.end(), ':', '-'); db_bucket_ = prefix_ + keyStr; - - /// @note: code for single bucket for all DBs - - // std::vector valid{"catalogue", "store"}; - // ASSERT(std::find(valid.begin(), valid.end(), component) != valid.end()); - - // bucket_ = "default"; - - // eckit::LocalConfiguration c{}; - - // if (config.has("s3")) c = config.getSubConfiguration("s3"); - // if (c.has(component)) bucket_ = c.getSubConfiguration(component).getString("bucket", bucket_); - - // std::string first_cap{component}; - // first_cap[0] = toupper(component[0]); - - // std::string all_caps{component}; - // for (auto & c: all_caps) c = 
toupper(c); - - // bucket_ = eckit::Resource("fdbS3" + first_cap + "Bucket;$FDB_S3_" + all_caps + "_BUCKET", bucket_); - - // db_prefix_ = key.valuesToString(); - - // if (c.has("client")) - // fdb5::DaosManager::instance().configure(c.getSubConfiguration("client")); - - /// @todo: check that the bucket name complies with name restrictions } -S3Common::S3Common(const fdb5::Config& config, const std::string& component, const eckit::URI& uri) { - - /// @note: validity of input URI is not checked here because this constructor is only triggered - /// by DB::buildReader in EntryVisitMechanism, where validity of URIs is ensured beforehand +S3Common::S3Common(const fdb5::Config& config, const std::string& /*component*/, const eckit::URI& uri) { parseConfig(config); endpoint_ = eckit::net::Endpoint {uri.host(), uri.port()}; - /// @note: code for bucket per DB - - const auto parts = eckit::Tokenizer("/").tokenize(uri.name()); - const auto n = parts.size(); - ASSERT(n == 1 | n == 2); - db_bucket_ = parts[0]; - - /// @note: code for single bucket for all DBs - - // eckit::S3Name n{uri}; - - // bucket_ = n.bucket().name(); - - // eckit::Tokenizer parse("_"); - // std::vector bits; - // parse(n.name(), bits); - - // ASSERT(bits.size() == 2); - - // db_prefix_ = bits[0]; - - // // eckit::LocalConfiguration c{}; - - // // if (config.has("s3")) c = config.getSubConfiguration("s3"); - - // // if (c.has("client")) - // // fdb5::DaosManager::instance().configure(c.getSubConfiguration("client")); + db_bucket_ = eckit::S3BucketName::parse(uri.name()); } void S3Common::parseConfig(const fdb5::Config& config) { @@ -126,7 +68,6 @@ void S3Common::parseConfig(const fdb5::Config& config) { eckit::S3Session::instance().addClient(s3Config); - /// @note: code for bucket per DB only prefix_ = s3.getString("bucketPrefix", prefix_); } diff --git a/src/fdb5/s3/S3Common.h b/src/fdb5/s3/S3Common.h index e0fc96a2b..290b8889f 100644 --- a/src/fdb5/s3/S3Common.h +++ b/src/fdb5/s3/S3Common.h @@ -15,36 
+15,26 @@ #pragma once #include "eckit/filesystem/URI.h" - -#include "fdb5/database/Key.h" #include "fdb5/config/Config.h" +#include "fdb5/database/Key.h" namespace fdb5 { class S3Common { -public: // methods - +public: // methods S3Common(const fdb5::Config&, const std::string& component, const fdb5::Key&); S3Common(const fdb5::Config&, const std::string& component, const eckit::URI&); -private: // methods - +private: // methods void parseConfig(const fdb5::Config& config); -protected: // members - +protected: // members eckit::net::Endpoint endpoint_; - std::string db_bucket_; - - /// @note: code for single bucket for all DBs - // std::string bucket_; - // std::string db_prefix_; - -private: // members + std::string db_bucket_; +private: // members std::string prefix_; - }; -} \ No newline at end of file +} // namespace fdb5 diff --git a/src/fdb5/s3/S3Store.cc b/src/fdb5/s3/S3Store.cc index 5611502cf..e454904b8 100644 --- a/src/fdb5/s3/S3Store.cc +++ b/src/fdb5/s3/S3Store.cc @@ -21,7 +21,6 @@ #include "eckit/thread/AutoLock.h" #include "eckit/thread/StaticMutex.h" #include "eckit/utils/MD5.h" -#include "eckit/utils/Tokenizer.h" #include "fdb5/LibFdb5.h" #include "fdb5/database/Field.h" #include "fdb5/database/FieldLocation.h" @@ -63,15 +62,7 @@ bool S3Store::uriBelongs(const eckit::URI& uri) const { } bool S3Store::uriExists(const eckit::URI& uri) const { - - // auto tmp = uri; - // tmp.endpoint(endpoint_); - - std::cerr << "-----------> Checking if " << uri << " exists" << std::endl; - return eckit::S3Name::make(endpoint_, uri.name())->exists(); - - // return eckit::S3ObjectName(tmp).exists(); } bool S3Store::auxiliaryURIExists(const eckit::URI& uri) const { @@ -90,23 +81,6 @@ std::vector S3Store::collocatedDataURIs() const { for (const auto& key : bucket.listObjects()) { store_unit_uris.push_back(bucket.makeObject(key)->uri()); } return store_unit_uris; - - /// @note: code for single bucket for all DBs - // std::vector store_unit_uris; - - // eckit::S3Bucket 
bucket{endpoint_, bucket_}; - - // if (!bucket.exists()) return store_unit_uris; - - // /// @note if an S3Catalogue is implemented, more filtering will need to - // /// be done here to discriminate store keys from catalogue keys - // for (const auto& key : bucket.listObjects(filter = "^" + db_prefix_ + "_.*")) { - - // store_unit_uris.push_back(key.uri()); - - // } - - // return store_unit_uris; } std::set S3Store::asCollocatedDataURIs(const std::vector& uris) const { @@ -133,17 +107,11 @@ std::vector S3Store::getAuxiliaryURIs(const eckit::URI& uri) const { /// @todo: never used in actual fdb-read? eckit::DataHandle* S3Store::retrieve(Field& field) const { - return field.dataHandle(); } std::unique_ptr S3Store::archive(const Key& key, const void* data, eckit::Length length) { - /// @note: code for S3 object (key) per field: - - /// @note: generate unique key name - /// if single bucket, starting by dbkey_indexkey_ - /// if bucket per db, starting by indexkey_ eckit::S3ObjectName n = generateDataKey(key); /// @todo: ensure bucket if not yet seen by this process @@ -161,21 +129,6 @@ std::unique_ptr S3Store::archive(const Key& key, const void h->write(data, length); return std::unique_ptr(new S3FieldLocation(n.uri(), 0, length, fdb5::Key())); - - /// @note: code for S3 object (key) per index store: - - // /// @note: get or generate unique key name - // /// if single bucket, starting by dbkey_indexkey_ - // /// if bucket per db, starting by indexkey_ - // eckit::S3Name n = getDataKey(key); - - // eckit::DataHandle &dh = getDataHandle(key, n); - - // eckit::Offset offset{dh.position()}; - - // h.write(data, length); - - // return std::unique_ptr(new S3FieldLocation(n.URI(), offset, length, fdb5::Key())); } void S3Store::flush() { @@ -198,65 +151,21 @@ void S3Store::close() { void S3Store::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostream& logVerbose, bool doit) const { - /// @note: code for bucket per DB - - const auto parts = 
eckit::Tokenizer("/").tokenize(uri.name()); - const auto n = parts.size(); - ASSERT(n == 1 | n == 2); - - ASSERT(parts[0] == db_bucket_); - - if (n == 2) { // object - - eckit::S3ObjectName key {uri}; - - logVerbose << "destroy S3 key: " << key.asString() << std::endl; - - if (doit) { key.remove(); } + auto item = eckit::S3Name::make(endpoint_, uri.name()); - } else { // pool - - eckit::S3BucketName bucket {uri}; - - logVerbose << "destroy S3 bucket: " << bucket.asString() << std::endl; - - if (doit) { bucket.ensureDestroyed(); } + if (auto* object = dynamic_cast<eckit::S3ObjectName*>(item.get())) { + logVerbose << "Removing S3 object: " << object->asString() << '\n'; + if (doit) { object->remove(); } + } else if (auto* bucket = dynamic_cast<eckit::S3BucketName*>(item.get())) { + logVerbose << "Removing S3 bucket: " << bucket->asString() << '\n'; + if (doit) { bucket->ensureDestroyed(); } + } else { + throw eckit::SeriousBug("S3Store::remove: unknown URI type: " + uri.asString(), Here()); } - - // void TocStore::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostream& logVerbose, bool doit) const { - // ASSERT(uri.scheme() == type()); - // - // eckit::PathName path = uri.path(); - // if (path.isDir()) { - // logVerbose << "rmdir: "; - // logAlways << path << std::endl; - // if (doit) path.rmdir(false); - // } else { - // logVerbose << "Unlinking: "; - // logAlways << path << std::endl; - // if (doit) path.unlink(false); - // } - // } - - // /// @note: code for single bucket for all DBs - // eckit::S3Name n{uri}; - - // ASSERT(n.bucket().name() == bucket_); - // /// @note: if uri doesn't have key name, maybe this method should return without destroying anything. 
- // /// this way when TocWipeVisitor has wipeAll == true, the (only) bucket will not be destroyed - // ASSERT(n.name().rfind(db_prefix_, 0) == 0); - - // logVerbose << "destroy S3 key: "; - // logAlways << n.asString() << std::endl; - // if (doit) n.destroy(); } void S3Store::print(std::ostream& out) const { - out << "S3Store(" << endpoint_ << "/" << db_bucket_ << ")"; - - /// @note: code for single bucket for all DBs - // out << "S3Store(" << endpoint_ << "/" << bucket_ << ")"; } /// @note: unique name generation copied from LocalPathName::unique. @@ -287,59 +196,8 @@ eckit::S3ObjectName S3Store::generateDataKey(const Key& key) const { std::replace(keyStr.begin(), keyStr.end(), ':', '-'); return eckit::S3ObjectName {endpoint_, {db_bucket_, keyStr + "." + md5.digest() + ".data"}}; - - /// @note: code for single bucket for all DBs - // return eckit::S3Name{endpoint_, bucket_, db_prefix_ + "_" + key.valuesToString() + "_" + md5.digest() + ".data"}; } -/// @note: code for S3 object (key) per index store: -// eckit::S3Name S3Store::getDataKey(const Key& key) const { - -// KeyStore::const_iterator j = dataKeys_.find(key); - -// if ( j != dataKeys_.end() ) -// return j->second; - -// eckit::S3Name dataKey = generateDataKey(key); - -// dataKeys_[ key ] = dataKey; - -// return dataKey; - -// } - -/// @note: code for S3 object (key) per index store: -// eckit::DataHandle& S3Store::getDataHandle(const Key& key, const eckit::S3Name& name) { - -// HandleStore::const_iterator j = handles_.find(key); -// if ( j != handles_.end() ) -// return j->second; - -// eckit::DataHandle *dh = name.dataHandle(multipart = true); - -// ASSERT(dh); - -// handles_[ key ] = dh; - -// dh->openForAppend(0); - -// return *dh; - -// } - -/// @note: code for S3 object (key) per index store: -// void S3Store::closeDataHandles() { - -// for ( HandleStore::iterator j = handles_.begin(); j != handles_.end(); ++j ) { -// eckit::DataHandle *dh = j->second; -// dh->close(); -// delete dh; -// } - -// 
handles_.clear(); - -// } - //---------------------------------------------------------------------------------------------------------------------- } // namespace fdb5