From 7d0fb1f1a05b45b3a3cc5c115f3a9384becc20da Mon Sep 17 00:00:00 2001 From: pippocao Date: Tue, 3 Dec 2024 11:14:52 +0800 Subject: [PATCH] Use flock to protect exclusive access to a file on POSIX systems. This can prevent a memory-mapped file from being opened by two processes simultaneously #34 --- src/bq_common/platform/posix_misc.cpp | 11 ++++++ src/bq_log/types/ring_buffer.cpp | 53 +++++++++++++++------------ 2 files changed, 41 insertions(+), 23 deletions(-) diff --git a/src/bq_common/platform/posix_misc.cpp b/src/bq_common/platform/posix_misc.cpp index 4e91513..9e2a063 100644 --- a/src/bq_common/platform/posix_misc.cpp +++ b/src/bq_common/platform/posix_misc.cpp @@ -298,6 +298,17 @@ namespace bq { if (!(int32_t)(mode & (file_open_mode_enum::write | file_open_mode_enum::exclusive))) { return true; } + if ((int32_t)(mode & file_open_mode_enum::exclusive)) { + struct flock lock; + memset(&lock, 0, sizeof(lock)); + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + lock.l_start = 0; + lock.l_len = 0; + if (fcntl(file_handle, F_SETLK, &lock) == -1) { + return false; + } + } struct stat file_info; if (fstat(file_handle, &file_info) < 0) { bq::util::log_device_console(log_level::error, "add_file_execlusive_check fstat failed, fd:%d, error code:%d", file_handle, errno); diff --git a/src/bq_log/types/ring_buffer.cpp b/src/bq_log/types/ring_buffer.cpp index 832b4bf..e965a27 100644 --- a/src/bq_log/types/ring_buffer.cpp +++ b/src/bq_log/types/ring_buffer.cpp @@ -13,6 +13,7 @@ #include #include #include "bq_log/types/ring_buffer.h" +#include "bq_log/misc/bq_log_api.h" namespace bq { @@ -32,7 +33,6 @@ namespace bq { bq::util::log_device_console(bq::log_level::warning, "invalid ring_buffer capacity {}, it should not be less than 16 * 64 bytes. it will be set to 16 * 64 automatically", capacity); capacity = 16 * cache_line_size; } - serialize_id = 0; //disable mmap temporarily , it is not robust enough. auto mmap_result = create_memory_map(capacity, serialize_id); switch (mmap_result) { case bq::ring_buffer::failed: @@ -71,7 +71,7 @@ namespace bq { ring_buffer_write_handle handle; int32_t max_try_count = 100; - uint32_t size_required = size + sizeof(block::data_section_head); + uint32_t size_required = size + data_block_offset; uint32_t need_block_count = (size_required + (cache_line_size - 1)) >> cache_line_size_log2; if (need_block_count > aligned_blocks_count_ || need_block_count == 0) { #if BQ_RING_BUFFER_DEBUG @@ -204,7 +204,8 @@ namespace bq { switch (status) { case block_status::invalid: #if BQ_RING_BUFFER_DEBUG - assert(((current_reading_cursor_tmp_ + block_count) & (~(aligned_blocks_count_ - 1))) - (current_reading_cursor_tmp_ & (~(aligned_blocks_count_ - 1))) == aligned_blocks_count_); + assert((current_reading_cursor_tmp_ & (aligned_blocks_count_ - 1)) + block_count > aligned_blocks_count_); + assert(block_count <= aligned_blocks_count_); #endif current_reading_cursor_tmp_ += block_count; continue; @@ -216,6 +217,9 @@ namespace bq { handle.result = enum_buffer_result_code::err_empty_ring_buffer; break; case block_status::used: +#if BQ_RING_BUFFER_DEBUG + assert((current_reading_cursor_tmp_ & (aligned_blocks_count_ - 1)) + block_count <= aligned_blocks_count_); +#endif handle.result = enum_buffer_result_code::success; handle.data_addr = block_ref.data_section_head.data; handle.data_size = block_ref.data_section_head.data_size; @@ -275,6 +279,7 @@ namespace bq { string path = TO_ABSOLUTE_PATH(name_tmp, true); memory_map_file_ = bq::file_manager::instance().open_file(path, file_open_mode_enum::auto_create | file_open_mode_enum::read_write | file_open_mode_enum::exclusive); if (!memory_map_file_.is_valid()) { + bq::util::log_device_console(bq::log_level::warning, "failed to open mmap file %s, use memory instead of mmap file, error code:%d", path.c_str(), bq::file_manager::get_and_clear_last_file_error()); return create_memory_map_result::failed; } capacity = bq::roundup_pow_of_two(capacity); @@ -325,40 +330,40 @@ namespace bq { bool ring_buffer::try_recover_from_exist_memory_map() { - // parse check + // verify buffer_head_->read_cursor_consumer_cache_ = buffer_head_->read_cursor_consumer_cache_ & (aligned_blocks_count_ - 1); uint32_t current_cursor = buffer_head_->read_cursor_consumer_cache_; bool data_parse_finished = false; - while (current_cursor - buffer_head_->read_cursor_consumer_cache_ < aligned_blocks_count_) { + while ((current_cursor - buffer_head_->read_cursor_consumer_cache_ < aligned_blocks_count_) && !data_parse_finished) { block& current_block = cursor_to_block(current_cursor); + auto block_num = current_block.data_section_head.block_num; + if(current_cursor + block_num > buffer_head_->read_cursor_consumer_cache_ + aligned_blocks_count_) + { + return false; + } switch (current_block.data_section_head.status.load(bq::platform::memory_order::relaxed)) { case block_status::used: - if (data_parse_finished == true) { - return false; - } - if (current_cursor < aligned_blocks_count_ && current_cursor + current_block.data_section_head.block_num > aligned_blocks_count_) { + if((current_cursor & (aligned_blocks_count_ - 1)) + block_num > aligned_blocks_count_) + { return false; } - if ((size_t)current_block.data_section_head.data_size > (current_block.data_section_head.block_num * sizeof(block) - data_block_offset)) { + if((size_t)current_block.data_section_head.data_size > (block_num * sizeof(block) - data_block_offset) + || (size_t)current_block.data_section_head.data_size <= (block_num == 1 ? 0 : (block_num * sizeof(block) - sizeof(block) - data_block_offset)) + ) + { return false; } - current_cursor += current_block.data_section_head.block_num; + current_cursor += block_num; break; case block_status::invalid: - if (data_parse_finished == true) { - return false; - } - current_cursor += current_block.data_section_head.block_num; - if (current_cursor <= aligned_blocks_count_) { + current_cursor += block_num; + if((current_cursor & (aligned_blocks_count_ - 1)) + block_num <= aligned_blocks_count_) + { return false; } break; case block_status::unused: - if (!data_parse_finished) { - write_cursor_.atomic_value.store(current_cursor, platform::memory_order::release); - } data_parse_finished = true; - ++current_cursor; break; default: return false; @@ -368,12 +373,14 @@ namespace bq { if (current_cursor - buffer_head_->read_cursor_consumer_cache_ > aligned_blocks_count_) { return false; } - if (!data_parse_finished) { - write_cursor_.atomic_value.store(current_cursor, platform::memory_order::release); - } + write_cursor_.atomic_value.store(current_cursor, platform::memory_order::release); read_cursor_.atomic_value.store(buffer_head_->read_cursor_consumer_cache_, platform::memory_order::release); current_reading_cursor_ = buffer_head_->read_cursor_consumer_cache_; current_reading_cursor_tmp_ = (uint32_t)-1; + + for (uint32_t i = current_cursor; i < current_reading_cursor_ + aligned_blocks_count_; ++i) { + cursor_to_block(i).data_section_head.status.store(block_status::unused, bq::platform::memory_order::release); + } return true; }