Skip to content

Commit

Permalink
ground work for handling DNP3 transport ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
neilstephens committed Jan 9, 2024
1 parent 725a69c commit d336fb8
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 33 deletions.
28 changes: 15 additions & 13 deletions src/ProtoConv/DNP3FrameChecker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,17 @@ std::string buf2hex(const uint8_t* data, size_t size)
return str;
}

size_t DNP3FrameChecker::CheckFrame(const buf_t& readbuf)
Frame DNP3FrameChecker::CheckFrame(const buf_t& readbuf)
{
if(readbuf.size() < 1)
return 0;
return Frame(0);

size_t length = 0;
//parse datalink header
if(auto num_unframed_bytes = StartBytesOffset(readbuf))
{
spdlog::get("ProtoConv")->warn("DNP3FrameChecker::CheckFrame(): {} unframed bytes.",num_unframed_bytes);
return num_unframed_bytes;
return Frame(num_unframed_bytes);
}

if(readbuf.size() >= 10)
Expand All @@ -164,37 +164,39 @@ size_t DNP3FrameChecker::CheckFrame(const buf_t& readbuf)
//CRC failed - not real start bytes
// return them and move on
spdlog::get("ProtoConv")->warn("DNP3FrameChecker::CheckFrame(): false start bytes (header CRC failed).");
return 2;
return Frame(2);
}
}
else //found start bytes, but not enough data to check CRC
{
spdlog::get("ProtoConv")->trace("DNP3FrameChecker::CheckFrame(): Waiting for full header.");
return 0;
return Frame(0);
}

//we have a CRC verified header and know the full frame length
if(readbuf.size() < length)
{
// but don't have the whole frame yet
spdlog::get("ProtoConv")->trace("DNP3FrameChecker::CheckFrame(): Valid header, waiting for full frame.");
return 0;
}
else if(length == 10)
{
spdlog::get("ProtoConv")->trace("DNP3FrameChecker::CheckFrame(): Valid header-only frame.");
return length;
return Frame(0);
}
else
{
spdlog::get("ProtoConv")->trace("DNP3FrameChecker::CheckFrame(): Valid header, with full frame length.");
const uint32_t address_bytes = *(asio::buffers_begin(readbuf.data())+4);
if(length == 10)
{
spdlog::get("ProtoConv")->trace("DNP3FrameChecker::CheckFrame(): Header-only frame.");
return Frame(length,true,true,address_bytes);
}
//check all the block CRCs
bool GoodCRCs;
size_t start_offset = 10;
const uint8_t tx_ctl_byte = *(asio::buffers_begin(readbuf.data())+start_offset);
const bool tx_fin = tx_ctl_byte & 0x80;
const bool tx_fir = tx_ctl_byte & 0x40;
const uint8_t tx_seq = tx_ctl_byte & 0x3F;
Frame frame(length,tx_fir,tx_fin,address_bytes,tx_seq);
do
{
auto n = (length-start_offset >= 18) ? 16 : length-start_offset-2;
Expand All @@ -214,13 +216,13 @@ size_t DNP3FrameChecker::CheckFrame(const buf_t& readbuf)
if(GoodCRCs)
{
spdlog::get("ProtoConv")->trace("DNP3FrameChecker::CheckFrame(): Full valid frame.");
return length;
return frame;
}
else
{
spdlog::get("ProtoConv")->warn("DNP3FrameChecker::CheckFrame(): Corrupt frame: Transport function (FIR: {}, FIN: {}, SEQ: {})",tx_fir,tx_fin,tx_seq);
//CRC failed - count all the CRC checked bytes
return start_offset - 18;
return Frame(start_offset - 18);
}
}
}
2 changes: 1 addition & 1 deletion src/ProtoConv/DNP3FrameChecker.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
class DNP3FrameChecker : public FrameChecker
{
public:
size_t CheckFrame(const buf_t& readbuf) override;
Frame CheckFrame(const buf_t& readbuf) override;
};

#endif // DNP3FRAMECHECKER_H
35 changes: 35 additions & 0 deletions src/ProtoConv/FragHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/* MiniPlex
*
* Copyright (c) 2023: Neil Stephens
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FRAGHANDLER_H
#define FRAGHANDLER_H

#include "FrameChecker.h"
#include <functional>

class FragHandler
{
public:
FragHandler(const std::function<void(std::shared_ptr<uint8_t> pBuf, const size_t sz)>& write_handler):
WriteHandler(write_handler)
{}
virtual ~FragHandler() = default;
virtual void HandleFrame(const Frame& frame) = 0;
protected:
const std::function<void(std::shared_ptr<uint8_t> pBuf, const size_t sz)> WriteHandler;
};

#endif // FRAGHANDLER_H
25 changes: 24 additions & 1 deletion src/ProtoConv/FrameChecker.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,33 @@

using buf_t = asio::basic_streambuf<std::allocator<char>>;

struct Frame
{
explicit Frame(const size_t len,
const bool fir = true,
const bool fin = true,
const uint64_t flow = 0,
const size_t seq = 0):
len(len),
fir(fir),
fin(fin),
flow(flow),
seq(seq)
{}
bool isFragment() const { return !(fir && fin);}
operator bool() { return len == 0; };
std::shared_ptr<uint8_t> pBuf = nullptr;
size_t len;
const bool fir;
const bool fin;
const uint64_t flow;
const size_t seq;
};

struct FrameChecker
{
virtual ~FrameChecker() = default;
virtual size_t CheckFrame(const buf_t& readbuf) = 0;
virtual Frame CheckFrame(const buf_t& readbuf) = 0;
};

#endif // FRAMECHECKER_H
34 changes: 34 additions & 0 deletions src/ProtoConv/NopFragHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* MiniPlex
*
* Copyright (c) 2023: Neil Stephens
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef NOPFRAGHANDLER_H
#define NOPFRAGHANDLER_H

#include "FragHandler.h"

class NopFragHandler : public FragHandler
{
public:
NopFragHandler(const std::function<void(std::shared_ptr<uint8_t> pBuf, const size_t sz)>& write_handler):
FragHandler(write_handler)
{}
void HandleFrame(const Frame& frame) override
{
WriteHandler(frame.pBuf,frame.len);
}
};

#endif // NOPFRAGHANDLER_H
4 changes: 2 additions & 2 deletions src/ProtoConv/NullFrameChecker.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
class NullFrameChecker : public FrameChecker
{
public:
inline size_t CheckFrame(const buf_t& readbuf) override
inline Frame CheckFrame(const buf_t& readbuf) override
{
//Don't check anything. Everything is a valid 'frame'.
return readbuf.size();
return Frame(readbuf.size());
}
};

Expand Down
47 changes: 31 additions & 16 deletions src/ProtoConv/ProtoConv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "TCPSocketManager.h"
#include "NullFrameChecker.h"
#include "DNP3FrameChecker.h"
#include "NopFragHandler.h"
#include <asio.hpp>
#include <spdlog/spdlog.h>
#include <memory>
Expand All @@ -37,10 +38,18 @@ ProtoConv::ProtoConv(const CmdArgs& Args, asio::io_context& IOC):
socket_strand(IOC),
process_strand(IOC)
{
auto writer = [this](std::shared_ptr<uint8_t> pBuf, const size_t sz){WriteHandler(pBuf,sz);};
if(Args.FrameProtocol.getValue() == "DNP3")
{
pFramer = std::make_shared<DNP3FrameChecker>();
//TODO: use proper dnp3 frag handler
pFragHandler = std::make_shared<NopFragHandler>(writer);
}
else if(Args.FrameProtocol.getValue() == "Null")
{
pFramer = std::make_shared<NullFrameChecker>();
pFragHandler = std::make_shared<NopFragHandler>(writer);
}
else
throw std::invalid_argument("Unknown frame protocol: "+Args.FrameProtocol.getValue());

Expand Down Expand Up @@ -125,31 +134,37 @@ void ProtoConv::RcvUDPHandler(const asio::error_code err, const uint8_t* const b
void ProtoConv::RcvStreamHandler(buf_t& buf)
{
spdlog::get("ProtoConv")->trace("RcvStreamHandler(): {} bytes in buffer.",buf.size());
while(auto frame_len = pFramer->CheckFrame(buf))
while(auto frame = pFramer->CheckFrame(buf))
{
if(frame_len > buf.size())
if(frame.len > buf.size())
{
spdlog::get("ProtoConv")->error("RcvStreamHandler(): Frame checker claims frame size (), which is > size of buffer ().",frame_len,buf.size());
frame_len = buf.size();
spdlog::get("ProtoConv")->error("RcvStreamHandler(): Frame checker claims frame size (), which is > size of buffer ().",frame.len,buf.size());
frame.len = buf.size();
}
// The C++20 way causes a malloc error when asio tries to copy a handler with this style shared_ptr
//auto pForwardBuf = std::make_shared<uint8_t[]>(n);
// Use the old way instead - only difference should be the control block is allocated separately
auto pForwardBuf = std::shared_ptr<uint8_t>(new uint8_t[frame_len],[](uint8_t* p){delete[] p;});
const size_t ncopied = buf.sgetn(reinterpret_cast<char*>(pForwardBuf.get()),frame_len);
if(ncopied != frame_len)
auto pForwardBuf = std::shared_ptr<uint8_t>(new uint8_t[frame.len],[](uint8_t* p){delete[] p;});
const size_t ncopied = buf.sgetn(reinterpret_cast<char*>(pForwardBuf.get()),frame.len);
if(ncopied != frame.len)
{
spdlog::get("ProtoConv")->warn("RcvStreamHandler(): Failed to copy whole frame to datagram buffer. Frame size {}, copied {}.",frame_len,ncopied);
frame_len = ncopied;
spdlog::get("ProtoConv")->warn("RcvStreamHandler(): Failed to copy whole frame to datagram buffer. Frame size {}, copied {}.",frame.len,ncopied);
frame.len = ncopied;
}

spdlog::get("ProtoConv")->trace("RcvStreamHandler(): Forwarding frame length {} to UDP",frame_len);
socket_strand.post([this,pForwardBuf,frame_len]()
{
//this is safe to call again before the handler is called - because it's non-composed
// according to stackoverflow there is a queue for the file descriptor under-the-hood
socket.async_send(asio::buffer(pForwardBuf.get(),frame_len),[pForwardBuf](asio::error_code,size_t){});
});
frame.pBuf = pForwardBuf;
pFragHandler->HandleFrame(frame);
}
}

void ProtoConv::WriteHandler(std::shared_ptr<uint8_t> pBuf, const size_t sz)
{
socket_strand.post([this,pBuf,sz]()
{
spdlog::get("ProtoConv")->trace("WriteHandler(): Forwarding frame length {} to UDP",sz);
socket.async_send(asio::buffer(pBuf.get(),sz),[pBuf](asio::error_code,size_t){});
//this is safe to call again before the handler is called - because it's non-composed
// according to stackoverflow there is a queue for the file descriptor under-the-hood
});
}

3 changes: 3 additions & 0 deletions src/ProtoConv/ProtoConv.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "FrameChecker.h"
#include "StreamHandler.h"
#include "FragHandler.h"
#include <asio.hpp>
#include <atomic>
#include <memory>
Expand All @@ -40,11 +41,13 @@ class ProtoConv
void RcvUDP();
void RcvUDPHandler(const asio::error_code err, const uint8_t* const buf, const size_t n);
void RcvStreamHandler(buf_t& buf);
void WriteHandler(std::shared_ptr<uint8_t> pBuf, const size_t sz);

const CmdArgs& Args;
asio::io_context& IOC;

std::shared_ptr<FrameChecker> pFramer;
std::shared_ptr<FragHandler> pFragHandler;
std::shared_ptr<StreamHandler> pStream;

const asio::ip::udp::endpoint local_ep;
Expand Down

0 comments on commit d336fb8

Please sign in to comment.