Skip to content

thread-unsafe memory type with seq-fit algorithm #110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions rtb/datacache/entity_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
#include "rtb/core/core.hpp"
#include "rtb/common/concepts.hpp"

#include <boost/throw_exception.hpp>
#include <boost/chrono/duration.hpp>
#include <boost/exception/info.hpp>
#include <boost/stacktrace.hpp>
#include <boost/thread/lock_options.hpp>
#include <boost/throw_exception.hpp>
#include <stdexcept>

// Define an error_info tag for stacktrace
Expand Down Expand Up @@ -117,6 +119,7 @@ struct retriever<Tag,std::vector<std::shared_ptr<Serializable>>> {

inline constexpr size_t ENTITY_CACHE_DEFAULT_MEMORY_SIZE = 67108864 /* 64 MB*/;
inline constexpr size_t ENTITY_CACHE_DEFAULT_MEMORY_GROW_INCREMENT = 67108864 /* 64 MB*/;
inline constexpr uint64_t ENTITY_CACHE_DEFAULT_LOCK_TIMEOUT_S = 10;
template<typename Memory, template <class,class...> class Container, typename ...T>
class entity_cache
{
Expand All @@ -132,13 +135,18 @@ class entity_cache

/** Can set memory_grow_increment=0 to prevent cache from growing */
explicit entity_cache(std::string name, size_t const memory_size = ENTITY_CACHE_DEFAULT_MEMORY_SIZE,
size_t const memory_grow_increment = ENTITY_CACHE_DEFAULT_MEMORY_GROW_INCREMENT)
size_t const memory_grow_increment = ENTITY_CACHE_DEFAULT_MEMORY_GROW_INCREMENT,
uint32_t lock_timeout_s = ENTITY_CACHE_DEFAULT_LOCK_TIMEOUT_S)
: _segment_ptr(), _container_ptr(), _store_name(), _cache_name(std::move(name)),
_named_mutex(bip::open_or_create, (_cache_name + "_mutex").c_str()),
_memory_grow_increment{memory_grow_increment} {
// TODO: add to ctor to switch between mmap and shm
// TODO: maybe needs bip::scoped_lock to lock for other processes calling grow_memory
std::string data_base_dir = "/tmp/CACHE";

auto guard = bip::scoped_lock{_named_mutex, bip::defer_lock};
VAV_REQUIRE(guard.try_lock_for(boost::chrono::seconds(lock_timeout_s)));

_store_name = Memory::convert_base_dir(data_base_dir) + _cache_name;
_segment_ptr.reset(Memory::open_or_create_segment(_store_name.c_str(), memory_size));
_container_ptr = _segment_ptr->template find_or_construct<Container_t>(_cache_name.c_str())(
Expand Down
44 changes: 41 additions & 3 deletions rtb/datacache/memory_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,58 @@
#include <boost/interprocess/managed_mapped_file.hpp> //Variant-II , open(), mmap()
#include <boost/interprocess/managed_heap_memory.hpp> // Variant III for heap
#include <boost/interprocess/creation_tags.hpp>

#include <boost/interprocess/mem_algo/simple_seq_fit.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/shared_mutex.hpp>

namespace mpclmi::ipc {


/** A variation of the Shared:
* - simpler memory allocation algorithm suitable for segments storing a single node-based container
* - memory allocation is not thread safe, relying on external synchronization typical for entity_cache-based containers
*/
struct SharedSeqFitNoLock {
using char_type = char;
using mutex_family_type = boost::interprocess::null_mutex_family;
using memory_algorithm_type = boost::interprocess::simple_seq_fit<mutex_family_type>;
template <typename T> using index_type = boost::interprocess::iset_index<T>;
using segment_t = boost::interprocess::basic_managed_shared_memory<char_type, memory_algorithm_type, index_type>;
using segment_manager_t = boost::interprocess::segment_manager<char, memory_algorithm_type, index_type>;

static segment_t * open_or_create_segment (const std::string &path, size_t size) {
return new segment_t(boost::interprocess::open_or_create, path.c_str(), size) ;
}
static segment_t * open_segment (const std::string &path) {
return new segment_t(boost::interprocess::open_only, path.c_str()) ;
}
static segment_t * create_segment (const std::string &path, size_t size) {
return new segment_t(boost::interprocess::create_only, path.c_str(), size) ;
}
template <typename MemPtr>
static void grow(MemPtr &mem_ptr , const std::string &path, size_t size) {
mem_ptr.reset() ;
segment_t::grow(path.c_str(), size) ;
mem_ptr.reset(open_segment(path)) ;
}
static std::string convert_base_dir([[maybe_unused]] const std::string &base_dir) {
return "" ;
}

template<typename Function>
static void attach( Function && f ) {
f() ;
}
};


struct Shared {
typedef boost::interprocess::managed_shared_memory segment_t;
typedef boost::interprocess::managed_shared_memory::segment_manager segment_manager_t;
typedef boost::shared_mutex lock_t ;
typedef boost::unique_lock<lock_t> scoped_exclusive_locker_t;
typedef boost::shared_lock<lock_t> scoped_shared_locker_t;
static segment_t * open_or_create_segment (const std::string &path, size_t size) {
return new segment_t(boost::interprocess::open_or_create, path.c_str(), size) ;
return new segment_t(boost::interprocess::open_or_create, path.c_str(), size) ;
}
static segment_t * open_segment (const std::string &path) {
return new segment_t(boost::interprocess::open_only, path.c_str()) ;
Expand Down
Loading