Skip to content

Commit

Permalink
Working on deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
kleag committed May 8, 2024
1 parent b462061 commit fc4ea8e
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 21 deletions.
3 changes: 2 additions & 1 deletion deeplima/apps/deeplima.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "version/version.h"
#include "helpers/path_resolver.h"
#include "deeplima/segmentation/impl/segmentation_impl.h"

namespace po = boost::program_options;

Expand Down Expand Up @@ -421,7 +422,7 @@ void parse_file(std::istream& input,
bool tag_use_mp)
{
auto parsing_begin = std::chrono::high_resolution_clock::now();

std::dynamic_pointer_cast<deeplima::segmentation::impl::SegmentationClassifier>(psegm)->reset();
psegm->parse_from_stream([&input]
(uint8_t* buffer,
int32_t& read,
Expand Down
56 changes: 43 additions & 13 deletions deeplima/include/deeplima/utils/locked_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,42 @@
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <vector>

struct locked_buffer_t
{
uint8_t* m_data;
uint32_t m_len;
uint32_t m_lock_count;
const char* m_char_aligned_data;
locked_buffer_t()
: m_data(nullptr),
m_len(0),
m_lock_count(0),
m_char_aligned_data(nullptr)
{
std::cerr << "locked_buffer_t::locked_buffer_t()"
<< (void*)this << std::endl;
}

~locked_buffer_t()
{
std::cerr << "locked_buffer_t::~locked_buffer_t() "
<< (void*)this << std::endl;
m_data = nullptr;
m_char_aligned_data = nullptr;
}

locked_buffer_t(const locked_buffer_t& other)
{
std::cerr << "locked_buffer_t::locked_buffer_t(other)"
<< (void*)this << std::endl;
m_data = other.m_data;
m_char_aligned_data = other.m_char_aligned_data;
m_len = other.m_len;
m_lock_count = other.m_lock_count;
}

locked_buffer_t& operator=(const locked_buffer_t&) = delete;

inline uint8_t* end() const
{
Expand All @@ -36,15 +62,19 @@ struct locked_buffer_t

inline void lock()
{
std::cerr << "locked_buffer_t::lock " << (void*)this << " " << m_lock_count;
m_lock_count++;
std::cerr << " -> " << m_lock_count << std::endl;
}

inline void unlock()
{
std::cerr << "locked_buffer_t::unlock " << (void*)this << " " << m_lock_count;
m_len = 0;
m_char_aligned_data = nullptr;
assert(m_lock_count > 0);
m_lock_count--;
std::cerr << " -> " << m_lock_count << std::endl;
}

inline void set_read_start(const char* new_start)
Expand All @@ -54,18 +84,18 @@ struct locked_buffer_t
m_char_aligned_data = new_start;
}

locked_buffer_t()
: m_data(nullptr),
m_len(0),
m_lock_count(0),
m_char_aligned_data(nullptr)
{}
// m_data and m_char_aligned_data are owned by the container of this buffer
uint8_t* m_data = nullptr;
uint32_t m_len = 0;
uint32_t m_lock_count = 0;
const char* m_char_aligned_data = nullptr;

};

struct locked_buffer_set_t
{
std::vector<locked_buffer_t> m_data;
uint8_t* m_mem;
uint8_t* m_mem = nullptr;
uint32_t m_max_buff_size;

locked_buffer_set_t(size_t /* currently unused. should be 2 times the number of threads */,
Expand All @@ -84,13 +114,12 @@ struct locked_buffer_set_t

void init(size_t n, uint32_t buffer_size)
{
std::cerr << "locked_buffer_set_t::init" << std::endl;
assert(n > 0);
assert(buffer_size > 0);

m_max_buff_size = buffer_size;

m_data.clear();
m_data.resize(n);
if (nullptr != m_mem)
{
delete[] m_mem;
Expand All @@ -104,6 +133,7 @@ struct locked_buffer_set_t

uint8_t* p_base = m_mem + 8;

m_data.resize(n);
for (size_t i = 0; i < m_data.size(); i++)
{
m_data[i].m_data = p_base + (i * buffer_size);
Expand Down
27 changes: 20 additions & 7 deletions deeplima/libs/tasks/segmentation/inference/segmentation_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void SegmentationImpl::init(size_t threads, size_t buffer_size_per_thread)

void SegmentationImpl::parse_from_stream(const read_callback_t fn)
{
// std::cerr << "SegmentationImpl::parse_from_stream" << std::endl;
std::cerr << "SegmentationImpl::parse_from_stream" << std::endl;
size_t n = 0;
bool just_started = true;
bool continue_reading = true;
Expand All @@ -85,18 +85,23 @@ void SegmentationImpl::parse_from_stream(const read_callback_t fn)
break;
}
counter += bytes_read;
// std::cerr << "Reading callback: " << bytes_read << " bytes, continue_reading="
// << continue_reading << " counter=" << counter
// << std::endl;
std::cerr << "SegmentationImpl::parse_from_stream Reading callback: "
<< bytes_read << " bytes, continue_reading="
<< continue_reading << " counter=" << counter << std::endl;
buff.m_char_aligned_data = (const char*)(buff.m_data);
buff.m_len = bytes_read;
std::cerr << "SegmentationImpl::parse_from_stream locking (m_buff_set) buff "
<< n << std::endl;
buff.lock();

int32_t pos = 0;
uint8_t* p = buff.m_data;
if (!just_started && 0 == n)
{
memcpy(p - 8, m_buff_set.get(m_buff_set.size() - 1).m_data + m_buff_set.max_buff_size() - 8, 8);
memcpy(p - 8,
m_buff_set.get(m_buff_set.size() - 1).m_data
+ m_buff_set.max_buff_size() - 8,
8);
}

// Warming up is required in the beginning of the text
Expand Down Expand Up @@ -153,8 +158,8 @@ void SegmentationImpl::parse_from_stream(const read_callback_t fn)
send_next_results();
}

// m_buff_set.pretty_print();
// SegmentationClassifier::pretty_print();
m_buff_set.pretty_print();
SegmentationClassifier::pretty_print();

n = m_buff_set.next(n);
}
Expand Down Expand Up @@ -239,6 +244,14 @@ void SegmentationImpl::send_next_results()
slot_idx = SegmentationClassifier::next_slot(slot_idx);
}

// We are in send_next_results
// Note, use get_lock_count from
// using SegmentationClassifier = RnnSequenceClassifier<eigen_impl::Model, eigen_impl::EmbdVectorizer, uint8_t> ;
// This one accesses its m_slots[idx].m_lock_count (std::vector<slot_t> member of RnnSequenceClassifier)
// while send_results (above but called below) do m_buff_set.get(i).unlock() (a
// locked_buffer_set_t), a member of SegmentationImpl
// Should we use SegmentationClassifier::decrement_lock_count in send_results too?

uint8_t lock_count = SegmentationClassifier::get_lock_count(slot_idx);

while (lock_count > 1)
Expand Down

0 comments on commit fc4ea8e

Please sign in to comment.