Skip to content

Commit

Permalink
core: sv compression
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Jul 13, 2023
1 parent 6046d2c commit 09252c4
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 8 deletions.
57 changes: 51 additions & 6 deletions ndn-svs/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,18 @@
*/

#include "core.hpp"
#include "tlv.hpp"

#include <ndn-cxx/security/signing-helpers.hpp>
#include <ndn-cxx/security/verification-helpers.hpp>
#include <ndn-cxx/encoding/buffer-stream.hpp>

#ifdef NDN_SVS_COMPRESSION
#include <boost/iostreams/filter/lzma.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/device/array.hpp>
#include <boost/iostreams/filtering_stream.hpp>
#endif

namespace ndn::svs {

Expand All @@ -32,7 +41,6 @@ SVSyncCore::SVSyncCore(ndn::Face& face,
, m_id(nid)
, m_onUpdate(onUpdate)
, m_rng(ndn::random::getRandomNumberEngine())
, m_packetDist(10, 15)
, m_retxDist(30000 * 0.9, 30000 * 1.1)
, m_intrReplyDist(0, 75)
, m_keyChainMem("pib-memory:", "tpm-memory:")
Expand Down Expand Up @@ -95,10 +103,33 @@ SVSyncCore::onSyncInterestValidated(const Interest &interest)
std::shared_ptr<VersionVector> vvOther;
try
{
vvOther = std::make_shared<VersionVector>(n.get(-2));
ndn::Block vvBlock = n.get(-2);

// Decompress if necessary
if (vvBlock.type() == 211) {
#ifdef NDN_SVS_COMPRESSION
boost::iostreams::filtering_istreambuf in;
in.push(boost::iostreams::lzma_decompressor());
in.push(boost::iostreams::array_source(reinterpret_cast<const char*>(vvBlock.value()), vvBlock.value_size()));
ndn::OBufferStream decompressed;
boost::iostreams::copy(in, decompressed);

auto inner = ndn::Block::fromBuffer(decompressed.buf());
if (!std::get<0>(inner)) {
throw ndn::tlv::Error("Failed to decode inner block");
}

vvBlock = std::get<1>(inner);
#else
throw ndn::tlv::Error("SVS was compiled without compression support");
#endif
}

vvOther = std::make_shared<VersionVector>(vvBlock);
}
catch (ndn::tlv::Error&)
{
// TODO: log error
return;
}

Expand Down Expand Up @@ -171,22 +202,36 @@ SVSyncCore::sendSyncInterest()
if (!m_initialized)
return;

Name syncName(m_syncPrefix);
Interest interest;
interest.setApplicationParameters(span<const uint8_t>{'0'});

ndn::Block vvWire;
{
std::lock_guard<std::mutex> lock(m_vvMutex);
syncName.append(Name::Component(m_vv.encode()));
vvWire = m_vv.encode();

// Add parameters digest
interest.setApplicationParameters(span<const uint8_t>{'0'});

if (m_getExtraBlock)
{
interest.setApplicationParameters(m_getExtraBlock(m_vv));
}
}

// Create sync interest name
Name syncName(m_syncPrefix);

#ifdef NDN_SVS_COMPRESSION
vvWire.encode();
boost::iostreams::filtering_istreambuf in;
in.push(boost::iostreams::lzma_compressor(boost::iostreams::lzma::best_compression));
in.push(boost::iostreams::array_source(reinterpret_cast<const char*>(vvWire.data()), vvWire.size()));
ndn::OBufferStream compressed;
boost::iostreams::copy(in, compressed);
vvWire = ndn::Block(svs::tlv::StateVectorLzma, compressed.buf());
#endif

syncName.append(Name::Component(vvWire));

interest.setName(syncName);
interest.setInterestLifetime(time::milliseconds(1));

Expand Down
2 changes: 0 additions & 2 deletions ndn-svs/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,6 @@ class SVSyncCore : noncopyable

// Random Engine
ndn::random::RandomNumberEngine& m_rng;
// Milliseconds between sending two packets in the queues
std::uniform_int_distribution<> m_packetDist;
// Milliseconds between sending two sync interests
std::uniform_int_distribution<> m_retxDist;
// Milliseconds to send sync interest reply after
Expand Down
1 change: 1 addition & 0 deletions ndn-svs/tlv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ enum {
SeqNo = 204,
MappingData = 205,
MappingEntry = 206,
StateVectorLzma = 211,
};

} // namespace ndn::svs::tlv
Expand Down
7 changes: 7 additions & 0 deletions wscript
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def options(opt):
optgrp.add_option('--with-tests', action='store_true', default=False,
help='Build unit tests')

optgrp.add_option('--with-compression', action='store_true', default=False,
dest='with_compression', help='Build with state vector compression extension')

def configure(conf):
conf.start_msg('Building static library')
if conf.options.enable_static:
Expand Down Expand Up @@ -71,6 +74,10 @@ def configure(conf):
if conf.env.WITH_TESTS:
boost_libs.append('unit_test_framework')

if conf.options.with_compression:
boost_libs.append('iostreams')
conf.define('COMPRESSION', 1)

conf.check_boost(lib=boost_libs, mt=True)

conf.check_compiler_flags()
Expand Down

0 comments on commit 09252c4

Please sign in to comment.