Skip to content

Commit

Permalink
Use flock to protect exclusive access to a file on POSIX systems. Thi…
Browse files Browse the repository at this point in the history
…s can prevent a memory-mapped file from being opened by two processes simultaneously #34
  • Loading branch information
pippocao committed Dec 3, 2024
1 parent 66c94eb commit 7d0fb1f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 23 deletions.
11 changes: 11 additions & 0 deletions src/bq_common/platform/posix_misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,17 @@ namespace bq {
if (!(int32_t)(mode & (file_open_mode_enum::write | file_open_mode_enum::exclusive))) {
return true;
}
if ((int32_t)(mode & file_open_mode_enum::exclusive)) {
struct flock lock;
memset(&lock, 0, sizeof(lock));
lock.l_type = F_WRLCK;
lock.l_whence = SEEK_SET;
lock.l_start = 0;
lock.l_len = 0;
if (fcntl(file_handle, F_SETLK, &lock) == -1) {
return false;
}
}
struct stat file_info;
if (fstat(file_handle, &file_info) < 0) {
bq::util::log_device_console(log_level::error, "add_file_execlusive_check fstat failed, fd:%d, error code:%d", file_handle, errno);
Expand Down
53 changes: 30 additions & 23 deletions src/bq_log/types/ring_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <stdlib.h>
#include <inttypes.h>
#include "bq_log/types/ring_buffer.h"
#include "bq_log/misc/bq_log_api.h"

namespace bq {

Expand All @@ -32,7 +33,6 @@ namespace bq {
bq::util::log_device_console(bq::log_level::warning, "invalid ring_buffer capacity {}, it should not be less than 16 * 64 bytes. it will be set to 16 * 64 automatically", capacity);
capacity = 16 * cache_line_size;
}
serialize_id = 0; //disable mmap temporarily , it is not robust enough.
auto mmap_result = create_memory_map(capacity, serialize_id);
switch (mmap_result) {
case bq::ring_buffer::failed:
Expand Down Expand Up @@ -71,7 +71,7 @@ namespace bq {
ring_buffer_write_handle handle;
int32_t max_try_count = 100;

uint32_t size_required = size + sizeof(block::data_section_head);
uint32_t size_required = size + data_block_offset;
uint32_t need_block_count = (size_required + (cache_line_size - 1)) >> cache_line_size_log2;
if (need_block_count > aligned_blocks_count_ || need_block_count == 0) {
#if BQ_RING_BUFFER_DEBUG
Expand Down Expand Up @@ -204,7 +204,8 @@ namespace bq {
switch (status) {
case block_status::invalid:
#if BQ_RING_BUFFER_DEBUG
assert(((current_reading_cursor_tmp_ + block_count) & (~(aligned_blocks_count_ - 1))) - (current_reading_cursor_tmp_ & (~(aligned_blocks_count_ - 1))) == aligned_blocks_count_);
assert((current_reading_cursor_tmp_ & (aligned_blocks_count_ - 1)) + block_count > aligned_blocks_count_);
assert(block_count <= aligned_blocks_count_);
#endif
current_reading_cursor_tmp_ += block_count;
continue;
Expand All @@ -216,6 +217,9 @@ namespace bq {
handle.result = enum_buffer_result_code::err_empty_ring_buffer;
break;
case block_status::used:
#if BQ_RING_BUFFER_DEBUG
assert((current_reading_cursor_tmp_ & (aligned_blocks_count_ - 1)) + block_count <= aligned_blocks_count_);
#endif
handle.result = enum_buffer_result_code::success;
handle.data_addr = block_ref.data_section_head.data;
handle.data_size = block_ref.data_section_head.data_size;
Expand Down Expand Up @@ -275,6 +279,7 @@ namespace bq {
string path = TO_ABSOLUTE_PATH(name_tmp, true);
memory_map_file_ = bq::file_manager::instance().open_file(path, file_open_mode_enum::auto_create | file_open_mode_enum::read_write | file_open_mode_enum::exclusive);
if (!memory_map_file_.is_valid()) {
bq::util::log_device_console(bq::log_level::warning, "failed to open mmap file %s, use memory instead of mmap file, error code:%d", path.c_str(), bq::file_manager::get_and_clear_last_file_error());
return create_memory_map_result::failed;
}
capacity = bq::roundup_pow_of_two(capacity);
Expand Down Expand Up @@ -325,40 +330,40 @@ namespace bq {

bool ring_buffer::try_recover_from_exist_memory_map()
{
// parse check
// verify
buffer_head_->read_cursor_consumer_cache_ = buffer_head_->read_cursor_consumer_cache_ & (aligned_blocks_count_ - 1);
uint32_t current_cursor = buffer_head_->read_cursor_consumer_cache_;
bool data_parse_finished = false;
while (current_cursor - buffer_head_->read_cursor_consumer_cache_ < aligned_blocks_count_) {
while ((current_cursor - buffer_head_->read_cursor_consumer_cache_ < aligned_blocks_count_) && !data_parse_finished) {
block& current_block = cursor_to_block(current_cursor);
auto block_num = current_block.data_section_head.block_num;
if(current_cursor + block_num > buffer_head_->read_cursor_consumer_cache_ + aligned_blocks_count_)
{
return false;
}
switch (current_block.data_section_head.status.load(bq::platform::memory_order::relaxed)) {
case block_status::used:
if (data_parse_finished == true) {
return false;
}
if (current_cursor < aligned_blocks_count_ && current_cursor + current_block.data_section_head.block_num > aligned_blocks_count_) {
if((current_cursor & (aligned_blocks_count_ - 1)) + block_num > aligned_blocks_count_)
{
return false;
}
if ((size_t)current_block.data_section_head.data_size > (current_block.data_section_head.block_num * sizeof(block) - data_block_offset)) {
if((size_t)current_block.data_section_head.data_size > (block_num * sizeof(block) - data_block_offset)
|| (size_t)current_block.data_section_head.data_size <= (block_num == 1 ? 0 : (block_num * sizeof(block) - sizeof(block) - data_block_offset))
)
{
return false;
}
current_cursor += current_block.data_section_head.block_num;
current_cursor += block_num;
break;
case block_status::invalid:
if (data_parse_finished == true) {
return false;
}
current_cursor += current_block.data_section_head.block_num;
if (current_cursor <= aligned_blocks_count_) {
current_cursor += block_num;
if((current_cursor & (aligned_blocks_count_ - 1)) + block_num <= aligned_blocks_count_)
{
return false;
}
break;
case block_status::unused:
if (!data_parse_finished) {
write_cursor_.atomic_value.store(current_cursor, platform::memory_order::release);
}
data_parse_finished = true;
++current_cursor;
break;
default:
return false;
Expand All @@ -368,12 +373,14 @@ namespace bq {
if (current_cursor - buffer_head_->read_cursor_consumer_cache_ > aligned_blocks_count_) {
return false;
}
if (!data_parse_finished) {
write_cursor_.atomic_value.store(current_cursor, platform::memory_order::release);
}
write_cursor_.atomic_value.store(current_cursor, platform::memory_order::release);
read_cursor_.atomic_value.store(buffer_head_->read_cursor_consumer_cache_, platform::memory_order::release);
current_reading_cursor_ = buffer_head_->read_cursor_consumer_cache_;
current_reading_cursor_tmp_ = (uint32_t)-1;

for (uint32_t i = current_cursor; i < current_reading_cursor_ + aligned_blocks_count_; ++i) {
cursor_to_block(i).data_section_head.status.store(block_status::unused, bq::platform::memory_order::release);
}
return true;
}

Expand Down

0 comments on commit 7d0fb1f

Please sign in to comment.