diff --git a/src/fdb5/database/Index.h b/src/fdb5/database/Index.h index 3705fbef5..2bbbf94f1 100755 --- a/src/fdb5/database/Index.h +++ b/src/fdb5/database/Index.h @@ -15,27 +15,26 @@ #ifndef fdb5_Index_H #define fdb5_Index_H -#include -#include -#include - #include "eckit/eckit.h" - #include "eckit/io/Length.h" #include "eckit/io/Offset.h" +#include "eckit/memory/Counted.h" #include "eckit/memory/NonCopyable.h" #include "eckit/types/FixedString.h" #include "eckit/types/Types.h" -#include "eckit/memory/Counted.h" - #include "fdb5/database/EntryVisitMechanism.h" #include "fdb5/database/Field.h" -#include "fdb5/database/IndexStats.h" #include "fdb5/database/IndexAxis.h" #include "fdb5/database/IndexLocation.h" +#include "fdb5/database/IndexStats.h" #include "fdb5/database/Indexer.h" #include "fdb5/database/Key.h" +#include +#include +#include +#include + namespace eckit { class Stream; } @@ -154,7 +153,7 @@ class Index { const IndexLocation& location() const { return content_->location(); } - const std::vector dataURIs() const { return content_->dataURIs(); } + std::vector dataURIs() const { return content_->dataURIs(); } bool dirty() const { return content_->dirty(); } diff --git a/src/fdb5/s3/S3Common.cc b/src/fdb5/s3/S3Common.cc index e94628a4b..131285e1b 100644 --- a/src/fdb5/s3/S3Common.cc +++ b/src/fdb5/s3/S3Common.cc @@ -8,16 +8,19 @@ * does it submit to any jurisdiction. 
*/ -#include +#include "fdb5/s3/S3Common.h" -#include "eckit/io/s3/S3Name.h" -#include "eckit/io/s3/S3Credential.h" +#include "eckit/config/LocalConfiguration.h" +#include "eckit/exception/Exceptions.h" +#include "eckit/filesystem/URI.h" +#include "eckit/io/s3/S3Config.h" #include "eckit/io/s3/S3Session.h" +#include "eckit/utils/Tokenizer.h" +#include "fdb5/config/Config.h" +#include "fdb5/database/Key.h" -#include "fdb5/s3/S3Common.h" - -// #include "eckit/exception/Exceptions.h" -#include "eckit/config/Resource.h" +#include +#include namespace fdb5 { @@ -27,17 +30,12 @@ S3Common::S3Common(const fdb5::Config& config, const std::string& component, con parseConfig(config); - - /// @note: code for bucket per DB std::string keyStr = key.valuesToString(); std::replace(keyStr.begin(), keyStr.end(), ':', '-'); db_bucket_ = prefix_ + keyStr; - - - /// @note: code for single bucket for all DBs // std::vector valid{"catalogue", "store"}; @@ -63,11 +61,7 @@ S3Common::S3Common(const fdb5::Config& config, const std::string& component, con // if (c.has("client")) // fdb5::DaosManager::instance().configure(c.getSubConfiguration("client")); - - - /// @todo: check that the bucket name complies with name restrictions - } S3Common::S3Common(const fdb5::Config& config, const std::string& component, const eckit::URI& uri) { @@ -77,19 +71,15 @@ S3Common::S3Common(const fdb5::Config& config, const std::string& component, con parseConfig(config); - endpoint_ = eckit::net::Endpoint{uri.host(), uri.port()}; - - + endpoint_ = eckit::net::Endpoint {uri.host(), uri.port()}; /// @note: code for bucket per DB const auto parts = eckit::Tokenizer("/").tokenize(uri.name()); - const auto n = parts.size(); + const auto n = parts.size(); ASSERT(n == 1 || n == 2); db_bucket_ = parts[0]; - - /// @note: code for single bucket for all DBs // eckit::S3Name n{uri}; @@ -104,38 +94,42 @@ S3Common::S3Common(const fdb5::Config& config, const std::string& component, con // db_prefix_ = bits[0]; - // //
eckit::LocalConfiguration c{}; // // if (config.has("s3")) c = config.getSubConfiguration("s3"); // // if (c.has("client")) // // fdb5::DaosManager::instance().configure(c.getSubConfiguration("client")); - } void S3Common::parseConfig(const fdb5::Config& config) { - eckit::LocalConfiguration s3{}; + eckit::LocalConfiguration s3 {}; if (config.has("s3")) { s3 = config.getSubConfiguration("s3"); - + std::string credentialsPath; if (s3.has("credential")) { credentialsPath = s3.getString("credential"); } - eckit::S3Session::instance().readCredentials(credentialsPath); + eckit::S3Session::instance().loadCredentials(credentialsPath); + } + + if (!s3.has("endpoint")) { + throw eckit::UserError("Missing \"endpoint\" in configuration: " + config.configPath()); } - - endpoint_ = eckit::net::Endpoint{s3.getString("endpoint", "127.0.0.1:9000")}; + endpoint_ = eckit::net::Endpoint {s3.getString("endpoint", "127.0.0.1:9000")}; + + eckit::S3Config s3Config(endpoint_); + if (s3.has("region")) { s3Config.region = s3.getString("region"); } + eckit::S3Session::instance().addClient(s3Config); /// @note: code for bucket per DB only prefix_ = s3.getString("bucketPrefix", prefix_); - } //---------------------------------------------------------------------------------------------------------------------- -} // namespace fdb5 \ No newline at end of file +} // namespace fdb5 diff --git a/src/fdb5/s3/S3FieldLocation.cc b/src/fdb5/s3/S3FieldLocation.cc index 09730264b..baa3b4919 100644 --- a/src/fdb5/s3/S3FieldLocation.cc +++ b/src/fdb5/s3/S3FieldLocation.cc @@ -33,14 +33,14 @@ S3FieldLocation::S3FieldLocation(const eckit::URI &uri, eckit::Offset offset, ec S3FieldLocation::S3FieldLocation(eckit::Stream& s) : FieldLocation(s) {} -std::shared_ptr S3FieldLocation::make_shared() const { - return std::make_shared(std::move(*this)); +std::shared_ptr S3FieldLocation::make_shared() const { + return std::make_shared(std::move(*this)); } eckit::DataHandle* S3FieldLocation::dataHandle() const { return 
eckit::S3ObjectName(uri_).dataHandle(offset()); - + } void S3FieldLocation::print(std::ostream &out) const { @@ -53,4 +53,4 @@ void S3FieldLocation::visit(FieldLocationVisitor& visitor) const { static FieldLocationBuilder builder("s3"); -} // namespace fdb5 \ No newline at end of file +} // namespace fdb5 diff --git a/src/fdb5/s3/S3FieldLocation.h b/src/fdb5/s3/S3FieldLocation.h index c27604792..9d966ab71 100644 --- a/src/fdb5/s3/S3FieldLocation.h +++ b/src/fdb5/s3/S3FieldLocation.h @@ -32,7 +32,7 @@ class S3FieldLocation : public FieldLocation { eckit::DataHandle* dataHandle() const override; - virtual std::shared_ptr make_shared() const override; + virtual std::shared_ptr make_shared() const override; virtual void visit(FieldLocationVisitor& visitor) const override; @@ -56,4 +56,4 @@ class S3FieldLocation : public FieldLocation { //---------------------------------------------------------------------------------------------------------------------- -} // namespace fdb5 \ No newline at end of file +} // namespace fdb5 diff --git a/src/fdb5/s3/S3Store.cc b/src/fdb5/s3/S3Store.cc index dd2c1d1ca..5611502cf 100644 --- a/src/fdb5/s3/S3Store.cc +++ b/src/fdb5/s3/S3Store.cc @@ -8,20 +8,38 @@ * does it submit to any jurisdiction. 
*/ -#include +#include "fdb5/s3/S3Store.h" +#include "eckit/exception/Exceptions.h" +#include "eckit/filesystem/URI.h" +#include "eckit/io/DataHandle.h" +#include "eckit/io/Length.h" +#include "eckit/io/s3/S3BucketName.h" +#include "eckit/io/s3/S3Name.h" +#include "eckit/log/TimeStamp.h" #include "eckit/runtime/Main.h" #include "eckit/thread/AutoLock.h" #include "eckit/thread/StaticMutex.h" -#include "eckit/log/TimeStamp.h" #include "eckit/utils/MD5.h" #include "eckit/utils/Tokenizer.h" -#include "eckit/io/s3/S3BucketName.h" +#include "fdb5/LibFdb5.h" +#include "fdb5/database/Field.h" +#include "fdb5/database/FieldLocation.h" +#include "fdb5/database/Key.h" +#include "fdb5/database/Store.h" +#include "fdb5/rules/Schema.h" +#include "fdb5/s3/S3Common.h" +#include "fdb5/s3/S3FieldLocation.h" -// #include "eckit/config/Resource.h" +#include -#include "fdb5/s3/S3FieldLocation.h" -#include "fdb5/s3/S3Store.h" +#include +#include +#include +#include +#include +#include +#include namespace fdb5 { @@ -29,100 +47,57 @@ namespace fdb5 { static StoreBuilder builder("s3"); -S3Store::S3Store(const Schema& schema, const Key& key, const Config& config) : - Store(schema), S3Common(config, "store", key), config_(config) { - -} +S3Store::S3Store(const Schema& schema, const Key& key, const Config& config) + : Store(schema), S3Common(config, "store", key), config_(config) { } -S3Store::S3Store(const Schema& schema, const eckit::URI& uri, const Config& config) : - Store(schema), S3Common(config, "store", uri), config_(config) { - -} +S3Store::S3Store(const Schema& schema, const eckit::URI& uri, const Config& config) + : Store(schema), S3Common(config, "store", uri), config_(config) { } eckit::URI S3Store::uri() const { - return eckit::S3BucketName(endpoint_, db_bucket_).uri(); - - -/// @note: code for single bucket for all DBs -// TODO -// // warning! here an incomplete uri is being returned. Where is this method -// // being called? Can that caller code accept incomplete uris? 
-// return eckit::S3Name(endpoint_, bucket_, db_prefix_).URI(); - } bool S3Store::uriBelongs(const eckit::URI& uri) const { - - const auto parts = eckit::Tokenizer("/").tokenize(uri.name()); - const auto n = parts.size(); - - - /// @note: code for bucket per DB - ASSERT(n == 1 || n == 2); - return ( - (uri.scheme() == type()) && - (parts[0] == db_bucket_)); - - - /// @note: code for single bucket for all DBs - // ASSERT(n == 2); - // return ( - // (uri.scheme() == type()) && - // (parts[1].rfind(db_prefix_, 0) == 0)); - + const auto uriBucket = eckit::S3Name::parse(uri.name())[0]; + return uri.scheme() == type() && uriBucket == db_bucket_; } bool S3Store::uriExists(const eckit::URI& uri) const { - /// @todo: revisit the name of this method + // auto tmp = uri; + // tmp.endpoint(endpoint_); + std::cerr << "-----------> Checking if " << uri << " exists" << std::endl; - /// @note: code for bucket per DB - const auto parts = eckit::Tokenizer("/").tokenize(uri.name()); - const auto pn = parts.size(); - ASSERT(pn == 1 | pn == 2); - ASSERT(parts[0] == db_bucket_); - ASSERT(uri.scheme() == type()); - eckit::S3BucketName n{eckit::net::Endpoint{uri.host(), uri.port()}, parts[0]}; - return n.exists(); - + return eckit::S3Name::make(endpoint_, uri.name())->exists(); - /// @note: code for single bucket for all DBs - // ASSERT(uri.scheme() == type()); - // eckit::S3Name n(uri); - // ASSERT(n.bucket().name() == bucket_); - // ASSERT(n.name().rfind(db_prefix_, 0) == 0); - // return n.exists(); + // return eckit::S3ObjectName(tmp).exists(); +} +bool S3Store::auxiliaryURIExists(const eckit::URI& uri) const { + return uriExists(uri); } -std::vector S3Store::storeUnitURIs() const { +std::vector S3Store::collocatedDataURIs() const { std::vector store_unit_uris; - eckit::S3BucketName bucket {endpoint_, db_bucket_}; - if (!bucket.exists()) return store_unit_uris; - + if (!bucket.exists()) { return store_unit_uris; } + /// @note if an S3Catalogue is implemented, some filtering will need 
to /// be done here to discriminate store keys from catalogue keys - for (const auto& key : bucket.listObjects()) { - - store_unit_uris.push_back(bucket.makeObject(key)->uri()); - - } + for (const auto& key : bucket.listObjects()) { store_unit_uris.push_back(bucket.makeObject(key)->uri()); } return store_unit_uris; - /// @note: code for single bucket for all DBs // std::vector store_unit_uris; // eckit::S3Bucket bucket{endpoint_, bucket_}; - + // if (!bucket.exists()) return store_unit_uris; - + // /// @note if an S3Catalogue is implemented, more filtering will need to // /// be done here to discriminate store keys from catalogue keys // for (const auto& key : bucket.listObjects(filter = "^" + db_prefix_ + "_.*")) { @@ -132,36 +107,37 @@ std::vector S3Store::storeUnitURIs() const { // } // return store_unit_uris; - } -std::set S3Store::asStoreUnitURIs(const std::vector& uris) const { - - std::set res; - - /// @note: this is only uniquefying the input uris (coming from an index) - /// in case theres any duplicate. - for (auto& uri : uris) - res.insert(uri); - - return res; - +std::set S3Store::asCollocatedDataURIs(const std::vector& uris) const { + return {uris.begin(), uris.end()}; } bool S3Store::exists() const { + return eckit::S3BucketName(endpoint_, db_bucket_).exists(); +} +eckit::URI S3Store::getAuxiliaryURI(const eckit::URI& uri, const std::string& ext) const { + ASSERT(uri.scheme() == type()); + return {type(), uri.name() + '.' + ext}; +} - return eckit::S3BucketName(endpoint_, db_bucket_).exists(); +std::vector S3Store::getAuxiliaryURIs(const eckit::URI& uri) const { + ASSERT(uri.scheme() == type()); + std::vector uris; + for (const auto& ext : LibFdb5::instance().auxiliaryRegistry()) { uris.push_back(getAuxiliaryURI(uri, ext)); } + return uris; } +//---------------------------------------------------------------------------------------------------------------------- + /// @todo: never used in actual fdb-read? 
eckit::DataHandle* S3Store::retrieve(Field& field) const { return field.dataHandle(); - } -std::unique_ptr S3Store::archive(const Key& key, const void * data, eckit::Length length) { +std::unique_ptr S3Store::archive(const Key& key, const void* data, eckit::Length length) { /// @note: code for S3 object (key) per field: @@ -184,8 +160,7 @@ std::unique_ptr S3Store::archive(const Key& key, const void * dat h->write(data, length); - return std::unique_ptr(new S3FieldLocation(n.uri(), 0, length, fdb5::Key())); - + return std::unique_ptr(new S3FieldLocation(n.uri(), 0, length, fdb5::Key())); /// @note: code for S3 object (key) per index store: @@ -201,19 +176,17 @@ std::unique_ptr S3Store::archive(const Key& key, const void * dat // h.write(data, length); // return std::unique_ptr(new S3FieldLocation(n.URI(), offset, length, fdb5::Key())); - } void S3Store::flush() { /// @note: code for S3 object (key) per index store: - - // /// @note: clear cached data handles thus triggering consolidation of - // /// multipart objects, so that step data is made visible to readers. - // /// New S3 handles will be created on the next archive() call after + + // /// @note: clear cached data handles thus triggering consolidation of + // /// multipart objects, so that step data is made visible to readers. + // /// New S3 handles will be created on the next archive() call after // /// flush(). 
// closeDataHandles(); - } void S3Store::close() { @@ -221,7 +194,6 @@ void S3Store::close() { /// @note: code for S3 object (key) per index store: // closeDataHandles(); - } void S3Store::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostream& logVerbose, bool doit) const { @@ -229,9 +201,9 @@ void S3Store::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostrea /// @note: code for bucket per DB const auto parts = eckit::Tokenizer("/").tokenize(uri.name()); - const auto n = parts.size(); + const auto n = parts.size(); ASSERT(n == 1 || n == 2); - + ASSERT(parts[0] == db_bucket_); if (n == 2) { // object @@ -248,13 +220,27 @@ void S3Store::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostrea logVerbose << "destroy S3 bucket: " << bucket.asString() << std::endl; - if (doit) bucket.ensureDestroyed(); + if (doit) { bucket.ensureDestroyed(); } } + // void TocStore::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostream& logVerbose, bool doit) const { + // ASSERT(uri.scheme() == type()); + // + // eckit::PathName path = uri.path(); + // if (path.isDir()) { + // logVerbose << "rmdir: "; + // logAlways << path << std::endl; + // if (doit) path.rmdir(false); + // } else { + // logVerbose << "Unlinking: "; + // logAlways << path << std::endl; + // if (doit) path.unlink(false); + // } + // } // /// @note: code for single bucket for all DBs // eckit::S3Name n{uri}; - + // ASSERT(n.bucket().name() == bucket_); // /// @note: if uri doesn't have key name, maybe this method should return without destroying anything.
// /// this way when TocWipeVisitor has wipeAll == true, the (only) bucket will not be destroyed @@ -263,7 +249,6 @@ void S3Store::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostrea // logVerbose << "destroy S3 key: "; // logAlways << n.asString() << std::endl; // if (doit) n.destroy(); - } void S3Store::print(std::ostream& out) const { @@ -272,13 +257,11 @@ void S3Store::print(std::ostream& out) const { /// @note: code for single bucket for all DBs // out << "S3Store(" << endpoint_ << "/" << bucket_ << ")"; - } /// @note: unique name generation copied from LocalPathName::unique. static eckit::StaticMutex local_mutex; - eckit::S3ObjectName S3Store::generateDataKey(const Key& key) const { eckit::AutoLock lock(local_mutex); @@ -303,11 +286,10 @@ eckit::S3ObjectName S3Store::generateDataKey(const Key& key) const { std::string keyStr = key.valuesToString(); std::replace(keyStr.begin(), keyStr.end(), ':', '-'); - return eckit::S3ObjectName {endpoint_, db_bucket_, keyStr + "." + md5.digest() + ".data"}; + return eckit::S3ObjectName {endpoint_, {db_bucket_, keyStr + "." 
+ md5.digest() + ".data"}}; /// @note: code for single bucket for all DBs // return eckit::S3Name{endpoint_, bucket_, db_prefix_ + "_" + key.valuesToString() + "_" + md5.digest() + ".data"}; - } /// @note: code for S3 object (key) per index store: @@ -334,13 +316,13 @@ eckit::S3ObjectName S3Store::generateDataKey(const Key& key) const { // return j->second; // eckit::DataHandle *dh = name.dataHandle(multipart = true); - + // ASSERT(dh); // handles_[ key ] = dh; // dh->openForAppend(0); - + // return *dh; // } @@ -360,4 +342,4 @@ eckit::S3ObjectName S3Store::generateDataKey(const Key& key) const { //---------------------------------------------------------------------------------------------------------------------- -} // namespace fdb5 \ No newline at end of file +} // namespace fdb5 diff --git a/src/fdb5/s3/S3Store.h b/src/fdb5/s3/S3Store.h index a8f0ad5d9..0c89126fd 100644 --- a/src/fdb5/s3/S3Store.h +++ b/src/fdb5/s3/S3Store.h @@ -15,74 +15,71 @@ #pragma once +#include "eckit/filesystem/URI.h" #include "eckit/io/s3/S3ObjectName.h" - #include "fdb5/database/Store.h" #include "fdb5/rules/Schema.h" - #include "fdb5/s3/S3Common.h" +#include +#include +#include + namespace fdb5 { //---------------------------------------------------------------------------------------------------------------------- class S3Store : public Store, public S3Common { -public: // methods - +public: // methods S3Store(const Schema& schema, const Key& key, const Config& config); + S3Store(const Schema& schema, const eckit::URI& uri, const Config& config); - ~S3Store() override {} + ~S3Store() override { } eckit::URI uri() const override; - bool uriBelongs(const eckit::URI&) const override; - bool uriExists(const eckit::URI&) const override; - std::vector storeUnitURIs() const override; - std::set asStoreUnitURIs(const std::vector&) const override; + + bool uriBelongs(const eckit::URI& uri) const override; + + bool uriExists(const eckit::URI& uri) const override; + + std::vector 
collocatedDataURIs() const override; + + std::set asCollocatedDataURIs(const std::vector& uris) const override; + + std::vector getAuxiliaryURIs(const eckit::URI& uri) const override; + + bool auxiliaryURIExists(const eckit::URI& uri) const override; bool open() override { return true; } + void flush() override; + void close() override; void checkUID() const override { /* nothing to do */ } -protected: // methods - +private: // methods std::string type() const override { return "s3"; } bool exists() const override; - eckit::DataHandle* retrieve(Field& field) const override; - std::unique_ptr archive(const Key& key, const void * data, eckit::Length length) override; + eckit::DataHandle* retrieve(Field& field) const override; + std::unique_ptr archive(const Key& key, const void* data, eckit::Length length) override; void remove(const eckit::URI& uri, std::ostream& logAlways, std::ostream& logVerbose, bool doit) const override; - void print(std::ostream &out) const override; + void print(std::ostream& out) const override; eckit::S3ObjectName generateDataKey(const Key& key) const; - /// @note: code for S3 object (key) per index store: - // eckit::S3Name getDataKey(const Key& key) const; - // eckit::DataHandle& getDataHandle(const Key& key, const eckit::S3Name& name); - // void closeDataHandles(); + eckit::URI getAuxiliaryURI(const eckit::URI& uri, const std::string& ext) const; -private: // types - - /// @note: code for S3 object (key) per index store: - // typedef std::map HandleStore; - // typedef std::map KeyStore; - -private: // members - +private: // members const Config& config_; - - /// @note: code for S3 object (key) per index store: - // HandleStore handles_; - // mutable KeyStore dataKeys_; - }; //---------------------------------------------------------------------------------------------------------------------- -} // namespace fdb5 +} // namespace fdb5 diff --git a/src/fdb5/toc/FieldRef.cc b/src/fdb5/toc/FieldRef.cc index 90561a36a..1de09fd22 100644 --- 
a/src/fdb5/toc/FieldRef.cc +++ b/src/fdb5/toc/FieldRef.cc @@ -10,15 +10,14 @@ #include "fdb5/toc/FieldRef.h" +#include "eckit/exception/Exceptions.h" #include "eckit/filesystem/PathName.h" -#include "eckit/filesystem/URI.h" -#include "eckit/serialisation/Stream.h" #include "fdb5/fdb5_config.h" #include "fdb5/database/Field.h" #include "fdb5/database/UriStore.h" - #include "fdb5/toc/TocFieldLocation.h" + #ifdef fdb5_HAVE_S3FDB #include "fdb5/s3/S3FieldLocation.h" #endif @@ -37,9 +36,9 @@ FieldRefLocation::FieldRefLocation(UriStore &store, const Field& field) { const FieldLocation& loc = field.location(); + const auto* tocfloc = dynamic_cast(&loc); #ifdef fdb5_HAVE_S3FDB - const TocFieldLocation* tocfloc = dynamic_cast(&loc); - const S3FieldLocation* s3floc = dynamic_cast(&loc); + const auto* s3floc = dynamic_cast(&loc); if(!tocfloc && !s3floc) { throw eckit::NotImplemented( "Field location is not of TocFieldLocation or S3FieldLocation type " diff --git a/tests/fdb/s3/CMakeLists.txt b/tests/fdb/s3/CMakeLists.txt index 7c460fd53..7f91d415e 100644 --- a/tests/fdb/s3/CMakeLists.txt +++ b/tests/fdb/s3/CMakeLists.txt @@ -1,5 +1,10 @@ if (HAVE_S3FDB) + file( + COPY S3Credentials.yaml + DESTINATION ${CMAKE_CURRENT_BINARY_DIR} + ) + list( APPEND s3_tests s3_store ) @@ -8,11 +13,11 @@ if (HAVE_S3FDB) foreach( _test ${s3_tests} ) - ecbuild_add_test( TARGET test_fdb5_s3_${_test} - SOURCES test_${_test}.cc - LIBS "${unit_test_libraries}" - INCLUDES "${unit_test_include_dirs}" ) + ecbuild_add_test( TARGET test_fdb5_s3_${_test} + SOURCES test_${_test}.cc + LIBS "${unit_test_libraries}" + INCLUDES "${unit_test_include_dirs}" ) endforeach() -endif() \ No newline at end of file +endif() diff --git a/tests/fdb/s3/S3Credentials.yaml b/tests/fdb/s3/S3Credentials.yaml new file mode 100644 index 000000000..1935a4815 --- /dev/null +++ b/tests/fdb/s3/S3Credentials.yaml @@ -0,0 +1,13 @@ +--- +credentials: + - endpoint: '127.0.0.1:9000' + accessKeyID: 'minio' + secretKey: 'minio1234' + + - 
endpoint: 'minio:9000' + accessKeyID: 'minio' + secretKey: 'minio1234' + + - endpoint: 'localhost:9000' + accessKeyID: 'asd2' + secretKey: 'asd2' diff --git a/tests/fdb/s3/test_s3_store.cc b/tests/fdb/s3/test_s3_store.cc index 5800c8432..855ea70d9 100644 --- a/tests/fdb/s3/test_s3_store.cc +++ b/tests/fdb/s3/test_s3_store.cc @@ -18,27 +18,25 @@ #include "eckit/filesystem/TmpFile.h" // #include "eckit/filesystem/TmpDir.h" // #include "eckit/io/FileHandle.h" -#include "eckit/io/MemoryHandle.h" #include "eckit/config/YAMLConfiguration.h" +#include "eckit/io/MemoryHandle.h" // #include "metkit/mars/MarsRequest.h" // #include "fdb5/fdb5_config.h" // #include "fdb5/config/Config.h" -#include "fdb5/api/FDB.h" -#include "fdb5/api/helpers/FDBToolRequest.h" - -#include "fdb5/toc/TocCatalogueWriter.h" -#include "fdb5/toc/TocCatalogueReader.h" - #include "eckit/io/s3/S3BucketName.h" #include "eckit/io/s3/S3Client.h" -#include "eckit/io/s3/S3Session.h" #include "eckit/io/s3/S3Credential.h" #include "eckit/io/s3/S3Handle.h" - -#include "fdb5/s3/S3Store.h" +#include "eckit/io/s3/S3Session.h" +#include "fdb5/api/FDB.h" +#include "fdb5/api/helpers/FDBToolRequest.h" +#include "fdb5/database/Key.h" #include "fdb5/s3/S3FieldLocation.h" +#include "fdb5/s3/S3Store.h" +#include "fdb5/toc/TocCatalogueReader.h" +#include "fdb5/toc/TocCatalogueWriter.h" // #include "fdb5/daos/DaosException.h" using namespace eckit::testing; @@ -46,43 +44,37 @@ using namespace eckit; namespace { - void deldir(eckit::PathName& p) { - if (!p.exists()) { - return; - } +void deldir(eckit::PathName& p) { + if (!p.exists()) { return; } - std::vector files; - std::vector dirs; - p.children(files, dirs); + std::vector files; + std::vector dirs; + p.children(files, dirs); - for (auto& f : files) { - f.unlink(); - } - for (auto& d : dirs) { - deldir(d); - } + for (auto& f : files) { f.unlink(); } + for (auto& d : dirs) { deldir(d); } - p.rmdir(); - }; + p.rmdir(); +}; - S3Config cfg("eu-central-1", "127.0.0.1", 9000); 
+S3Config cfg("minio", 9000, "eu-central-1"); - void ensureClean(const std::string& prefix) { - const eckit::S3Credential cred {"127.0.0.1:9000", "minio", "minio1234"}; - eckit::S3Session::instance().addCredentials(cred); - auto client = S3Client::makeUnique(cfg); - auto&& tmp = client->listBuckets(); - std::set buckets(tmp.begin(), tmp.end()); +void ensureClean(const std::string& prefix) { + const eckit::S3Credential cred {{"minio:9000"}, "minio", "minio1234"}; + eckit::S3Session::instance().addCredential(cred); + auto client = S3Client::makeUnique(cfg); + auto&& tmp = client->listBuckets(); + std::set buckets(tmp.begin(), tmp.end()); - for (const std::string& name : buckets) { - if (name.rfind(prefix, 0) == 0) { - client->emptyBucket(name); - client->deleteBucket(name); - } + for (const std::string& name : buckets) { + if (name.rfind(prefix, 0) == 0) { + client->emptyBucket(name); + client->deleteBucket(name); } - eckit::S3Session::instance().clear(); } + eckit::S3Session::instance().clear(); } +} // namespace // #ifdef fdb5_HAVE_DUMMY_DAOS // eckit::TmpDir& tmp_dummy_daos_root() { @@ -94,17 +86,17 @@ namespace { // temporary schema,spaces,root files common to all DAOS Store tests eckit::TmpFile& schema_file() { - static eckit::TmpFile f{}; + static eckit::TmpFile f {}; return f; } eckit::TmpFile& spaces_file() { - static eckit::TmpFile f{}; + static eckit::TmpFile f {}; return f; } eckit::TmpFile& roots_file() { - static eckit::TmpFile f{}; + static eckit::TmpFile f {}; return f; } @@ -113,20 +105,19 @@ eckit::PathName& store_tests_tmp_root() { return sd; } -namespace fdb { -namespace test { +namespace fdb { namespace test { -CASE( "Setup" ) { +CASE("Setup") { - // ensure fdb root directory exists. If not, then that root is + // ensure fdb root directory exists. If not, then that root is // registered as non existing and Store tests fail. 
- if (store_tests_tmp_root().exists()) deldir(store_tests_tmp_root()); + if (store_tests_tmp_root().exists()) { deldir(store_tests_tmp_root()); } store_tests_tmp_root().mkdir(); ::setenv("FDB_ROOT_DIRECTORY", store_tests_tmp_root().path().c_str(), 1); // prepare schema for tests involving S3Store - std::string schema_str{"[ a, b [ c, d [ e, f ]]]"}; + std::string schema_str {"[ a, b [ c, d [ e, f ]]]"}; std::unique_ptr hs(schema_file().fileHandle()); hs->openForWrite(schema_str.size()); @@ -142,7 +133,7 @@ CASE( "Setup" ) { // prepare scpaces - std::string spaces_str{".* all Default"}; + std::string spaces_str {".* all Default"}; std::unique_ptr hsp(spaces_file().fileHandle()); hsp->openForWrite(spaces_str.size()); @@ -155,7 +146,7 @@ CASE( "Setup" ) { // prepare roots - std::string roots_str{store_tests_tmp_root().asString() + " all yes yes"}; + std::string roots_str {store_tests_tmp_root().asString() + " all yes yes"}; std::unique_ptr hr(roots_file().fileHandle()); hr->openForWrite(roots_str.size()); @@ -165,39 +156,48 @@ CASE( "Setup" ) { } ::setenv("FDB_ROOTS_FILE", roots_file().path().c_str(), 1); - } CASE("S3Store tests") { SECTION("archive and retrieve") { - std::string prefix{"test1-"}; + std::string prefix {"test1-"}; ensureClean(prefix); - std::string config_str{ - "s3:\n" - " credential: ~/.config/eckit/s3credentials.yaml\n" - " endpoint: 127.0.0.1:9000\n" - " bucketPrefix: " + prefix + "\n" - }; + std::string + config_str { + "s3:\n credential: ./S3Credentials.yaml\n endpoint: minio:9000\n " "region: " "\"eu-central-1\"\n " "bucketPrefix: " + + prefix + "\n"}; + + fdb5::Config config {YAMLConfiguration(config_str)}; - fdb5::Config config{YAMLConfiguration(config_str)}; + fdb5::Schema schema {schema_file()}; - fdb5::Schema schema{schema_file()}; + fdb5::Key request_key; + request_key.set("a", "1"); + request_key.set("b", "2"); + request_key.set("c", "3"); + request_key.set("d", "4"); + request_key.set("e", "5"); + request_key.set("f", "6"); - fdb5::Key 
request_key{"a=1,b=2,c=3,d=4,e=5,f=6"}; - fdb5::Key db_key{"a=1,b=2"}; - fdb5::Key index_key{"c=3,d=4"}; + fdb5::Key db_key; + db_key.set("a", "1"); + db_key.set("b", "2"); + + fdb5::Key index_key; + index_key.set("c", "3"); + index_key.set("d", "4"); char data[] = "test"; // archive - fdb5::S3Store s3store{schema, db_key, config}; - fdb5::Store& store = s3store; - std::unique_ptr loc(store.archive(index_key, data, sizeof(data))); + fdb5::S3Store s3store {schema, db_key, config}; + fdb5::Store& store = s3store; + auto loc = store.archive(index_key, data, sizeof(data)); s3store.flush(); @@ -206,72 +206,83 @@ CASE("S3Store tests") { std::cout << "Read location: " << field.location() << std::endl; std::unique_ptr dh(store.retrieve(field)); EXPECT(dynamic_cast(dh.get())); - + eckit::MemoryHandle mh; dh->copyTo(mh); EXPECT(mh.size() == eckit::Length(sizeof(data))); EXPECT(::memcmp(mh.data(), data, sizeof(data)) == 0); // remove - const URI fieldURI = field.location().uri(); + const URI fieldURI = field.location().uri(); eckit::S3ObjectName field_name {fieldURI}; eckit::S3BucketName store_name {fieldURI, field_name.bucket()}; - eckit::URI store_uri(store_name.uri()); - std::ostream out(std::cout.rdbuf()); + eckit::URI store_uri(store_name.uri()); + std::ostream out(std::cout.rdbuf()); store.remove(store_uri, out, out, false); EXPECT(field_name.exists()); store.remove(store_uri, out, out, true); EXPECT_NOT(field_name.exists()); EXPECT_NOT(store_name.exists()); - } SECTION("with POSIX Catalogue") { - std::string prefix{"test2-"}; + std::string prefix {"test2-"}; ensureClean(prefix); // FDB configuration - std::string config_str{ - "schema : " + schema_file().path() + "\n" - "s3:\n" - " credential: ~/.config/eckit/s3credentials.yaml\n" - " endpoint: 127.0.0.1:9000\n" - " bucketPrefix: " + prefix + "\n" - }; + std::string config_str { + "schema : " + + schema_file().path() + "\ns3:\n credential: ./S3Credentials.yaml\n endpoint: minio:9000\n " "region: \"eu-central-1\"\n 
bucketPrefix: " + + prefix + "\n"}; - fdb5::Config config{YAMLConfiguration(config_str)}; + fdb5::Config config {YAMLConfiguration(config_str)}; // schema - fdb5::Schema schema{schema_file()}; + fdb5::Schema schema {schema_file()}; // request - fdb5::Key request_key{"a=1,b=2,c=3,d=4,e=5,f=6"}; - fdb5::Key db_key{"a=1,b=2"}; - fdb5::Key index_key{"c=3,d=4"}; - fdb5::Key field_key{"e=5,f=6"}; + fdb5::Key request_key; + request_key.set("a", "1"); + request_key.set("b", "2"); + request_key.set("c", "3"); + request_key.set("d", "4"); + request_key.set("e", "5"); + request_key.set("f", "6"); + + fdb5::Key db_key; + db_key.set("a", "1"); + db_key.set("b", "2"); + + fdb5::Key index_key; + index_key.set("c", "3"); + index_key.set("d", "4"); + + fdb5::Key field_key; + field_key.set("e", "5"); + field_key.set("f", "6"); // store data char data[] = "test"; - fdb5::S3Store s3store{schema, db_key, config}; - fdb5::Store& store = static_cast(s3store); - std::unique_ptr loc(store.archive(index_key, data, sizeof(data))); + fdb5::S3Store s3store {schema, db_key, config}; + fdb5::Store& store = static_cast(s3store); + auto loc = store.archive(index_key, data, sizeof(data)); // index data { /// @todo: could have a unique ptr here, might not need a static cast - fdb5::TocCatalogueWriter tcat{db_key, config}; - fdb5::Catalogue& cat = static_cast(tcat); + fdb5::TocCatalogueWriter tcat {db_key, config}; + fdb5::Catalogue& cat = static_cast(tcat); cat.deselectIndex(); cat.selectIndex(index_key); - //const fdb5::Index& idx = tcat.currentIndex(); + // const fdb5::Index& idx = tcat.currentIndex(); static_cast(tcat).archive(field_key, std::move(loc)); /// flush store before flushing catalogue @@ -282,8 +293,8 @@ CASE("S3Store tests") { fdb5::Field field; { - fdb5::TocCatalogueReader tcat{db_key, config}; - fdb5::Catalogue& cat = static_cast(tcat); + fdb5::TocCatalogueReader tcat {db_key, config}; + fdb5::Catalogue& cat = static_cast(tcat); cat.selectIndex(index_key); 
static_cast(tcat).retrieve(field_key, field); } @@ -293,7 +304,7 @@ CASE("S3Store tests") { std::unique_ptr dh(store.retrieve(field)); EXPECT(dynamic_cast(dh.get())); - + eckit::MemoryHandle mh; dh->copyTo(mh); EXPECT(mh.size() == eckit::Length(sizeof(data))); @@ -301,11 +312,11 @@ CASE("S3Store tests") { // remove data - const URI fieldURI = field.location().uri(); + const URI fieldURI = field.location().uri(); eckit::S3ObjectName field_name {fieldURI}; eckit::S3BucketName store_name {fieldURI, field_name.bucket()}; - eckit::URI store_uri(store_name.uri()); - std::ostream out(std::cout.rdbuf()); + eckit::URI store_uri(store_name.uri()); + std::ostream out(std::cout.rdbuf()); store.remove(store_uri, out, out, false); EXPECT(field_name.exists()); store.remove(store_uri, out, out, true); @@ -315,57 +326,53 @@ CASE("S3Store tests") { // deindex data { - fdb5::TocCatalogueWriter tcat{db_key, config}; - fdb5::Catalogue& cat = static_cast(tcat); - metkit::mars::MarsRequest r = db_key.request("retrieve"); + fdb5::TocCatalogueWriter tcat {db_key, config}; + fdb5::Catalogue& cat = static_cast(tcat); + metkit::mars::MarsRequest r = db_key.request("retrieve"); std::unique_ptr wv(cat.wipeVisitor(store, r, out, true, false, false)); cat.visitEntries(*wv, store, false); } - } SECTION("VIA FDB API") { - std::string prefix{"test3-"}; + std::string prefix {"test3-"}; ensureClean(prefix); // FDB configuration - std::string config_str{ - "type: local\n" - "schema : " + schema_file().path() + "\n" - "engine: toc\n" - "store: s3\n" - "s3:\n" - " credential: ~/.config/eckit/s3credentials.yaml\n" - " endpoint: 127.0.0.1:9000\n" - " bucketPrefix: " + prefix + "\n" - }; + std::string + config_str { + "type: local\n" "schema : " + schema_file().path() + + "\n" "engine: toc\n" "store: s3\ns3:\n credential: ./S3Credentials.yaml\n endpoint: minio:9000\n " "region: \"eu-central-1\"\n bucketPrefix: " + + prefix + "\n"}; - fdb5::Config config{YAMLConfiguration(config_str)}; + fdb5::Config config 
{YAMLConfiguration(config_str)}; // request - fdb5::Key request_key{"a=1,b=2,c=3,d=4,e=5,f=6"}; - fdb5::Key index_key{"a=1,b=2,c=3,d=4"}; - fdb5::Key db_key{"a=1,b=2"}; - - fdb5::FDBToolRequest full_req{ - request_key.request("retrieve"), - false, - std::vector{"a", "b"} - }; - fdb5::FDBToolRequest index_req{ - index_key.request("retrieve"), - false, - std::vector{"a", "b"} - }; - fdb5::FDBToolRequest db_req{ - db_key.request("retrieve"), - false, - std::vector{"a", "b"} - }; + fdb5::Key request_key; + request_key.set("a", "1"); + request_key.set("b", "2"); + request_key.set("c", "3"); + request_key.set("d", "4"); + request_key.set("e", "5"); + request_key.set("f", "6"); + + fdb5::Key db_key; + db_key.set("a", "1"); + db_key.set("b", "2"); + + fdb5::Key index_key; + index_key.set("a", "1"); + index_key.set("b", "2"); + index_key.set("c", "3"); + index_key.set("d", "4"); + + fdb5::FDBToolRequest full_req {request_key.request("retrieve"), false, std::vector {"a", "b"}}; + fdb5::FDBToolRequest index_req {index_key.request("retrieve"), false, std::vector {"a", "b"}}; + fdb5::FDBToolRequest db_req {db_key.request("retrieve"), false, std::vector {"a", "b"}}; // initialise store @@ -373,7 +380,7 @@ CASE("S3Store tests") { // check store is empty - size_t count; + size_t count; fdb5::ListElement info; auto listObject = fdb.list(db_req); @@ -396,9 +403,9 @@ CASE("S3Store tests") { // retrieve data - metkit::mars::MarsRequest r = request_key.request("retrieve"); + metkit::mars::MarsRequest r = request_key.request("retrieve"); std::unique_ptr dh(fdb.retrieve(r)); - + eckit::MemoryHandle mh; dh->copyTo(mh); EXPECT(mh.size() == eckit::Length(sizeof(data))); @@ -411,25 +418,25 @@ CASE("S3Store tests") { // dry run attempt to wipe with too specific request auto wipeObject = fdb.wipe(full_req); - count = 0; - while (wipeObject.next(elem)) count++; + count = 0; + while (wipeObject.next(elem)) { count++; } EXPECT(count == 0); // dry run wipe index and store unit wipeObject = 
fdb.wipe(index_req); - count = 0; - while (wipeObject.next(elem)) count++; + count = 0; + while (wipeObject.next(elem)) { count++; } EXPECT(count > 0); // dry run wipe database wipeObject = fdb.wipe(db_req); - count = 0; - while (wipeObject.next(elem)) count++; + count = 0; + while (wipeObject.next(elem)) { count++; } EXPECT(count > 0); // ensure field still exists listObject = fdb.list(full_req); - count = 0; + count = 0; while (listObject.next(info)) { // info.print(std::cout, true, true); // std::cout << std::endl; @@ -439,70 +446,68 @@ CASE("S3Store tests") { // attempt to wipe with too specific request wipeObject = fdb.wipe(full_req, true); - count = 0; - while (wipeObject.next(elem)) count++; + count = 0; + while (wipeObject.next(elem)) { count++; } EXPECT(count == 0); /// @todo: really needed? fdb.flush(); // wipe index and store unit (and DB bucket as there is only one index) wipeObject = fdb.wipe(index_req, true); - count = 0; - while (wipeObject.next(elem)) count++; + count = 0; + while (wipeObject.next(elem)) { count++; } EXPECT(count > 0); /// @todo: really needed? 
fdb.flush(); // ensure field does not exist listObject = fdb.list(full_req); - count = 0; - while (listObject.next(info)) count++; + count = 0; + while (listObject.next(info)) { count++; } EXPECT(count == 0); - } /// @todo: if doing what's in this section at the end of the previous section reusing the same FDB object, // archive() fails as it expects a toc file to exist, but it has been removed by previous wipe SECTION("FDB API RE-STORE AND WIPE DB") { - std::string prefix{"test4-"}; + std::string prefix {"test4-"}; + + ensureClean(prefix); // FDB configuration - std::string config_str{ - "type: local\n" - "schema : " + schema_file().path() + "\n" - "engine: toc\n" - "store: s3\n" - "s3:\n" - " credential: ~/.config/eckit/s3credentials.yaml\n" - " endpoint: 127.0.0.1:9000\n" - " bucketPrefix: " + prefix + "\n" - }; + std::string + config_str { + "type: local\n" "schema : " + schema_file().path() + + "\n" "engine: toc\n" "store: s3\ns3:\n credential: ./S3Credentials.yaml\n endpoint: minio:9000\n " "region: \"eu-central-1\"\n bucketPrefix: " + + prefix + "\n"}; - fdb5::Config config{YAMLConfiguration(config_str)}; + fdb5::Config config {YAMLConfiguration(config_str)}; // request - fdb5::Key request_key{"a=1,b=2,c=3,d=4,e=5,f=6"}; - fdb5::Key index_key{"a=1,b=2,c=3,d=4"}; - fdb5::Key db_key{"a=1,b=2"}; - - fdb5::FDBToolRequest full_req{ - request_key.request("retrieve"), - false, - std::vector{"a", "b"} - }; - fdb5::FDBToolRequest index_req{ - index_key.request("retrieve"), - false, - std::vector{"a", "b"} - }; - fdb5::FDBToolRequest db_req{ - db_key.request("retrieve"), - false, - std::vector{"a", "b"} - }; + fdb5::Key request_key; + request_key.set("a", "1"); + request_key.set("b", "2"); + request_key.set("c", "3"); + request_key.set("d", "4"); + request_key.set("e", "5"); + request_key.set("f", "6"); + + fdb5::Key db_key; + db_key.set("a", "1"); + db_key.set("b", "2"); + + fdb5::Key index_key; + index_key.set("a", "1"); + index_key.set("b", "2"); + 
index_key.set("c", "3"); + index_key.set("d", "4"); + + fdb5::FDBToolRequest full_req {request_key.request("retrieve"), false, std::vector {"a", "b"}}; + fdb5::FDBToolRequest index_req {index_key.request("retrieve"), false, std::vector {"a", "b"}}; + fdb5::FDBToolRequest db_req {db_key.request("retrieve"), false, std::vector {"a", "b"}}; // initialise store @@ -513,43 +518,41 @@ CASE("S3Store tests") { char data[] = "test"; fdb.archive(request_key, data, sizeof(data)); - + fdb.flush(); - size_t count; - // wipe all database + size_t count = 0; + fdb5::WipeElement elem; + auto wipeObject = fdb.wipe(db_req, true); - count = 0; - while (wipeObject.next(elem)) count++; + while (wipeObject.next(elem)) { count++; } + EXPECT(count > 0); - /// @todo: really needed? - fdb.flush(); // ensure field does not exist + count = 0; + fdb5::ListElement info; + auto listObject = fdb.list(full_req); - count = 0; while (listObject.next(info)) { // info.print(std::cout, true, true); // std::cout << std::endl; count++; } - EXPECT(count == 0); + EXPECT(count == 0); } - } -} // namespace test -} // namespace fdb +}} // namespace fdb::test -int main(int argc, char **argv) -{ - return run_tests ( argc, argv ); +int main(int argc, char** argv) { + return run_tests(argc, argv); ensureClean(""); -} \ No newline at end of file +}