diff --git a/conanfile.txt b/conanfile.txt new file mode 100644 index 0000000..76e3748 --- /dev/null +++ b/conanfile.txt @@ -0,0 +1,13 @@ +[requires] +benchmark/1.8.4 +fmt/10.2.1 +boost/1.86.0 + +[generators] +CMakeDeps +CMakeToolchain + +[options] +boost/*:shared=True +[layout] +cmake_layout diff --git a/examples/bidder/generator/ad.py b/examples/bidder/generator/ad.py index 5438de1..da7294e 100755 --- a/examples/bidder/generator/ad.py +++ b/examples/bidder/generator/ad.py @@ -38,7 +38,7 @@ with open("../data/geo_campaign", "w") as file: for geo_id, city, country in geo: - max_targetings = random.randint(1, max_campaigns/10) + max_targetings = random.randint(1, int(max_campaigns/10)) start_pos = random.randint(1, max_campaigns-max_targetings) for i in range(start_pos, start_pos+max_targetings+1): file.write("%d\t%d\n" % (geo_id, i)) diff --git a/examples/bidder/generator/ico.py b/examples/bidder/generator/ico.py index 8824c6c..734cd06 100755 --- a/examples/bidder/generator/ico.py +++ b/examples/bidder/generator/ico.py @@ -35,7 +35,7 @@ file = open("../data/ico_campaign", "w") for domain, ref_id in ico_domains: - max_targetings = random.randint(1, max_campaigns/10) + max_targetings = random.randint(1, int(max_campaigns/10)) start_pos = random.randint(1, max_campaigns-max_targetings) for i in range(start_pos, start_pos+max_targetings+1): file.write("%d\t%d\n" % (ref_id, i)) diff --git a/examples/campaign/campaign_cache.hpp b/examples/campaign/campaign_cache.hpp index ec92f1e..f4707ea 100644 --- a/examples/campaign/campaign_cache.hpp +++ b/examples/campaign/campaign_cache.hpp @@ -121,17 +121,17 @@ struct BudgetManager { template ::char_allocator > + typename Alloc = typename datacache::entity_cache::char_allocator > class CampaignCache { - using Cache = datacache::entity_cache ; + using Cache = datacache::entity_cache ; using Keys = vanilla::tagged_tuple< typename ipc::data::campaign_entity::campaign_id_tag, uint32_t >; using CampaignTag = typename ipc::data::campaign_entity::campaign_id_tag; using insert_handler_type = std::function; using remove_handler_type = std::function; - public: - public: + +public: using DataCollection = std::vector >; CampaignCache(const Config &config): config{config}, cache(config.data().ipc_name) diff --git a/rtb/datacache/entity_cache.hpp b/rtb/datacache/entity_cache.hpp index 897499f..f6277f3 100644 --- a/rtb/datacache/entity_cache.hpp +++ b/rtb/datacache/entity_cache.hpp @@ -17,11 +17,11 @@ * */ -#ifndef __DATACACHE_ENTITY_CACHE_HPP__ -#define __DATACACHE_ENTITY_CACHE_HPP__ +#pragma once #include #include +#include #include #include #include @@ -34,6 +34,7 @@ #include #include +#include #include "rtb/core/core.hpp" #include "rtb/common/concepts.hpp" @@ -92,8 +93,10 @@ struct retriever>> { return !entries.empty(); } }; - -template class Container, size_t MEMORY_SIZE = 67108864, typename ...T> + +inline constexpr size_t ENTITY_CACHE_DEFAULT_MEMORY_SIZE = 67108864 /* 64 MB*/; +inline constexpr size_t ENTITY_CACHE_DEFAULT_MEMORY_GROW_INCREMENT = 67108864 /* 64 MB*/; +template class Container, typename ...T> class entity_cache { public: @@ -105,68 +108,71 @@ class entity_cache using char_string = boost::interprocess::basic_string, char_allocator> ; using Container_t = Container ; using Data_t = typename Container_t::value_type; - - entity_cache(const std::string &name) : - _segment_ptr(), _container_ptr(), _store_name(), _cache_name(name), _named_mutex(bip::open_or_create, - (_cache_name + "_mutex").c_str()) { - //TODO: add to ctor to switch between mmap and shm - //TODO: maybe needs bip::scoped_lock to lock for other processes calling grow_memory - std::string data_base_dir = "/tmp/CACHE" ; - _store_name = Memory::convert_base_dir(data_base_dir) + _cache_name; - _segment_ptr.reset(Memory::open_or_create_segment(_store_name.c_str(), MEMORY_SIZE) ) ; - _container_ptr = _segment_ptr->template find_or_construct( _cache_name.c_str() ) - (typename Container_t::ctor_args_list() , typename Container_t::allocator_type(_segment_ptr->get_segment_manager())); + + explicit entity_cache(std::string name, size_t const memory_size = ENTITY_CACHE_DEFAULT_MEMORY_SIZE, + size_t const memory_grow_increment = ENTITY_CACHE_DEFAULT_MEMORY_GROW_INCREMENT) + : _segment_ptr(), _container_ptr(), _store_name(), _cache_name(std::move(name)), + _named_mutex(bip::open_or_create, (_cache_name + "_mutex").c_str()), + _memory_grow_increment{memory_grow_increment} { + // TODO: add to ctor to switch between mmap and shm + // TODO: maybe needs bip::scoped_lock to lock for other processes calling grow_memory + std::string data_base_dir = "/tmp/CACHE"; + _store_name = Memory::convert_base_dir(data_base_dir) + _cache_name; + _segment_ptr.reset(Memory::open_or_create_segment(_store_name.c_str(), memory_size)); + _container_ptr = _segment_ptr->template find_or_construct(_cache_name.c_str())( + typename Container_t::ctor_args_list(), + typename Container_t::allocator_type(_segment_ptr->get_segment_manager())); } - + + template + auto read(F&& f) { + auto guard = bip::sharable_lock{_named_mutex}; + return f(); + } + + template + auto modify(F&& f) { + auto guard = bip::scoped_lock{_named_mutex}; + return f(); + } + void clear() { - bip::scoped_lock guard(_named_mutex) ; + bip::scoped_lock guard(_named_mutex) ; _container_ptr->clear(); } template bool update( Key && key, Serializable && data, Arg&& arg) { - bip::scoped_lock guard(_named_mutex) ; + bip::scoped_lock guard(_named_mutex) ; bool is_success {false}; auto &index = _container_ptr->template get(); auto p = index.equal_range(std::forward(arg)); - while ( p.first != p.second ) { - try { - is_success |= update_data(std::forward(key),std::forward(data),index,p.first++); - } catch (const bad_alloc_exception_t &e) { - LOG(debug) << boost::core::demangle(typeid(*this).name()) - << " data was not updated , MEMORY AVAILABLE=" - << _segment_ptr->get_free_memory() ; - grow_memory(MEMORY_SIZE); - is_success |= update_data(std::forward(key),std::forward(data),index,p.first++); - } + while (p.first != p.second) { + is_success |= retried_with_grow("update_data", [&, this] { + return update_data(std::forward(key),std::forward(data),index,p.first++); + }); } return is_success; } template bool update( Key && key, Serializable && data, Args&& ...args) { - bip::scoped_lock guard(_named_mutex) ; + bip::scoped_lock guard(_named_mutex) ; bool is_success {false}; //Memory::attach([this](){attach();}); // reattach to newly created auto &index = _container_ptr->template get(); auto p = index.equal_range(boost::make_tuple(std::forward(args)...)); - while ( p.first != p.second ) { - try { - is_success |= update_data(std::forward(key),std::forward(data),index,p.first++); - } catch (const bad_alloc_exception_t &e) { - LOG(debug) << boost::core::demangle(typeid(*this).name()) - << " data was not updated , MEMORY AVAILABLE=" - << _segment_ptr->get_free_memory() ; - grow_memory(MEMORY_SIZE); - is_success |= update_data(std::forward(key),std::forward(data),index,p.first++); - } + while ( p.first != p.second ) { + is_success |= retried_with_grow("update_data", [&, this] { + return update_data(std::forward(key),std::forward(data),index,p.first++); + }); } return is_success; } template bool update( Functor && func, Arg&& arg) { - bip::scoped_lock guard(_named_mutex) ; + bip::scoped_lock guard(_named_mutex) ; auto &index = _container_ptr->template get(); auto p = index.equal_range(std::forward(arg)); return modify(index, p.first, p.second, std::forward(func)); @@ -174,7 +180,7 @@ class entity_cache template bool update( Functor && func, Args&& ...args) { - bip::scoped_lock guard(_named_mutex) ; + bip::scoped_lock guard(_named_mutex) ; auto &index = _container_ptr->template get(); auto p = index.equal_range(boost::make_tuple(std::forward(args)...)); return modify(index, p.first, p.second, std::forward(func)); @@ -182,58 +188,33 @@ class entity_cache template auto insert( Key && key, Serializable &&data) { - bip::scoped_lock guard(_named_mutex) ; - decltype(insert_data(key, data)) is_success ; - try { - is_success = insert_data(std::forward(key), std::forward(data)); - } catch (const bad_alloc_exception_t &e) { - LOG(debug) << boost::core::demangle(typeid(*this).name()) - << " data was not inserted , MEMORY AVAILABLE=" - << _segment_ptr->get_free_memory(); - grow_memory(MEMORY_SIZE); - is_success = insert_data(std::forward(key), std::forward(data)); - } - - return is_success; + bip::scoped_lock guard(_named_mutex) ; + return insert_unsafe(std::forward(key), std::forward(data)); } - -/***************** - template - bool insert( const std::vector &data) { - bip::scoped_lock guard(_named_mutex) ; - bool is_success {false}; - std::size_t n {data.size()} ; - for ( const auto &item : data) { - try { - if ( insert_data(data) ) { --n; } - } catch (const bad_alloc_exception_t &e) { - LOG(debug) << boost::core::demangle(typeid(*this).name()) - << " data was not inserted , MEMORY AVAILABLE=" - << _segment_ptr->get_free_memory(); - grow_memory(MEMORY_SIZE); - if ( insert_data(data) ) { --n; } - } - } - return !n; + + template + auto insert_unsafe( Key && key, Serializable &&data) { + return retried_with_grow("insert_data", [&, this] { + return insert_data(std::forward(key), std::forward(data)); + }); } -*******************/ - + template bool retrieve(Serializable &entry, Args&& ...args) { - bip::sharable_lock guard(_named_mutex); + bip::sharable_lock guard(_named_mutex); return retriever()(*_container_ptr,entry,std::forward(args)...); } template auto retrieve_raw(Args&& ...args) { - bip::sharable_lock guard(_named_mutex); + bip::sharable_lock guard(_named_mutex); auto &idx = _container_ptr->template get(); return equal_range(idx, std::forward(args)...); } template bool retrieve(std::vector> &entries) { - bip::sharable_lock guard(_named_mutex); + bip::sharable_lock guard(_named_mutex); auto p = std::make_pair(_container_ptr->begin(), _container_ptr->end()); std::transform ( p.first, p.second, std::back_inserter(entries), [] ( const Data_t &data ) { std::shared_ptr impl_ptr { std::make_shared() } ; @@ -244,38 +225,55 @@ class entity_cache } template - void remove(Args&& ...args) { - bip::scoped_lock guard(_named_mutex); + auto remove(Args&& ...args) { + bip::scoped_lock guard(_named_mutex); + return remove_unsafe(std::forward(args)...); + } + + template + void remove_unsafe(Args&& ...args) { auto p = _container_ptr->template get().equal_range(boost::make_tuple(std::forward(args)...)); _container_ptr->erase(p.first, p.second); } - + + template + auto remove(Arg && arg) { + bip::scoped_lock guard(_named_mutex); + return remove_unsafe(std::forward(arg)); + } + template - void remove(Arg && arg) { - bip::scoped_lock guard(_named_mutex); + void remove_unsafe(Arg && arg) { auto p = _container_ptr->template get().equal_range(std::forward(arg)); _container_ptr->erase(p.first, p.second); } - char_string create_ipc_key(const std::string &key) const { - try { - char_string tmp(key.data(), key.size(), _segment_ptr->get_segment_manager()) ; - return tmp; - } catch ( const bad_alloc_exception_t &e ) { - LOG(debug) << boost::core::demangle(typeid(*this).name()) - << " create_ipc_key failed , MEMORY AVAILABLE=" - << _segment_ptr->get_free_memory(); - grow_memory(MEMORY_SIZE) ; - char_string tmp(key.data(), key.size(), _segment_ptr->get_segment_manager()) ; - return tmp; - } - } + char_string create_ipc_key(const std::string &key) const { + return retried_with_grow("create_ipc_key", [&, this] { + return char_string{key.data(), key.size(), _segment_ptr->get_segment_manager()}; + }); + } - size_t get_size() const { - bip::sharable_lock guard(_named_mutex) ; - return _container_ptr->size(); - } + size_t get_size() const { + bip::sharable_lock guard(_named_mutex) ; + return _container_ptr->size(); + } private: + auto retried_with_grow([[maybe_unused]] std::string_view op_name, auto&& op) { + try { + return op(); + } catch ([[maybe_unused]] bad_alloc_exception_t const& e) { +#ifndef NDEBUG + LOG(debug) << boost::core::demangle(typeid(*this).name()) << " " << op_name + << " failed , MEMORY AVAILABLE=" << _segment_ptr->get_free_memory(); +#endif + if (grow_memory(_memory_grow_increment)) [[likely]] { + return op(); + } + throw; + } + } + void attach() const { if constexpr ( !std::is_same_v ) { _segment_ptr.reset(new segment_t(bip::open_only,_store_name.c_str()) ) ; @@ -284,17 +282,20 @@ class entity_cache (typename Container_t::ctor_args_list(), typename Container_t::allocator_type(_segment_ptr->get_segment_manager())); } - void grow_memory(size_t size) const { + bool grow_memory(size_t size) const { + bool success = false; try { - if constexpr ( !std::is_same_v ) { - _segment_ptr.reset() ; - } - Memory::grow(_segment_ptr, _store_name.c_str(), size) ; - } catch ( const bad_alloc_exception_t &e ) { - LOG(debug) << boost::core::demangle(typeid(*this).name()) - << " failed to grow " << e.what() << ":free mem=" << _segment_ptr->get_free_memory() ; + if constexpr (!std::is_same_v) { + _segment_ptr.reset(); + } + Memory::grow(_segment_ptr, _store_name.c_str(), size); + success = true; + } catch (bad_alloc_exception_t const& e) { + LOG(debug) << boost::core::demangle(typeid(*this).name()) << " failed to grow " << e.what() + << ":free mem=" << _segment_ptr->get_free_memory(); } - Memory::attach([this](){attach();}); // reattach to newly created + Memory::attach([this]() { attach(); }); // reattach to newly created + return success; } template @@ -316,15 +317,9 @@ class entity_cache auto modify ( Index & index, Iterator first , Iterator last, Functor && func) { bool is_success {false}; while ( first != last ) { - try { - is_success |= index.modify(first++, std::forward(func)); - } catch (const bad_alloc_exception_t &e) { - LOG(debug) << boost::core::demangle(typeid(*this).name()) - << " data by functor was not updated , MEMORY AVAILABLE=" - << _segment_ptr->get_free_memory() ; - grow_memory(MEMORY_SIZE); - is_success |= index.modify(first++, func); - } + is_success |= retried_with_grow("modify", [&] { + return index.modify(first++, std::forward(func)); + }); } return is_success; } @@ -334,11 +329,8 @@ class entity_cache std::string _store_name ; std::string _cache_name ; mutable boost::interprocess::named_upgradable_mutex _named_mutex; + size_t _memory_grow_increment; }; } - - -#endif /* __DATACACHE_ENTITY_CACHE_HPP__ */ -