From 19487b891012b24a1a10fc15bddf5a1033bea573 Mon Sep 17 00:00:00 2001 From: Raghav Rawat Date: Tue, 17 Dec 2024 16:20:27 +0530 Subject: [PATCH] added configuration for stream_read and stream_read_write --- source/config/localhost_config.json | 7 +++- source/server/data_moniker_service.cpp | 38 ++++++++++++------- source/server/moniker_stream_processor.h | 4 +- source/server/server_configuration_parser.cpp | 8 +++- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/source/config/localhost_config.json b/source/config/localhost_config.json index 8307dbe38..b48fd6c45 100644 --- a/source/config/localhost_config.json +++ b/source/config/localhost_config.json @@ -9,7 +9,10 @@ "root_cert": "" }, "moniker_stream_processor": { - "moniker_stream_read_write": -1, - "moniker_stream_write": -1 + "moniker_sideband_stream_read_write": -1, + "moniker_stream_write": -1, + "moniker_stream_read": -1, + "moniker_stream_read_write": -1 + } } \ No newline at end of file diff --git a/source/server/data_moniker_service.cpp b/source/server/data_moniker_service.cpp index 09a786529..37d7cfb2b 100644 --- a/source/server/data_moniker_service.cpp +++ b/source/server/data_moniker_service.cpp @@ -111,17 +111,31 @@ void DataMonikerService::InitiateMonikerList(const MonikerList& monikers, Endpoi } } +//--------------------------------------------------------------------- +//--------------------------------------------------------------------- +#ifndef _WIN32 +void set_moniker_stream_processor(int stream_processor) +{ + if(stream_processor >= 0) { + cpu_set_t cpuSet; + CPU_ZERO(&cpuSet); + CPU_SET(stream_processor, &cpuSet); + sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet); + } +} +#endif + //--------------------------------------------------------------------- //--------------------------------------------------------------------- void DataMonikerService::RunSidebandReadWriteLoop(string sidebandIdentifier, ::SidebandStrategy strategy, EndpointList* readers, EndpointList* writers) { #ifndef _WIN32 if ((strategy == ::SidebandStrategy::RDMA_LOW_LATENCY || - strategy == ::SidebandStrategy::SOCKETS_LOW_LATENCY) && c_StreamProcessor.moniker_stream_read_write >= 0) { + strategy == ::SidebandStrategy::SOCKETS_LOW_LATENCY) && c_StreamProcessor.moniker_sideband_stream_read_write >= 0) { pid_t threadId = syscall(SYS_gettid); ::SysFsWrite("/dev/cgroup/cpuset/LabVIEW_tl_set/tasks", std::to_string(threadId)); - set_moniker_stream_processor(c_StreamProcessor.moniker_stream_read_write); + set_moniker_stream_processor(c_StreamProcessor.moniker_sideband_stream_read_write); } #endif @@ -195,6 +209,10 @@ Status DataMonikerService::BeginSidebandStream(ServerContext* context, const Beg //--------------------------------------------------------------------- Status DataMonikerService::StreamReadWrite(ServerContext* context, ServerReaderWriter* stream) { +#ifndef _WIN32 + set_moniker_stream_processor(c_StreamProcessor.moniker_stream_read_write); +#endif + EndpointList writers; EndpointList readers; MonikerWriteRequest writeRequest; @@ -223,6 +241,10 @@ Status DataMonikerService::StreamReadWrite(ServerContext* context, ServerReaderW //--------------------------------------------------------------------- Status DataMonikerService::StreamRead(ServerContext* context, const MonikerList* request, ServerWriter* writer) { +#ifndef _WIN32 + set_moniker_stream_processor(c_StreamProcessor.moniker_stream_read); +#endif + EndpointList writers; EndpointList readers; InitiateMonikerList(*request, &readers, &writers); @@ -265,16 +287,4 @@ Status DataMonikerService::StreamWrite(ServerContext* context, ServerReaderWrite } return Status::OK; } - -#ifndef _WIN32 -void set_moniker_stream_processor(int stream_processor) -{ - if(stream_processor >= 0) { - cpu_set_t cpuSet; - CPU_ZERO(&cpuSet); - CPU_SET(stream_processor, &cpuSet); - sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet); - } -} -#endif } // namespace ni::data_monikers diff --git a/source/server/moniker_stream_processor.h b/source/server/moniker_stream_processor.h index 44bff77b4..dca03ceb0 100644 --- a/source/server/moniker_stream_processor.h +++ b/source/server/moniker_stream_processor.h @@ -2,8 +2,10 @@ #define MONIKER_STREAM_PROCESSOR_H struct MonikerStreamProcessor { - int moniker_stream_read_write = -1; + int moniker_sideband_stream_read_write = -1; int moniker_stream_write = -1; + int moniker_stream_read = -1; + int moniker_stream_read_write = -1; }; #endif // Moniker_Stream_Processor_H diff --git a/source/server/server_configuration_parser.cpp b/source/server/server_configuration_parser.cpp index 473ab2307..b343712f2 100644 --- a/source/server/server_configuration_parser.cpp +++ b/source/server/server_configuration_parser.cpp @@ -21,8 +21,10 @@ static const char* kPortJsonKey = "port"; static const char* kSidebandAddressJsonKey = "sideband_address"; static const char* kSidebandPortJsonKey = "sideband_port"; static const char* kMonikerStreamProcessorKey = "moniker_stream_processor_configuration"; -static const char* kMonikerStreamReadWriteKey = "moniker_stream_read_write"; +static const char* kMonikerSidebandStreamReadWriteKey = "moniker_sideband_stream_read_write"; static const char* kMonikerStreamWriteKey = "moniker_stream_write"; +static const char* kMonikerStreamReadKey = "moniker_stream_read"; +static const char* kMonikerStreamReadWriteKey = "moniker_stream_read_write"; static const char* kServerCertJsonKey = "server_cert"; static const char* kServerKeyJsonKey = "server_key"; static const char* kRootCertJsonKey = "root_cert"; @@ -297,8 +299,10 @@ MonikerStreamProcessor ServerConfigurationParser::parse_moniker_stream_processor auto core_config_it = config_file_.find(kMonikerStreamProcessorKey); if (core_config_it != config_file_.end()) { - stream_processor.moniker_stream_read_write = parse_moniker_stream_processor_with_key(kMonikerStreamReadWriteKey); + stream_processor.moniker_sideband_stream_read_write = parse_moniker_stream_processor_with_key(kMonikerSidebandStreamReadWriteKey); stream_processor.moniker_stream_write = parse_moniker_stream_processor_with_key(kMonikerStreamWriteKey); + stream_processor.moniker_stream_read = parse_moniker_stream_processor_with_key(kMonikerStreamReadKey); + stream_processor.moniker_stream_read_write = parse_moniker_stream_processor_with_key(kMonikerStreamReadWriteKey); } return stream_processor; }