Skip to content

Commit

Permalink
serial port write queuing
Browse files Browse the repository at this point in the history
  • Loading branch information
neilstephens committed Jan 5, 2024
1 parent c1c72d0 commit efd14e4
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 12 deletions.
46 changes: 35 additions & 11 deletions src/ProtoConv/SerialPortsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ SerialPortsManager::SerialPortsManager(asio::io_context& IOC, const std::vector<
IOC(IOC),
handler_tracker(std::make_shared<char>()),
port_settings(devs.size()),
port_write_idx(0),
write_q_strand(IOC),
num_ports(devs.size()),
bufs(devs.size()),
ReadHandler(read_handler)
{
size_t i = 0;
for(auto& dev : devs)
{
ports.emplace_back(IOC);
ports.back().open(dev);
strands.emplace_back(IOC);
idle_port_idx_q.push_back(i++);
}
}

Expand Down Expand Up @@ -189,19 +191,41 @@ void SerialPortsManager::Read(asio::io_context::strand& strand, asio::serial_por

void SerialPortsManager::Write(std::vector<uint8_t>&& data)
{
auto idx = port_write_idx++ % num_ports;
auto& port = ports[idx];
auto& strand = strands[idx];
auto pBuf = std::make_shared<std::vector<uint8_t>>(std::move(data));
Write(pBuf, handler_tracker);
}

strand.post([pBuf,&port,tracker{handler_tracker}]()
void SerialPortsManager::Write(std::shared_ptr<std::vector<uint8_t>> pBuf, std::shared_ptr<void> tracker)
{
write_q_strand.post([this,pBuf,tracker]()
{
asio::async_write(port,asio::buffer(pBuf->data(),pBuf->size()),asio::transfer_all(),[pBuf,tracker](asio::error_code err, size_t n)
if(idle_port_idx_q.empty())
write_q.push_back(pBuf);
else
{
if(err)
spdlog::get("ProtoConv")->error("Wrote {} bytes to serial, return error '{}'.",n,err.message());
else
spdlog::get("ProtoConv")->trace("Wrote {} bytes to serial.",n);
});
auto idx = idle_port_idx_q.front();
idle_port_idx_q.pop_front();
auto& port = ports[idx];
auto& strand = strands[idx];

strand.post([this,pBuf,&port,idx,tracker]()
{
asio::async_write(port,asio::buffer(pBuf->data(),pBuf->size()),asio::transfer_all(),write_q_strand.wrap([this,idx,tracker](asio::error_code err, size_t n)
{
if(err)
spdlog::get("ProtoConv")->error("Wrote {} bytes to serial, return error '{}'.",n,err.message());
else
spdlog::get("ProtoConv")->trace("Wrote {} bytes to serial.",n);

idle_port_idx_q.push_back(idx);
if(!write_q.empty())
{
auto pBuf = write_q.front();
write_q.pop_front();
Write(pBuf,tracker);
}
}));
});
}
});
}
6 changes: 5 additions & 1 deletion src/ProtoConv/SerialPortsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>
#include <functional>
#include <atomic>
#include <deque>

using buf_t = asio::basic_streambuf<std::allocator<char>>;

Expand Down Expand Up @@ -55,12 +56,15 @@ class SerialPortsManager
std::shared_ptr<void> handler_tracker;
std::vector<SerialDeviceSettings> port_settings;
std::vector<asio::serial_port> ports;
std::atomic_size_t port_write_idx;
std::deque<size_t> idle_port_idx_q;
std::deque<std::shared_ptr<std::vector<uint8_t>>> write_q;
asio::io_context::strand write_q_strand;
const size_t num_ports;
std::vector<buf_t> bufs;
const std::function<void(buf_t& readbuf)> ReadHandler;

void Read(asio::io_context::strand& strand, asio::serial_port& port, buf_t& buf);
void Write(std::shared_ptr<std::vector<uint8_t>> pBuf, std::shared_ptr<void> tracker);
};

#endif // SERIALPORTSMANAGER_H

0 comments on commit efd14e4

Please sign in to comment.