Skip to content

Commit b64af24

Browse files
authored
[ESI][Cosim][NFC] Refactor cosim to divorce capnp from DPI (#7045)
Move generic functionality into Capnp library. This divorces it from the DPI server, allowing other things to use it. Additionally, divorce server from general functionality to make adding a client thread easier.
1 parent 6e028d6 commit b64af24

File tree

8 files changed

+92
-51
lines changed

8 files changed

+92
-51
lines changed

lib/Dialect/ESI/runtime/cosim/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ add_library(EsiCosimCapnp SHARED
1818
${COSIM_CAPNP_HDRS}
1919
${COSIM_CAPNP_SRCS}
2020
${COSIM_SCHEMA_HDR}
21+
22+
lib/CapnpThreads.cpp
23+
lib/Endpoint.cpp
24+
lib/Server.cpp
2125
)
2226
target_include_directories(EsiCosimCapnp PUBLIC ${CAPNPC_OUTPUT_DIR})
2327
target_include_directories(EsiCosimCapnp PUBLIC ${CAPNP_INCLUDE_DIRS})

lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66

77
add_library(EsiCosimDpiServer SHARED
88
DpiEntryPoints.cpp
9-
Server.cpp
10-
Endpoint.cpp
119
)
1210
set_target_properties(EsiCosimDpiServer
1311
PROPERTIES

lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
//
1717
//===----------------------------------------------------------------------===//
1818

19-
#include "cosim/Server.h"
20-
#include "cosim/dpi.h"
19+
#include "cosim/CapnpThreads.h"
20+
#include "dpi.h"
2121

2222
#include <algorithm>
2323
#include <cassert>

lib/Dialect/ESI/runtime/cosim/include/cosim/Server.h renamed to lib/Dialect/ESI/runtime/cosim/include/cosim/CapnpThreads.h

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//===- Server.h - ESI cosim RPC servers -------------------------*- C++ -*-===//
1+
//===- CapnpThreads.h - ESI cosim RPC ---------------------------*- C++ -*-===//
22
//
33
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
44
// See https://llvm.org/LICENSE.txt for license information.
@@ -20,24 +20,46 @@
2020
#include "cosim/LowLevel.h"
2121
#include <thread>
2222

23+
namespace kj {
24+
class WaitScope;
25+
} // namespace kj
26+
2327
namespace esi {
2428
namespace cosim {
2529

26-
/// The main RpcServer. Does not implement any capnp RPC interfaces but contains
27-
/// the capnp main RPC server. We run the capnp server in its own thread to be
28-
/// more responsive to network traffic and so as to not slow down the
29-
/// simulation.
30-
class RpcServer {
30+
/// Since Capnp is not thread-safe, client and server must be run in their own
31+
/// threads and communicate with the outside world through thread safe channels.
32+
class CapnpCosimThread {
3133
public:
3234
EndpointRegistry endpoints;
3335
LowLevel lowLevelBridge;
3436

35-
RpcServer();
36-
~RpcServer();
37+
CapnpCosimThread();
38+
~CapnpCosimThread();
39+
40+
/// Stop the thread. This is a blocking call -- it will not return until the
41+
/// capnp thread has stopped.
42+
void stop();
43+
44+
protected:
45+
/// Start capnp polling loop. Does not return until stop() is called. Must be
46+
/// called in the same thread the RPC server/client was created.
47+
void loop(kj::WaitScope &waitScope);
48+
49+
using Lock = std::lock_guard<std::mutex>;
50+
std::thread *myThread;
51+
volatile bool stopSig;
52+
std::mutex m;
53+
};
3754

55+
/// The main RpcServer. Does not implement any capnp RPC interfaces but contains
56+
/// the capnp main RPC server. We run the capnp server in its own thread to be
57+
/// more responsive to network traffic and so as to not slow down the
58+
/// simulation.
59+
class RpcServer : public CapnpCosimThread {
60+
public:
3861
/// Start and stop the server thread.
3962
void run(uint16_t port);
40-
void stop();
4163

4264
void setManifest(unsigned int esiVersion,
4365
const std::vector<uint8_t> &manifest) {
@@ -46,15 +68,9 @@ class RpcServer {
4668
}
4769

4870
private:
49-
using Lock = std::lock_guard<std::mutex>;
50-
5171
/// The thread's main loop function. Exits on shutdown.
5272
void mainLoop(uint16_t port);
5373

54-
std::thread *mainThread;
55-
volatile bool stopSig;
56-
std::mutex m;
57-
5874
unsigned int esiVersion = -1;
5975
std::vector<uint8_t> compressedManifest;
6076
};
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
//===- CapnpThreads.cpp - Cosim RPC common code -----------------*- C++ -*-===//
2+
//
3+
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4+
// See https://llvm.org/LICENSE.txt for license information.
5+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6+
//
7+
//===----------------------------------------------------------------------===//
8+
9+
#include "cosim/CapnpThreads.h"
10+
#include "CosimDpi.capnp.h"
11+
#include <capnp/ez-rpc.h>
12+
#include <thread>
13+
#ifdef _WIN32
14+
#include <io.h>
15+
#else
16+
#include <unistd.h>
17+
#endif
18+
19+
using namespace capnp;
20+
using namespace esi::cosim;
21+
22+
CapnpCosimThread::CapnpCosimThread() : myThread(nullptr), stopSig(false) {}
23+
CapnpCosimThread::~CapnpCosimThread() { stop(); }
24+
25+
void CapnpCosimThread::loop(kj::WaitScope &waitScope) {
26+
// OK, this is uber hacky, but it unblocks me and isn't _too_ inefficient. The
27+
// problem is that I can't figure out how read the stop signal from libkj
28+
// asyncrony land.
29+
//
30+
// IIRC the main libkj wait loop uses `select()` (or something similar on
31+
// Windows) on its FDs. As a result, any code which checks the stop variable
32+
// doesn't run until there is some I/O. Probably the right way is to set up a
33+
// pipe to deliver a shutdown signal.
34+
//
35+
// TODO: Figure out how to do this properly, if possible.
36+
while (!stopSig) {
37+
waitScope.poll();
38+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
39+
}
40+
}
41+
42+
/// Signal the RPC server thread to stop. Wait for it to exit.
43+
void CapnpCosimThread::stop() {
44+
Lock g(m);
45+
if (myThread == nullptr) {
46+
fprintf(stderr, "CapnpCosimThread not Run()\n");
47+
} else if (!stopSig) {
48+
stopSig = true;
49+
myThread->join();
50+
}
51+
}

lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/Server.cpp renamed to lib/Dialect/ESI/runtime/cosim/lib/Server.cpp

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
//
1818
//===----------------------------------------------------------------------===//
1919

20-
#include "cosim/Server.h"
2120
#include "CosimDpi.capnp.h"
21+
#include "cosim/CapnpThreads.h"
2222
#include <capnp/ez-rpc.h>
2323
#include <thread>
2424
#ifdef _WIN32
@@ -242,9 +242,6 @@ kj::Promise<void> CosimServer::openLowLevel(OpenLowLevelContext ctxt) {
242242

243243
/// ----- RpcServer definitions.
244244

245-
RpcServer::RpcServer() : mainThread(nullptr), stopSig(false) {}
246-
RpcServer::~RpcServer() { stop(); }
247-
248245
/// Write the port number to a file. Necessary when we allow 'EzRpcServer' to
249246
/// select its own port. We can't use stdout/stderr because the flushing
250247
/// semantics are undefined (as in `flush()` doesn't work on all simulators).
@@ -268,40 +265,15 @@ void RpcServer::mainLoop(uint16_t port) {
268265
}
269266
writePort(port);
270267
printf("[COSIM] Listening on port: %u\n", (unsigned int)port);
271-
272-
// OK, this is uber hacky, but it unblocks me and isn't _too_ inefficient. The
273-
// problem is that I can't figure out how read the stop signal from libkj
274-
// asyncrony land.
275-
//
276-
// IIRC the main libkj wait loop uses `select()` (or something similar on
277-
// Windows) on its FDs. As a result, any code which checks the stop variable
278-
// doesn't run until there is some I/O. Probably the right way is to set up a
279-
// pipe to deliver a shutdown signal.
280-
//
281-
// TODO: Figure out how to do this properly, if possible.
282-
while (!stopSig) {
283-
waitScope.poll();
284-
std::this_thread::sleep_for(std::chrono::milliseconds(10));
285-
}
268+
loop(waitScope);
286269
}
287270

288271
/// Start the server if not already started.
289272
void RpcServer::run(uint16_t port) {
290273
Lock g(m);
291-
if (mainThread == nullptr) {
292-
mainThread = new std::thread(&RpcServer::mainLoop, this, port);
274+
if (myThread == nullptr) {
275+
myThread = new std::thread(&RpcServer::mainLoop, this, port);
293276
} else {
294277
fprintf(stderr, "Warning: cannot Run() RPC server more than once!");
295278
}
296279
}
297-
298-
/// Signal the RPC server thread to stop. Wait for it to exit.
299-
void RpcServer::stop() {
300-
Lock g(m);
301-
if (mainThread == nullptr) {
302-
fprintf(stderr, "RpcServer not Run()\n");
303-
} else if (!stopSig) {
304-
stopSig = true;
305-
mainThread->join();
306-
}
307-
}

0 commit comments

Comments
 (0)