From 531f3279f8b6d20bfa8411dd57c7808f5f4f0056 Mon Sep 17 00:00:00 2001 From: Neil Stephens Date: Wed, 10 Jan 2024 13:00:30 +1100 Subject: [PATCH] ProtocConv DNP3 fragment re-ordering --- src/ProtoConv/DNP3FragHandler.cpp | 92 +++++++++++++++++++++++++++++++ src/ProtoConv/DNP3FragHandler.h | 89 ++++++++++++++++++++++++++++++ src/ProtoConv/FrameChecker.h | 8 +-- src/ProtoConv/ProtoConv.cpp | 10 ++-- src/ProtoConv/ProtoConv.h | 2 +- 5 files changed, 191 insertions(+), 10 deletions(-) create mode 100644 src/ProtoConv/DNP3FragHandler.cpp create mode 100644 src/ProtoConv/DNP3FragHandler.h diff --git a/src/ProtoConv/DNP3FragHandler.cpp b/src/ProtoConv/DNP3FragHandler.cpp new file mode 100644 index 0000000..244009c --- /dev/null +++ b/src/ProtoConv/DNP3FragHandler.cpp @@ -0,0 +1,92 @@ +/* MiniPlex + * + * Copyright (c) 2023: Neil Stephens + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "DNP3FragHandler.h" +#include + +DNP3FragHandler::DNP3FragHandler(const std::function pBuf, const size_t sz)>& write_handler, asio::io_context& IOC): + FragHandler(write_handler), + IOC(IOC), + marshalling_strand(IOC) +{} + +void DNP3FragHandler::HandleFrame(const Frame& frame) +{ + spdlog::get("ProtoConv")->trace("DNP3FragHandler::HandleFrame(): Got frame for flow 0x{:08x}.",frame.flow); + marshalling_strand.post([this,frame{Frame(frame)}]() + { + auto& pTxFlow = TransportFlows[frame.flow]; + if(!pTxFlow) + { + spdlog::get("ProtoConv")->debug("DNP3FragHandler::HandleFrame(): Initialising new flow queue 0x{:08x}.",frame.flow); + pTxFlow = std::make_shared(IOC,frame.flow); + } + pTxFlow->strand.post([this,pTxFlow,frame{std::move(frame)}] + { + if(frame.isFragment()) + { + //TODO: check for more bail-out conditions: + // dup seq, other? + if((pTxFlow->hasFir && frame.fir) || (pTxFlow->hasFin && frame.fin) || pTxFlow->hasSeq.contains(frame.seq)) + { + spdlog::get("ProtoConv")->warn("DNP3FragHandler::HandleFrame(): Flow 0x{:08x}, Duplicate FIR/FIN/Seq (FIR: {}, FIN: {}, SEQ: {}). Q: {}",pTxFlow->id,frame.fir,frame.fin,frame.seq,ToString(pTxFlow->frag_q)); + Flush(pTxFlow); + } + + spdlog::get("ProtoConv")->trace("DNP3FragHandler::HandleFrame(): Queuing fragment (FIR: {}, FIN: {}, SEQ: {}) on flow 0x{:08x}.",frame.fir,frame.fin,frame.seq,frame.flow); + if(frame.fin) + { + pTxFlow->hasFin = true; + pTxFlow->finSeq = frame.seq; + } + if(frame.fir) + { + pTxFlow->hasFir = true; + pTxFlow->firSeq = frame.seq; + } + pTxFlow->hasSeq.insert(frame.seq); + pTxFlow->frag_q.emplace(std::move(frame)); + spdlog::get("ProtoConv")->critical("DNP3FragHandler::HandleFrame(): Flow 0x{:08x} Q: {}.",pTxFlow->id,ToString(pTxFlow->frag_q)); + if((pTxFlow->hasFir && pTxFlow->hasFin) + && (pTxFlow->firSeq + pTxFlow->frag_q.size()-1)%64 == pTxFlow->finSeq) + { + spdlog::get("ProtoConv")->trace("DNP3FragHandler::HandleFrame(): Full set of fragments collected for flow 0x{:08x}.",pTxFlow->id); + Flush(pTxFlow); + } + } + else if(!pTxFlow->frag_q.empty()) + { + spdlog::get("ProtoConv")->warn("DNP3FragHandler::HandleFrame(): Flow 0x{:08x}, Non-fragment frame (FIR: {}, FIN: {}, SEQ: {}) with unfinished fragment Q: {}",pTxFlow->id,frame.fir,frame.fin,frame.seq,ToString(pTxFlow->frag_q)); + Flush(pTxFlow); + } + else + WriteHandler(frame.pBuf,frame.len); + }); + }); +} + +void DNP3FragHandler::Flush(const std::shared_ptr& pTxFlow) +{ + spdlog::get("ProtoConv")->trace("DNP3FragHandler::Flush(): Flow 0x{:08x}.",pTxFlow->id); + while(!pTxFlow->frag_q.empty()) + { + WriteHandler(pTxFlow->frag_q.top().pBuf,pTxFlow->frag_q.top().len); + pTxFlow->frag_q.pop(); + } + pTxFlow->hasFin = false; + pTxFlow->hasFir = false; + pTxFlow->hasSeq.clear(); +} diff --git a/src/ProtoConv/DNP3FragHandler.h b/src/ProtoConv/DNP3FragHandler.h new file mode 100644 index 0000000..4c10c2c --- /dev/null +++ b/src/ProtoConv/DNP3FragHandler.h @@ -0,0 +1,89 @@ +/* MiniPlex + * + * Copyright (c) 2023: Neil Stephens + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DNP3FRAGHANDLER_H +#define DNP3FRAGHANDLER_H + +#include "FragHandler.h" +#include +#include +#include +#include + +struct FragCmp +{ + bool operator()(const Frame& l, const Frame& r) + { + if(r.fir) return true; + if(l.fir) return false; + if(r.fin) return false; + if(l.fin) return true; + if(l.seq > r.seq) + { + if((l.seq - r.seq) < (r.seq+64 - l.seq)) return true; + return false; + } + if((r.seq - l.seq) < (l.seq+64 - r.seq)) return false; + return true; + } +}; + +struct TransportFlow +{ + TransportFlow() = delete; + explicit TransportFlow(asio::io_service& IOC, const uint64_t id): + strand(IOC), + id(id), + frag_q(FragCmp()) + {} + asio::io_service::strand strand; + const uint64_t id; + std::priority_queue,FragCmp> frag_q; + std::set hasSeq; + bool hasFin = false; + bool hasFir = false; + uint8_t finSeq = 0; + uint8_t firSeq = 0; +}; + +class DNP3FragHandler : public FragHandler +{ +public: + DNP3FragHandler(const std::function pBuf, const size_t sz)>& write_handler, asio::io_context& IOC); + void HandleFrame(const Frame& frame) override; +private: + void Flush(const std::shared_ptr& pTxFlow); + asio::io_context& IOC; + asio::io_service::strand marshalling_strand; //TODO: can TransportFlows be lockless threadsafe (like ODC EventDB) - then no need for this strand + std::unordered_map> TransportFlows; +}; + +inline std::string ToString(const std::priority_queue,FragCmp>& Q) +{ + struct QPrinter : std::priority_queue,FragCmp> // to access protected container + { + std::string ToString() const + { + std::ostringstream oss; + for (const auto& f : this->c) + oss << f.seq << ((f.fir||f.fin)?"(":"") << (f.fir?"FIR|":"") << (f.fin?"|FIN":"") << ((f.fir||f.fin)?")":"") << ' '; + return oss.str(); + } + }; + return static_cast(Q).ToString(); +} + +#endif // DNP3FRAGHANDLER_H diff --git a/src/ProtoConv/FrameChecker.h b/src/ProtoConv/FrameChecker.h index 304d9a1..a09065c 100644 --- a/src/ProtoConv/FrameChecker.h +++ b/src/ProtoConv/FrameChecker.h @@ -38,10 +38,10 @@ struct Frame operator bool() { return len != 0; }; std::shared_ptr pBuf = nullptr; size_t len; - const bool fir; - const bool fin; - const uint64_t flow; - const size_t seq; + /*const*/ bool fir; + /*const*/ bool fin; + /*const*/ uint64_t flow; + /*const*/ size_t seq; }; struct FrameChecker diff --git a/src/ProtoConv/ProtoConv.cpp b/src/ProtoConv/ProtoConv.cpp index 0f7fcad..b0f5fc3 100644 --- a/src/ProtoConv/ProtoConv.cpp +++ b/src/ProtoConv/ProtoConv.cpp @@ -24,6 +24,7 @@ #include "NullFrameChecker.h" #include "DNP3FrameChecker.h" #include "NopFragHandler.h" +#include "DNP3FragHandler.h" #include #include #include @@ -38,12 +39,11 @@ ProtoConv::ProtoConv(const CmdArgs& Args, asio::io_context& IOC): socket_strand(IOC), process_strand(IOC) { - auto writer = [this](std::shared_ptr pBuf, const size_t sz){WriteHandler(pBuf,sz);}; + auto writer = [this](std::shared_ptr pBuf, const size_t sz){WriteUDPHandler(pBuf,sz);}; if(Args.FrameProtocol.getValue() == "DNP3") { pFramer = std::make_shared(); - //TODO: use proper dnp3 frag handler - pFragHandler = std::make_shared(writer); + pFragHandler = std::make_shared(writer,IOC); } else if(Args.FrameProtocol.getValue() == "Null") { @@ -157,11 +157,11 @@ void ProtoConv::RcvStreamHandler(buf_t& buf) } } -void ProtoConv::WriteHandler(std::shared_ptr pBuf, const size_t sz) +void ProtoConv::WriteUDPHandler(std::shared_ptr pBuf, const size_t sz) { socket_strand.post([this,pBuf,sz]() { - spdlog::get("ProtoConv")->trace("WriteHandler(): Forwarding frame length {} to UDP",sz); + spdlog::get("ProtoConv")->trace("WriteUDPHandler(): Forwarding frame length {} to UDP",sz); socket.async_send(asio::buffer(pBuf.get(),sz),[pBuf](asio::error_code,size_t){}); //this is safe to call again before the handler is called - because it's non-composed // according to stackoverflow there is a queue for the file descriptor under-the-hood diff --git a/src/ProtoConv/ProtoConv.h b/src/ProtoConv/ProtoConv.h index 212df3a..0c70834 100644 --- a/src/ProtoConv/ProtoConv.h +++ b/src/ProtoConv/ProtoConv.h @@ -41,7 +41,7 @@ class ProtoConv void RcvUDP(); void RcvUDPHandler(const asio::error_code err, const uint8_t* const buf, const size_t n); void RcvStreamHandler(buf_t& buf); - void WriteHandler(std::shared_ptr pBuf, const size_t sz); + void WriteUDPHandler(std::shared_ptr pBuf, const size_t sz); const CmdArgs& Args; asio::io_context& IOC;