Skip to content

Commit

Permalink
added configuration for stream_read and stream_read_write
Browse files Browse the repository at this point in the history
  • Loading branch information
Raghav Rawat committed Dec 17, 2024
1 parent 1adcf4f commit 19487b8
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 19 deletions.
7 changes: 5 additions & 2 deletions source/config/localhost_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
"root_cert": ""
},
"moniker_stream_processor": {
"moniker_stream_read_write": -1,
"moniker_stream_write": -1
"moniker_sideband_stream_read_write": -1,
"moniker_stream_write": -1,
"moniker_stream_read": -1,
"moniker_stream_read_write": -1

}
}
38 changes: 24 additions & 14 deletions source/server/data_moniker_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,31 @@ void DataMonikerService::InitiateMonikerList(const MonikerList& monikers, Endpoi
}
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
#ifndef _WIN32
void set_moniker_stream_processor(int stream_processor)
{
if(stream_processor >= 0) {
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(stream_processor, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
}
}
#endif

//---------------------------------------------------------------------
//---------------------------------------------------------------------
void DataMonikerService::RunSidebandReadWriteLoop(string sidebandIdentifier, ::SidebandStrategy strategy, EndpointList* readers, EndpointList* writers)
{
#ifndef _WIN32
if ((strategy == ::SidebandStrategy::RDMA_LOW_LATENCY ||
strategy == ::SidebandStrategy::SOCKETS_LOW_LATENCY) && c_StreamProcessor.moniker_stream_read_write >= 0) {
strategy == ::SidebandStrategy::SOCKETS_LOW_LATENCY) && c_StreamProcessor.moniker_sideband_stream_read_write >= 0) {
pid_t threadId = syscall(SYS_gettid);
::SysFsWrite("/dev/cgroup/cpuset/LabVIEW_tl_set/tasks", std::to_string(threadId));

set_moniker_stream_processor(c_StreamProcessor.moniker_stream_read_write);
set_moniker_stream_processor(c_StreamProcessor.moniker_sideband_stream_read_write);
}
#endif

Expand Down Expand Up @@ -195,6 +209,10 @@ Status DataMonikerService::BeginSidebandStream(ServerContext* context, const Beg
//---------------------------------------------------------------------
Status DataMonikerService::StreamReadWrite(ServerContext* context, ServerReaderWriter<MonikerReadResponse, MonikerWriteRequest>* stream)
{
#ifndef _WIN32
set_moniker_stream_processor(c_StreamProcessor.moniker_stream_read_write);
#endif

EndpointList writers;
EndpointList readers;
MonikerWriteRequest writeRequest;
Expand Down Expand Up @@ -223,6 +241,10 @@ Status DataMonikerService::StreamReadWrite(ServerContext* context, ServerReaderW
//---------------------------------------------------------------------
Status DataMonikerService::StreamRead(ServerContext* context, const MonikerList* request, ServerWriter<MonikerReadResponse>* writer)
{
#ifndef _WIN32
set_moniker_stream_processor(c_StreamProcessor.moniker_stream_read);
#endif

EndpointList writers;
EndpointList readers;
InitiateMonikerList(*request, &readers, &writers);
Expand Down Expand Up @@ -265,16 +287,4 @@ Status DataMonikerService::StreamWrite(ServerContext* context, ServerReaderWrite
}
return Status::OK;
}

#ifndef _WIN32
void set_moniker_stream_processor(int stream_processor)
{
if(stream_processor >= 0) {
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(stream_processor, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
}
}
#endif
} // namespace ni::data_monikers
4 changes: 3 additions & 1 deletion source/server/moniker_stream_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
#define MONIKER_STREAM_PROCESSOR_H

struct MonikerStreamProcessor {
int moniker_stream_read_write = -1;
int moniker_sideband_stream_read_write = -1;
int moniker_stream_write = -1;
int moniker_stream_read = -1;
int moniker_stream_read_write = -1;
};

#endif // Moniker_Stream_Processor_H
8 changes: 6 additions & 2 deletions source/server/server_configuration_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ static const char* kPortJsonKey = "port";
static const char* kSidebandAddressJsonKey = "sideband_address";
static const char* kSidebandPortJsonKey = "sideband_port";
static const char* kMonikerStreamProcessorKey = "moniker_stream_processor_configuration";
static const char* kMonikerStreamReadWriteKey = "moniker_stream_read_write";
static const char* kMonikerSidebandStreamReadWriteKey = "moniker_sideband_stream_read_write";
static const char* kMonikerStreamWriteKey = "moniker_stream_write";
static const char* kMonikerStreamReadKey = "moniker_stream_read";
static const char* kMonikerStreamReadWriteKey = "moniker_stream_read_write";
static const char* kServerCertJsonKey = "server_cert";
static const char* kServerKeyJsonKey = "server_key";
static const char* kRootCertJsonKey = "root_cert";
Expand Down Expand Up @@ -297,8 +299,10 @@ MonikerStreamProcessor ServerConfigurationParser::parse_moniker_stream_processor

auto core_config_it = config_file_.find(kMonikerStreamProcessorKey);
if (core_config_it != config_file_.end()) {
stream_processor.moniker_stream_read_write = parse_moniker_stream_processor_with_key(kMonikerStreamReadWriteKey);
stream_processor.moniker_sideband_stream_read_write = parse_moniker_stream_processor_with_key(kMonikerSidebandStreamReadWriteKey);
stream_processor.moniker_stream_write = parse_moniker_stream_processor_with_key(kMonikerStreamWriteKey);
stream_processor.moniker_stream_read = parse_moniker_stream_processor_with_key(kMonikerStreamReadKey);
stream_processor.moniker_stream_read_write = parse_moniker_stream_processor_with_key(kMonikerStreamReadWriteKey);
}
return stream_processor;
}
Expand Down

0 comments on commit 19487b8

Please sign in to comment.