Skip to content

Commit

Permalink
e2sar-perf with Control Plane tested (#57)
Browse files Browse the repository at this point in the history
- Added ability to read SegmenterFlags and ReassemblerFlags via INI
files (with unit tests)
- Added the ability for LBManager to force address resolution to IPv4 or
IPv6, with corresponding options in lbadm, lbmon and e2sar_perf
- Removed extraneous flags in Segmenter and Reassembler
- Modified the Reassembler constructor to explicitly take the IP address to
listen on (rather than using the one in the URI, which only works in
back-to-back testing without a real LB)
- Both Segmenter and Reassembler now take explicit IP addresses to use
(to send or receive)
- Updated snifgen.py so that it can parse PCAP files and can capture/parse
Sync and LBRE packets together (as sent by the sender)
- Updated e2sar-perf to support the control plane and various
additional options
- Extended stat reporting from Reassembler to include lost event IDs
(tuple of EventNum and dataID)
- Developed a FABRIC notebook for testing against a live LB
- Updated wiki
- Fixed packaging of libe2sar.a as a single artifact combining LB gRPC
code with E2SAR code
  • Loading branch information
cissieAB authored Oct 2, 2024
2 parents 1a78836 + 1959b6e commit 3f5179c
Show file tree
Hide file tree
Showing 42 changed files with 2,727 additions and 455 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/builddeps.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
env:
GRPC_VER: 1.54.1
BOOST_VER: 1.85.0
E2SAR_VER: 0.1.1
E2SAR_VER: 0.1.2

jobs:
build:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/deploydebs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name: Deploy E2sar debs and dependency debs to releases
on:
workflow_dispatch:
env:
E2SAR_VER: 0.1.1
E2SAR_VER: 0.1.2
E2SAR_DEP: 1.85.0-boost-1.54.1-grpc

jobs:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/deps2debs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Build gRPC+Boost .deb and .rpms
on:
workflow_dispatch:
env:
E2SAR_VER: 0.1.1
E2SAR_VER: 0.1.2
DEPS_VER: 1.85.0-boost-1.54.1-grpc
FINAL_INSTALL: /usr/local

Expand Down Expand Up @@ -67,7 +67,7 @@ jobs:
if [[ ${{ matrix.os }} == *"ubuntu"* ]]; then
fpm -s tar -t deb -n e2sar-deps -v ${{ env.E2SAR_VER }} --prefix=/ e2sar-deps.tar.gz
elif [[ ${{ matrix.os }} == *"rocky"* ]]; then
fpm -s tar -t deb -n e2sar-deps -v ${{ env.E2SAR_VER }} --prefix=/ e2sar-deps.tar.gz
fpm -s tar -t rpm -n e2sar-deps -v ${{ env.E2SAR_VER }} --prefix=/ e2sar-deps.tar.gz
fi
- name: Upload artifacts
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/distro.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Build E2SAR .deb and .rpms
on:
workflow_dispatch:
env:
E2SAR_VER: 0.1.1
E2SAR_VER: 0.1.2
E2SAR_DEP: boost-1.85.0-grpc-1.54.1
FINAL_INSTALL: /usr/local

Expand Down Expand Up @@ -110,7 +110,7 @@ jobs:
if [[ ${{ matrix.os }} == *"ubuntu"* ]]; then
fpm -s tar -t deb -n e2sar -v ${{ env.E2SAR_VER }} --prefix=/ e2sar.tar.gz
elif [[ ${{ matrix.os }} == *"rocky"* ]]; then
fpm -s tar -t deb -n e2sar -v ${{ env.E2SAR_VER }} --prefix=/ e2sar.tar.gz
fpm -s tar -t rpm -n e2sar -v ${{ env.E2SAR_VER }} --prefix=/ e2sar.tar.gz
fi
- name: Upload artifacts
Expand Down
2 changes: 1 addition & 1 deletion Doxyfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ PROJECT_NAME = "E2SAR"
# could be handy for archiving the generated documentation or if some version
# control system is used.

PROJECT_NUMBER = 0.1.0
PROJECT_NUMBER = 0.1.2

# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
Expand Down
181 changes: 168 additions & 13 deletions bin/e2sar_perf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,33 @@ bool threadsRunning = true;
u_int16_t reportThreadSleepMs{1000};
Reassembler *reasPtr{nullptr};
Segmenter *segPtr{nullptr};
LBManager *lbmPtr{nullptr};
std::vector<std::string> senders;

void ctrlCHandler(int sig)
{
std::cout << "Stopping threads" << std::endl;

if (segPtr != nullptr)
if (segPtr != nullptr) {
if (lbmPtr != nullptr) {
std::cout << "Removing senders: ";
for (auto s: senders)
std::cout << s << " ";
std::cout << std::endl;
auto rmres = lbmPtr->removeSenders(senders);
if (rmres.has_error())
std::cerr << "Unable to remove sender from list on exit: " << rmres.error().message() << std::endl;
}
segPtr->stopThreads();
}
if (reasPtr != nullptr)
{
std::cout << "Deregistering worker" << std::endl;
auto deregres = reasPtr->deregisterWorker();
if (deregres.has_error())
std::cerr << "Unable to deregister worker on exit: " << deregres.error().message() << std::endl;
reasPtr->stopThreads();
}
threadsRunning = false;
// instead of join
boost::chrono::milliseconds duration(1000);
Expand Down Expand Up @@ -130,10 +148,18 @@ result<int> sendEvents(Segmenter &s, EventNum_t startEventNum, size_t numEvents,
evtBufferPool->free(item);
boost::this_thread::sleep_until(until);
}
// sleep to allow small number of frames to leave
boost::chrono::seconds duration(1);
boost::this_thread::sleep_for(duration);

auto stats = s.getSendStats();

evtBufferPool->purge_memory();
std::cout << "Completed, " << stats.get<0>() << " frames sent, " << stats.get<1>() << " errors" << std::endl;
if (stats.get<1>() != 0)
{
std::cout << "Last error encountered: " << strerror(stats.get<2>()) << std::endl;
}
return 0;
}

Expand All @@ -146,6 +172,24 @@ result<int> recvEvents(Reassembler &r, int durationSec) {
EventNum_t evtNum;
u_int16_t dataId;

// register the worker (will be NOOP if withCP is set to false)
auto hostname_res = NetUtil::getHostName();
if (hostname_res.has_error())
{
return E2SARErrorInfo{hostname_res.error().code(), hostname_res.error().message()};
}
auto regres = r.registerWorker(hostname_res.value());
if (regres.has_error())
{
return E2SARErrorInfo{E2SARErrorc::RPCError,
"Unable to register worker node due to " + regres.error().message()};
}
if (regres.value() == 1)
std::cout << "Registered the worker" << std::endl;

// NOTE: if we switch the order of registerWorker and openAndStart
// you get into a race condition where the sendState thread starts and tries
// to send queue updates, however session token is not yet available...
auto openRes = r.openAndStart();
if (openRes.has_error())
return openRes;
Expand Down Expand Up @@ -188,11 +232,21 @@ result<int> recvEvents(Reassembler &r, int durationSec) {

void recvStatsThread(Reassembler *r)
{
std::vector<std::pair<EventNum_t, u_int16_t>> lostEvents;

while(threadsRunning)
{
auto nowT = boost::chrono::high_resolution_clock::now();

auto stats = r->getStats();

while(true)
{
auto res = r->get_LostEvent();
if (res.has_error())
break;
lostEvents.push_back(res.value());
}
/*
* - EventNum_t enqueueLoss; // number of events received and lost on enqueue
* - EventNum_t eventSuccess; // events successfully processed
Expand All @@ -211,6 +265,13 @@ void recvStatsThread(Reassembler *r)
if (stats.get<5>() != E2SARErrorc::NoError)
std::cout << "\tLast E2SARError code: " << stats.get<5>() << std::endl;

std::cout << "\tEvents lost so far: ";
for(auto evt: lostEvents)
{
std::cout << "<" << evt.first << ":" << evt.second << "> ";
}
std::cout << std::endl;

auto until = nowT + boost::chrono::milliseconds(reportThreadSleepMs);
boost::this_thread::sleep_until(until);
}
Expand All @@ -228,10 +289,14 @@ int main(int argc, char **argv)
u_int16_t mtu;
u_int32_t eventSourceId;
u_int16_t dataId;
size_t numThreads;
size_t numThreads, numSockets;
float rateGbps;
int sockBufSize;
int durationSec;
bool withCP;
std::string sndrcvIP;
std::string iniFile;
u_int16_t recvStartPort;

// parameters
opts("send,s", "send traffic");
Expand All @@ -244,11 +309,20 @@ int main(int argc, char **argv)
opts("mtu,m", po::value<u_int16_t>(&mtu)->default_value(1500), "MTU (default 1500) [s]");
opts("src", po::value<u_int32_t>(&eventSourceId)->default_value(1234), "Event source (default 1234) [s]");
opts("dataid", po::value<u_int16_t>(&dataId)->default_value(4321), "Data id (default 4321) [s]");
opts("threads,t", po::value<size_t>(&numThreads)->default_value(1), "number of receive threads (defaults to 1) [r]");
opts("threads", po::value<size_t>(&numThreads)->default_value(1), "number of receive threads (defaults to 1) [r]");
opts("sockets", po::value<size_t>(&numSockets)->default_value(4), "number of send sockets (defaults to 4) [r]");
opts("rate", po::value<float>(&rateGbps)->default_value(1.0), "send rate in Gbps (defaults to 1.0)");
opts("period,p", po::value<u_int16_t>(&reportThreadSleepMs)->default_value(1000), "receive side reporting thread sleep period in ms (defaults to 1000) [r]");
opts("bufsize,b", po::value<int>(&sockBufSize)->default_value(1024*1024*3), "send or receive socket buffer size (default to 3MB)");
opts("duration,d", po::value<int>(&durationSec)->default_value(0), "duration for receiver to run for (defaults to 0 - until Ctrl-C is presses)");
opts("duration,d", po::value<int>(&durationSec)->default_value(0), "duration for receiver to run for (defaults to 0 - until Ctrl-C is pressed)");
opts("withcp,c", po::bool_switch()->default_value(false), "enable control plane interactions");
opts("ini,i", po::value<std::string>(&iniFile)->default_value(""), "INI file to initialize SegmenterFlags [s]] or ReassemblerFlags [r]."
" Values found in the file override --withcp, --mtu and --bufsize");
opts("ip", po::value<std::string>(&sndrcvIP)->default_value("127.0.0.1"), "IP address (IPv4 or IPv6) from which sender sends from or on which receiver listens. Defaults to 127.0.0.1. [s,r]");
opts("port", po::value<u_int16_t>(&recvStartPort)->default_value(10000), "Starting UDP port number on which receiver listens. Defaults to 10000. [r] ");
opts("ipv6,6", "force using IPv6 control plane address if URI specifies hostname (disables cert validation) [s,r]");
opts("ipv4,4", "force using IPv4 control plane address if URI specifies hostname (disables cert validation) [s,r]");
opts("novalidate,v", "don't validate server certificate");

po::variables_map vm;

Expand All @@ -270,6 +344,11 @@ int main(int argc, char **argv)
conflicting_options(vm, "recv", "rate");
conflicting_options(vm, "send", "threads");
conflicting_options(vm, "send", "period");
conflicting_options(vm, "ipv4", "ipv6");
option_dependency(vm, "recv", "ip");
option_dependency(vm, "recv", "port");
option_dependency(vm, "send", "ip");
conflicting_options(vm, "send", "port");
}
catch (const std::logic_error &le)
{
Expand All @@ -284,25 +363,81 @@ int main(int argc, char **argv)
return 0;
}

withCP = vm["withcp"].as<bool>();

bool preferV6 = false;
if (vm.count("ipv6"))
{
preferV6 = true;
}

// if ipv4 or ipv6 requested explicitly
bool preferHostAddr = false;
if (vm.count("ipv6") || vm.count("ipv4"))
preferHostAddr = true;

bool validate = true;
if (vm.count("novalidate"))
validate = false;

// make sure the token is interpreted as the correct type, depending on the call
EjfatURI::TokenType tt{EjfatURI::TokenType::instance};

std::string ejfat_uri;
if (vm.count("send") || vm.count("recv"))
{
auto uri_r = (vm.count("uri") ? EjfatURI::getFromString(vm["uri"].as<std::string>(), tt) :
EjfatURI::getFromEnv("EJFAT_URI"s, tt));
auto uri_r = (vm.count("uri") ? EjfatURI::getFromString(vm["uri"].as<std::string>(), tt, preferV6) :
EjfatURI::getFromEnv("EJFAT_URI"s, tt, preferV6));
if (uri_r.has_error())
{
std::cerr << "Error in parsing URI from command-line, error "s + uri_r.error().message() << std::endl;
return -1;
}
auto uri = uri_r.value();
if (vm.count("send")) {
// if using control plane
if (withCP)
{
// add to senders list of 1 element
senders.push_back(sndrcvIP);

// create LBManager
lbmPtr = new LBManager(uri, validate, preferHostAddr);

// register senders
std::cout << "Adding senders to LB: ";
for (auto s: senders)
std::cout << s << " ";
std::cout << std::endl;
auto addres = lbmPtr->addSenders(senders);
if (addres.has_error())
{
std::cerr << "Unable to add a sender due to error " << addres.error().message()
<< ", exiting" << std::endl;
return -1;
}
}

Segmenter::SegmenterFlags sflags;
sflags.useCP = false; // turn off CP sync
sflags.mtu = mtu;
sflags.sndSocketBufSize = sockBufSize;
if (!iniFile.empty())
{
std::cout << "Loading SegmenterFlags from " << iniFile << std::endl;
auto sflagsRes = Segmenter::SegmenterFlags::getFromINI(iniFile);
if (sflagsRes.has_error())
{
std::cerr << "Unable to parse SegmenterFlags INI file " << iniFile << std::endl;
return -1;
}
sflags = sflagsRes.value();
} else {
sflags.useCP = withCP;
sflags.mtu = mtu;
sflags.sndSocketBufSize = sockBufSize;
sflags.numSendSockets = numSockets;
}
std::cout << "Control plane will be " << (sflags.useCP ? "ON" : "OFF") << std::endl;
std::cout << (sflags.useCP ? "*** Make sure the LB has been reserved and the URI reflects the reserved instance information." :
"*** Make sure the URI reflects proper data address, other parts are ignored.") << std::endl;

try {
Segmenter seg(uri, dataId, eventSourceId, sflags);
Expand All @@ -319,11 +454,31 @@ int main(int argc, char **argv)
} else if (vm.count("recv")) {
Reassembler::ReassemblerFlags rflags;

rflags.useCP = false; // turn off CP gRPC
rflags.withLBHeader = true; // no LB
rflags.rcvSocketBufSize = sockBufSize;
if (!iniFile.empty())
{
std::cout << "Loading ReassemblerFlags from " << iniFile << std::endl;
auto rflagsRes = Reassembler::ReassemblerFlags::getFromINI(iniFile);
if (rflagsRes.has_error())
{
std::cerr << "Unable to parse ReassemblerFlags INI file " << iniFile << std::endl;
return -1;
}
rflags = rflagsRes.value();
} else
{
rflags.useCP = withCP;
rflags.withLBHeader = not withCP;
rflags.rcvSocketBufSize = sockBufSize;
rflags.useHostAddress = preferHostAddr;
rflags.validateCert = validate;
}
std::cout << "Control plane will be " << (rflags.useCP ? "ON" : "OFF") << std::endl;
std::cout << (rflags.useCP ? "*** Make sure the LB has been reserved and the URI reflects the reserved instance information." :
"*** Make sure the URI reflects proper data address, other parts are ignored.") << std::endl;

try {
Reassembler reas(uri, numThreads, rflags);
ip::address ip = ip::make_address(sndrcvIP);
Reassembler reas(uri, ip, recvStartPort, numThreads, rflags);
reasPtr = &reas;
boost::thread statT(&recvStatsThread, &reas);
auto res = recvEvents(reas, durationSec);
Expand Down
Loading

0 comments on commit 3f5179c

Please sign in to comment.