Skip to content

Commit

Permalink
ProtocConv DNP3 fragment re-ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
neilstephens committed Jan 10, 2024
1 parent e0be239 commit 531f327
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 10 deletions.
92 changes: 92 additions & 0 deletions src/ProtoConv/DNP3FragHandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/* MiniPlex
*
* Copyright (c) 2023: Neil Stephens
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DNP3FragHandler.h"
#include <spdlog/spdlog.h>

DNP3FragHandler::DNP3FragHandler(const std::function<void(std::shared_ptr<uint8_t> pBuf, const size_t sz)>& write_handler, asio::io_context& IOC):
FragHandler(write_handler),
IOC(IOC),
marshalling_strand(IOC)
{}

void DNP3FragHandler::HandleFrame(const Frame& frame)
{
spdlog::get("ProtoConv")->trace("DNP3FragHandler::HandleFrame(): Got frame for flow 0x{:08x}.",frame.flow);
marshalling_strand.post([this,frame{Frame(frame)}]()
{
auto& pTxFlow = TransportFlows[frame.flow];
if(!pTxFlow)
{
spdlog::get("ProtoConv")->debug("DNP3FragHandler::HandleFrame(): Initialising new flow queue 0x{:08x}.",frame.flow);
pTxFlow = std::make_shared<TransportFlow>(IOC,frame.flow);
}
pTxFlow->strand.post([this,pTxFlow,frame{std::move(frame)}]
{
if(frame.isFragment())
{
//TODO: check for more bail-out conditions:
// dup seq, other?
if((pTxFlow->hasFir && frame.fir) || (pTxFlow->hasFin && frame.fin) || pTxFlow->hasSeq.contains(frame.seq))
{
spdlog::get("ProtoConv")->warn("DNP3FragHandler::HandleFrame(): Flow 0x{:08x}, Duplicate FIR/FIN/Seq (FIR: {}, FIN: {}, SEQ: {}). Q: {}",pTxFlow->id,frame.fir,frame.fin,frame.seq,ToString(pTxFlow->frag_q));
Flush(pTxFlow);
}

spdlog::get("ProtoConv")->trace("DNP3FragHandler::HandleFrame(): Queuing fragment (FIR: {}, FIN: {}, SEQ: {}) on flow 0x{:08x}.",frame.fir,frame.fin,frame.seq,frame.flow);
if(frame.fin)
{
pTxFlow->hasFin = true;
pTxFlow->finSeq = frame.seq;
}
if(frame.fir)
{
pTxFlow->hasFir = true;
pTxFlow->firSeq = frame.seq;
}
pTxFlow->hasSeq.insert(frame.seq);
pTxFlow->frag_q.emplace(std::move(frame));
spdlog::get("ProtoConv")->critical("DNP3FragHandler::HandleFrame(): Flow 0x{:08x} Q: {}.",pTxFlow->id,ToString(pTxFlow->frag_q));
if((pTxFlow->hasFir && pTxFlow->hasFin)
&& (pTxFlow->firSeq + pTxFlow->frag_q.size()-1)%64 == pTxFlow->finSeq)
{
spdlog::get("ProtoConv")->trace("DNP3FragHandler::HandleFrame(): Full set of fragments collected for flow 0x{:08x}.",pTxFlow->id);
Flush(pTxFlow);
}
}
else if(!pTxFlow->frag_q.empty())
{
spdlog::get("ProtoConv")->warn("DNP3FragHandler::HandleFrame(): Flow 0x{:08x}, Non-fragment frame (FIR: {}, FIN: {}, SEQ: {}) with unfinished fragment Q: {}",pTxFlow->id,frame.fir,frame.fin,frame.seq,ToString(pTxFlow->frag_q));
Flush(pTxFlow);
}
else
WriteHandler(frame.pBuf,frame.len);
});
});
}

void DNP3FragHandler::Flush(const std::shared_ptr<TransportFlow>& pTxFlow)
{
spdlog::get("ProtoConv")->trace("DNP3FragHandler::Flush(): Flow 0x{:08x}.",pTxFlow->id);
while(!pTxFlow->frag_q.empty())
{
WriteHandler(pTxFlow->frag_q.top().pBuf,pTxFlow->frag_q.top().len);
pTxFlow->frag_q.pop();
}
pTxFlow->hasFin = false;
pTxFlow->hasFir = false;
pTxFlow->hasSeq.clear();
}
89 changes: 89 additions & 0 deletions src/ProtoConv/DNP3FragHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/* MiniPlex
*
* Copyright (c) 2023: Neil Stephens
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DNP3FRAGHANDLER_H
#define DNP3FRAGHANDLER_H

#include "FragHandler.h"
#include <asio.hpp>
#include <queue>
#include <set>
#include <memory>

struct FragCmp
{
bool operator()(const Frame& l, const Frame& r)
{
if(r.fir) return true;
if(l.fir) return false;
if(r.fin) return false;
if(l.fin) return true;
if(l.seq > r.seq)
{
if((l.seq - r.seq) < (r.seq+64 - l.seq)) return true;
return false;
}
if((r.seq - l.seq) < (l.seq+64 - r.seq)) return false;
return true;
}
};

struct TransportFlow
{
TransportFlow() = delete;
explicit TransportFlow(asio::io_service& IOC, const uint64_t id):
strand(IOC),
id(id),
frag_q(FragCmp())
{}
asio::io_service::strand strand;
const uint64_t id;
std::priority_queue<Frame,std::deque<Frame>,FragCmp> frag_q;
std::set<uint8_t> hasSeq;
bool hasFin = false;
bool hasFir = false;
uint8_t finSeq = 0;
uint8_t firSeq = 0;
};

class DNP3FragHandler : public FragHandler
{
public:
DNP3FragHandler(const std::function<void(std::shared_ptr<uint8_t> pBuf, const size_t sz)>& write_handler, asio::io_context& IOC);
void HandleFrame(const Frame& frame) override;
private:
void Flush(const std::shared_ptr<TransportFlow>& pTxFlow);
asio::io_context& IOC;
asio::io_service::strand marshalling_strand; //TODO: can TransportFlows be lockless threadsafe (like ODC EventDB) - then no need for this strand
std::unordered_map<uint64_t,std::shared_ptr<TransportFlow>> TransportFlows;
};

inline std::string ToString(const std::priority_queue<Frame,std::deque<Frame>,FragCmp>& Q)
{
struct QPrinter : std::priority_queue<Frame,std::deque<Frame>,FragCmp> // to access protected container
{
std::string ToString() const
{
std::ostringstream oss;
for (const auto& f : this->c)
oss << f.seq << ((f.fir||f.fin)?"(":"") << (f.fir?"FIR|":"") << (f.fin?"|FIN":"") << ((f.fir||f.fin)?")":"") << ' ';
return oss.str();
}
};
return static_cast<const QPrinter&>(Q).ToString();
}

#endif // DNP3FRAGHANDLER_H
8 changes: 4 additions & 4 deletions src/ProtoConv/FrameChecker.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ struct Frame
operator bool() { return len != 0; };
std::shared_ptr<uint8_t> pBuf = nullptr;
size_t len;
const bool fir;
const bool fin;
const uint64_t flow;
const size_t seq;
/*const*/ bool fir;
/*const*/ bool fin;
/*const*/ uint64_t flow;
/*const*/ size_t seq;
};

struct FrameChecker
Expand Down
10 changes: 5 additions & 5 deletions src/ProtoConv/ProtoConv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "NullFrameChecker.h"
#include "DNP3FrameChecker.h"
#include "NopFragHandler.h"
#include "DNP3FragHandler.h"
#include <asio.hpp>
#include <spdlog/spdlog.h>
#include <memory>
Expand All @@ -38,12 +39,11 @@ ProtoConv::ProtoConv(const CmdArgs& Args, asio::io_context& IOC):
socket_strand(IOC),
process_strand(IOC)
{
auto writer = [this](std::shared_ptr<uint8_t> pBuf, const size_t sz){WriteHandler(pBuf,sz);};
auto writer = [this](std::shared_ptr<uint8_t> pBuf, const size_t sz){WriteUDPHandler(pBuf,sz);};
if(Args.FrameProtocol.getValue() == "DNP3")
{
pFramer = std::make_shared<DNP3FrameChecker>();
//TODO: use proper dnp3 frag handler
pFragHandler = std::make_shared<NopFragHandler>(writer);
pFragHandler = std::make_shared<DNP3FragHandler>(writer,IOC);
}
else if(Args.FrameProtocol.getValue() == "Null")
{
Expand Down Expand Up @@ -157,11 +157,11 @@ void ProtoConv::RcvStreamHandler(buf_t& buf)
}
}

void ProtoConv::WriteHandler(std::shared_ptr<uint8_t> pBuf, const size_t sz)
void ProtoConv::WriteUDPHandler(std::shared_ptr<uint8_t> pBuf, const size_t sz)
{
socket_strand.post([this,pBuf,sz]()
{
spdlog::get("ProtoConv")->trace("WriteHandler(): Forwarding frame length {} to UDP",sz);
spdlog::get("ProtoConv")->trace("WriteUDPHandler(): Forwarding frame length {} to UDP",sz);
socket.async_send(asio::buffer(pBuf.get(),sz),[pBuf](asio::error_code,size_t){});
//this is safe to call again before the handler is called - because it's non-composed
// according to stackoverflow there is a queue for the file descriptor under-the-hood
Expand Down
2 changes: 1 addition & 1 deletion src/ProtoConv/ProtoConv.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ProtoConv
void RcvUDP();
void RcvUDPHandler(const asio::error_code err, const uint8_t* const buf, const size_t n);
void RcvStreamHandler(buf_t& buf);
void WriteHandler(std::shared_ptr<uint8_t> pBuf, const size_t sz);
void WriteUDPHandler(std::shared_ptr<uint8_t> pBuf, const size_t sz);

const CmdArgs& Args;
asio::io_context& IOC;
Expand Down

0 comments on commit 531f327

Please sign in to comment.