Skip to content

Commit

Permalink
enhance mmap verify
Browse files Browse the repository at this point in the history
  • Loading branch information
pippocao committed Dec 3, 2024
1 parent 45ebf93 commit 98353eb
Showing 1 changed file with 21 additions and 24 deletions.
45 changes: 21 additions & 24 deletions src/bq_log/types/miso_ring_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,16 @@ namespace bq {
switch (status) {
case block_status::invalid:
#if BQ_RING_BUFFER_DEBUG
assert(((reading_cursor_tmp_ + block_count - 1) & (~(aligned_blocks_count_ - 1))) - (reading_cursor_tmp_ & (~(aligned_blocks_count_ - 1))) == aligned_blocks_count_);
assert((reading_cursor_tmp_ & (aligned_blocks_count_ - 1)) + block_count > aligned_blocks_count_);
assert(block_count <= aligned_blocks_count_);
#endif
reading_cursor_tmp_ += block_count;
continue;
break;
case block_status::unused:
#if BQ_RING_BUFFER_DEBUG
assert((reading_cursor_tmp_ & (aligned_blocks_count_ - 1)) + block_count <= aligned_blocks_count_);
#endif
handle.result = enum_buffer_result_code::err_empty_ring_buffer;
break;
case block_status::used:
Expand Down Expand Up @@ -265,6 +269,7 @@ namespace bq {
string path = TO_ABSOLUTE_PATH(name_tmp, true);
memory_map_file_ = bq::file_manager::instance().open_file(path, file_open_mode_enum::auto_create | file_open_mode_enum::read_write | file_open_mode_enum::exclusive);
if (!memory_map_file_.is_valid()) {
bq::util::log_device_console(bq::log_level::warning, "failed to open mmap file %s, use memory instead of mmap file, error code:%d", path.c_str(), bq::file_manager::get_and_clear_last_file_error());
return create_memory_map_result::failed;
}
capacity = bq::roundup_pow_of_two(capacity);
Expand Down Expand Up @@ -317,60 +322,52 @@ namespace bq {

bool miso_ring_buffer::try_recover_from_exist_memory_map()
{
// parse and verify
// verify
mmap_head_->read_cursor_cache_ = mmap_head_->read_cursor_cache_ & (aligned_blocks_count_ - 1);
uint32_t current_cursor = mmap_head_->read_cursor_cache_;
bool data_parse_finished = false;
while (current_cursor - mmap_head_->read_cursor_cache_ < aligned_blocks_count_) {
while ((current_cursor - mmap_head_->read_cursor_cache_ < aligned_blocks_count_) && !data_parse_finished) {
block& current_block = cursor_to_block(current_cursor);
auto block_num = current_block.block_head.block_num;
auto data_size = current_block.block_head.data_size;
if (current_cursor + block_num > mmap_head_->read_cursor_cache_ + aligned_blocks_count_) {
return false;
}
switch (RING_BUFFER_ATOMIC_CAST_IGNORE_ALIGNMENT(current_block.block_head.status, block_status).load(bq::platform::memory_order::relaxed)) {
case block_status::used:
if (data_parse_finished == true) {
return false;
}
if (current_cursor < aligned_blocks_count_ && current_cursor + block_num > aligned_blocks_count_) {
return false;
}
if ((size_t)data_size > (block_num * sizeof(block) - data_block_offset)) {
if ((current_cursor & (aligned_blocks_count_ - 1)) + block_num > aligned_blocks_count_) {
return false;
}
if ((size_t)data_size < ((block_num * sizeof(block) - data_block_offset) < sizeof(block) ? 1 : (block_num * sizeof(block) - data_block_offset - sizeof(block)))) {
if ((size_t)data_size > (block_num * sizeof(block) - data_block_offset)
|| (size_t)data_size <= (block_num == 1 ? 0 : (block_num * sizeof(block) - sizeof(block) - data_block_offset))) {
return false;
}
current_cursor += block_num;
break;
case block_status::invalid:
if (data_parse_finished == true) {
return false;
}
current_cursor += block_num;
if (current_cursor <= aligned_blocks_count_) {
if ((current_cursor & (aligned_blocks_count_ - 1)) + block_num <= aligned_blocks_count_) {
return false;
}
break;
case block_status::unused:
if (!data_parse_finished) {
write_cursor_.atomic_value.store(current_cursor, platform::memory_order::release);
}
data_parse_finished = true;
++current_cursor;
break;
default:
return false;
break;
}
}
if (current_cursor - mmap_head_->read_cursor_cache_ > aligned_blocks_count_) {
return false;
}
if (!data_parse_finished) {
write_cursor_.atomic_value.store(current_cursor, platform::memory_order::release);
}
write_cursor_.atomic_value.store(current_cursor, platform::memory_order::release);
read_cursor_.atomic_value.store(mmap_head_->read_cursor_cache_, platform::memory_order::release);
#if BQ_RING_BUFFER_DEBUG
reading_cursor_tmp_ = (uint32_t)-1;
#endif

for (uint32_t i = current_cursor; i < mmap_head_->read_cursor_cache_ + aligned_blocks_count_; ++i) {
RING_BUFFER_ATOMIC_CAST_IGNORE_ALIGNMENT(cursor_to_block(i).block_head.status, block_status).store(block_status::unused, bq::platform::memory_order::release);
}
return true;
}

Expand Down

0 comments on commit 98353eb

Please sign in to comment.