Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EW-8447 Fix CPU profiling again #2571

Merged
merged 5 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 61 additions & 12 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2311,8 +2311,10 @@ struct MessageQueue {

class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel {
public:
InspectorChannelImpl(kj::Own<const Worker::Isolate> isolateParam, kj::WebSocket& webSocket)
: ioHandler(webSocket),
InspectorChannelImpl(kj::Own<const Worker::Isolate> isolateParam,
kj::Own<const kj::Executor> isolateThreadExecutor,
kj::WebSocket& webSocket)
: ioHandler(kj::mv(isolateThreadExecutor), webSocket),
state(kj::heap<State>(this, kj::mv(isolateParam))) {
ioHandler.connect(*this);
}
Expand Down Expand Up @@ -2561,10 +2563,11 @@ private:
// the InspectorChannelImpl and the InspectorClient.
class WebSocketIoHandler final {
public:
WebSocketIoHandler(kj::WebSocket& webSocket): webSocket(webSocket) {
WebSocketIoHandler(kj::Own<const kj::Executor> isolateThreadExecutor, kj::WebSocket& webSocket)
: isolateThreadExecutor(kj::mv(isolateThreadExecutor)),
webSocket(webSocket) {
// Assume we are being instantiated on the InspectorService thread, the thread that will do
// I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread.
incomingQueueNotifier = XThreadNotifier::create();
outgoingQueueNotifier = XThreadNotifier::create();
}

Expand Down Expand Up @@ -2595,7 +2598,34 @@ private:
// Message pumping promise that should be evaluated on the InspectorService
// thread.
kj::Promise<void> messagePump() {
return receiveLoop().exclusiveJoin(dispatchLoop()).exclusiveJoin(transmitLoop());
// Although inspector I/O must happen on the InspectorService thread (to make sure breakpoints
// don't block inspector I/O), inspector messages must be actually dispatched on the Isolate
// thread. So, we run the dispatch loop on the Isolate thread.
//
// Note that the above comment is only really accurate in vanilla workerd. In the case of the
// internal Cloudflare Workers runtime, `isolateThreadExecutor` may actually refer to the
// current thread's `kj::Executor`. That's fine; calling `executeAsync()` on the current
// thread's executor just posts the task to the event loop, and everything works as expected.

// Since the dispatch loop and the receive loop communicate over a XThreadNotifier, and
// XThreadNotifiers must be created on the thread which will call their `awaitNotification()`
// function, we awkwardly perform two `executeAsync()`s here, one to create the
// XThreadNotifier, then another to spawn the dispatch loop.
//
// We create a new XThreadNotifier for each `messagePump()` call, rather than try to re-use
// one long-term, because XThreadNotifiers' `awaitNotification()` function is not cancel-safe.
// That is, once its promise is cancelled, the notifier is broken.
auto incomingQueueNotifier =
co_await isolateThreadExecutor->executeAsync([]() { return XThreadNotifier::create(); });

auto dispatchLoopPromise = isolateThreadExecutor->executeAsync(
[this, notifier = kj::atomicAddRef(*incomingQueueNotifier)]() mutable {
return dispatchLoop(kj::mv(notifier));
});

co_return co_await receiveLoop(kj::mv(incomingQueueNotifier))
.exclusiveJoin(kj::mv(dispatchLoopPromise))
.exclusiveJoin(transmitLoop());
}

void send(kj::String message) {
Expand Down Expand Up @@ -2636,7 +2666,8 @@ private:
outgoingQueueNotifier->notify();
}

kj::Promise<void> receiveLoop() {
// Must be called on the InspectorService thread.
kj::Promise<void> receiveLoop(kj::Own<XThreadNotifier> incomingQueueNotifier) {
for (;;) {
auto message = co_await webSocket.receive(MAX_MESSAGE_SIZE);
KJ_SWITCH_ONEOF(message) {
Expand All @@ -2658,7 +2689,8 @@ private:
}
}

kj::Promise<void> dispatchLoop() {
// Must be called on the Isolate thread.
kj::Promise<void> dispatchLoop(kj::Own<XThreadNotifier> incomingQueueNotifier) {
for (;;) {
co_await incomingQueueNotifier->awaitNotification();
KJ_IF_SOME(c, channel) {
Expand All @@ -2667,6 +2699,7 @@ private:
}
}

// Must be called on the InspectorService thread.
kj::Promise<void> transmitLoop() {
for (;;) {
co_await outgoingQueueNotifier->awaitNotification();
Expand All @@ -2693,10 +2726,17 @@ private:
}
}

// We need access to the Isolate thread's kj::Executor to run the inspector dispatch loop. This
// doesn't actually have to be an Own, because the Isolate thread will destroy the Isolate
// before it exits, but it doesn't hurt.
kj::Own<const kj::Executor> isolateThreadExecutor;

kj::MutexGuarded<MessageQueue> incomingQueue;
kj::Own<XThreadNotifier> incomingQueueNotifier;
// The notifier for `incomingQueue`, `incomingQueueNotifier`, is created once per
// `messagePump()` call, and never re-used, so it doesn't live here.

kj::MutexGuarded<MessageQueue> outgoingQueue;
// This XThreadNotifier must be created on the InspectorService thread.
kj::Own<XThreadNotifier> outgoingQueueNotifier;

kj::WebSocket& webSocket; // only accessed on the InspectorService thread.
Expand Down Expand Up @@ -2850,11 +2890,20 @@ kj::Promise<void> Worker::Isolate::attachInspector(kj::Timer& timer,
headers.set(controlHeaderId, "{\"ewLog\":{\"status\":\"ok\"}}");
auto webSocket = response.acceptWebSocket(headers);

return attachInspector(timer, timerOffset, *webSocket).attach(kj::mv(webSocket));
// This `attachInspector()` overload is used by the internal Cloudflare Workers runtime, which has
// no concept of a single Isolate thread. Instead, it's okay for all inspector messages to be
// dispatched on the calling thread.
auto executor = kj::getCurrentThreadExecutor().addRef();

return attachInspector(kj::mv(executor), timer, timerOffset, *webSocket)
.attach(kj::mv(webSocket));
}

kj::Promise<void> Worker::Isolate::attachInspector(
kj::Timer& timer, kj::Duration timerOffset, kj::WebSocket& webSocket) const {
kj::Own<const kj::Executor> isolateThreadExecutor,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const {
KJ_REQUIRE(impl->inspector != kj::none);

return jsg::runInV8Stack([&](jsg::V8StackScope& stackScope) {
Expand All @@ -2872,8 +2921,8 @@ kj::Promise<void> Worker::Isolate::attachInspector(

lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset);

auto channel =
kj::heap<Worker::Isolate::InspectorChannelImpl>(kj::atomicAddRef(*this), webSocket);
auto channel = kj::heap<Worker::Isolate::InspectorChannelImpl>(
kj::atomicAddRef(*this), kj::mv(isolateThreadExecutor), webSocket);
lockedSelf.currentInspectorSession = *channel;
lockedSelf.impl->inspectorClient.setChannel(*channel);

Expand Down
22 changes: 20 additions & 2 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,32 @@ class Worker::Isolate: public kj::AtomicRefcounted {
uint getLockSuccessCount() const;

// Accepts a connection to the V8 inspector and handles requests until the client disconnects.
// Also adds a special JSON value to the header identified by `controlHeaderId`, for compatibility
// with internal Cloudflare systems.
//
// This overload will dispatch all inspector messages on the _calling thread's_ `kj::Executor`.
// When linked against vanilla V8, this means that CPU profiling will only profile JavaScript
// running on the _calling thread_, which will most likely only be inspector console commands, and
// is not typically desired.
//
// For the above reason, this overload is currently only suitable for use by the internal Workers
// Runtime codebase, which patches V8 to profile whichever thread currently holds the `v8::Locker`
// for this Isolate.
kj::Promise<void> attachInspector(kj::Timer& timer,
kj::Duration timerOffset,
kj::HttpService::Response& response,
const kj::HttpHeaderTable& headerTable,
kj::HttpHeaderId controlHeaderId) const;

kj::Promise<void> attachInspector(
kj::Timer& timer, kj::Duration timerOffset, kj::WebSocket& webSocket) const;
// Accepts a connection to the V8 inspector and handles requests until the client disconnects.
//
// This overload will dispatch all inspector messages on the `kj::Executor` passed in via
// `isolateThreadExecutor`. For CPU profiling to work as expected, this `kj::Executor` must be
// associated with the same thread which executes the Worker's JavaScript.
kj::Promise<void> attachInspector(kj::Own<const kj::Executor> isolateThreadExecutor,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const;

// Log a warning to the inspector if attached, and log an INFO severity message.
void logWarning(kj::StringPtr description, Worker::Lock& lock);
Expand Down
28 changes: 22 additions & 6 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1230,10 +1230,12 @@ private:
// to define the inspector socket.
class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler {
public:
InspectorService(kj::Timer& timer,
InspectorService(kj::Own<const kj::Executor> isolateThreadExecutor,
kj::Timer& timer,
kj::HttpHeaderTable::Builder& headerTableBuilder,
InspectorServiceIsolateRegistrar& registrar)
: timer(timer),
: isolateThreadExecutor(kj::mv(isolateThreadExecutor)),
timer(timer),
headerTable(headerTableBuilder.getFutureTable()),
server(timer, headerTable, *this, kj::HttpServerSettings{.errorHandler = *this}),
registrar(registrar) {
Expand Down Expand Up @@ -1288,7 +1290,8 @@ public:
auto webSocket = response.acceptWebSocket(responseHeaders);
kj::Duration timerOffset = 0 * kj::MILLISECONDS;
try {
co_return co_await ref->attachInspector(timer, timerOffset, *webSocket);
co_return co_await ref->attachInspector(
isolateThreadExecutor->addRef(), timer, timerOffset, *webSocket);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
if (exception.getType() == kj::Exception::Type::DISCONNECTED) {
Expand Down Expand Up @@ -1407,6 +1410,7 @@ public:
}

private:
kj::Own<const kj::Executor> isolateThreadExecutor;
kj::Timer& timer;
kj::HttpHeaderTable& headerTable;
kj::HashMap<kj::String, kj::Own<const Worker::Isolate::WeakIsolateRef>> isolates;
Expand Down Expand Up @@ -3481,14 +3485,26 @@ uint startInspector(
static constexpr uint DEFAULT_PORT = 9229;
kj::MutexGuarded<uint> inspectorPort(UNASSIGNED_PORT);

kj::Thread thread([inspectorAddress, &inspectorPort, &registrar]() {
// `startInspector()` is called on the Isolate thread. V8 requires CPU profiling to be started and
// stopped on the same thread which executes JavaScript -- that is, the Isolate thread -- which
// means we need to dispatch inspector messages on this thread. To help make that happen, we
// capture this thread's kj::Executor here, and pass it into the InspectorService below. Later,
// when the InspectorService receives a WebSocket connection, it calls
// `Isolate::attachInspector()`, which uses the kj::Executor we create here to create a
// XThreadNotifier and start a dispatch loop. The InspectorService reads subsequent WebSocket
// inspector messages and feeds them to that dispatch loop via the XThreadNotifier.
auto isolateThreadExecutor = kj::getCurrentThreadExecutor().addRef();

// Start the InspectorService thread.
kj::Thread thread([inspectorAddress, &inspectorPort, &registrar,
isolateThreadExecutor = kj::mv(isolateThreadExecutor)]() mutable {
kj::AsyncIoContext io = kj::setupAsyncIo();

kj::HttpHeaderTable::Builder headerTableBuilder;

// Create the special inspector service.
auto inspectorService(
kj::heap<Server::InspectorService>(io.provider->getTimer(), headerTableBuilder, registrar));
auto inspectorService(kj::heap<Server::InspectorService>(
kj::mv(isolateThreadExecutor), io.provider->getTimer(), headerTableBuilder, registrar));
auto ownHeaderTable = headerTableBuilder.build();

// Configure and start the inspector socket.
Expand Down
126 changes: 67 additions & 59 deletions src/workerd/server/tests/inspector/driver.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ beforeEach(async () => {
});

await workerd.start();

// We wait for the worker's HTTP port to come online before starting the test case. If we don't,
// and the inspector port comes online first, there's a chance the inspector connection will fail
// with 404 because the isolate doesn't exist yet.
await workerd.getListenPort('http');
});

// Stop workerd.
Expand All @@ -49,68 +54,71 @@ async function connectInspector(port) {
});
}

// TODO(soon): This test reproduces a null pointer dereference in workerd (possibly the same issue
// as https://github.com/cloudflare/workerd/issues/2564), but the test doesn't fail. :(
test('Can repeatedly connect and disconnect to the inspector port', async () => {
for (let i = 0; i < 5; ++i) {
let inspectorClient = await connectInspector(
await workerd.getListenInspectorPort()
async function profileAndExpectDeriveBitsFrames(inspectorClient) {
// Enable and start profiling.
await inspectorClient.Profiler.enable();
await inspectorClient.Profiler.start();

// Drive the worker with a test request. A single one is sufficient.
let httpPort = await workerd.getListenPort('http');
const response = await fetch(`http://localhost:${httpPort}`);
await response.arrayBuffer();

// Stop and disable profiling.
const profile = await inspectorClient.Profiler.stop();
await inspectorClient.Profiler.disable();

// Figure out which function name was most frequently sampled.
let hitCountMap = new Map();

for (let node of profile.profile.nodes) {
if (hitCountMap.get(node.callFrame.functionName) === undefined) {
hitCountMap.set(node.callFrame.functionName, 0);
}
hitCountMap.set(
node.callFrame.functionName,
hitCountMap.get(node.callFrame.functionName) + node.hitCount
);
}

// Drive the worker with a test request.
let httpPort = await workerd.getListenPort('http');
const response = await fetch(`http://localhost:${httpPort}`);
let body = await response.arrayBuffer();
console.log(body);
let max = {
name: null,
count: 0,
};

await inspectorClient.close();
for (let [name, count] of hitCountMap) {
if (count > max.count) {
max.name = name;
max.count = count;
}
}

// The most CPU-intensive function our test script runs is `deriveBits()`, so we expect that to be
// the most frequently sampled function.
assert.equal(max.name, 'deriveBits');
assert.notEqual(max.count, 0);
}

// Regression test for https://github.com/cloudflare/workerd/issues/1754.
//
// At one time, workerd produced only "(program)" frames.
test('Profiler mostly sees deriveBits() frames', async () => {
let inspectorClient = await connectInspector(
await workerd.getListenInspectorPort()
);
await profileAndExpectDeriveBitsFrames(inspectorClient);
await inspectorClient.close();
});

// TODO(soon): Re-enable once https://github.com/cloudflare/workerd/issues/2564 is solved.
// test("Profiler mostly sees deriveBits() frames", async () => {
// let inspectorClient = await connectInspector(await workerd.getListenInspectorPort());

// // Enable and start profiling.
// await inspectorClient.Profiler.enable();
// await inspectorClient.Profiler.start();

// // Drive the worker with a test request. A single one is sufficient.
// let httpPort = await workerd.getListenPort("http");
// const response = await fetch(`http://localhost:${httpPort}`);
// await response.arrayBuffer();

// // Stop and disable profiling.
// const profile = await inspectorClient.Profiler.stop();
// await inspectorClient.Profiler.disable();

// // Figure out which function name was most frequently sampled.
// let hitCountMap = new Map();

// for (let node of profile.profile.nodes) {
// if (hitCountMap.get(node.callFrame.functionName) === undefined) {
// hitCountMap.set(node.callFrame.functionName, 0);
// }
// hitCountMap.set(node.callFrame.functionName,
// hitCountMap.get(node.callFrame.functionName) + node.hitCount);
// }

// let max = {
// name: null,
// count: 0,
// };

// for (let [name, count] of hitCountMap) {
// if (count > max.count) {
// max.name = name;
// max.count = count;
// }
// }

// // The most CPU-intensive function our test script runs is `deriveBits()`, so we expect that to be
// // the most frequently sampled function.
// assert.equal(max.name, "deriveBits");
// assert.notEqual(max.count, 0);

// await inspectorClient.close();
// });
// Regression test for https://github.com/cloudflare/workerd/issues/2564.
//
// At one time, workerd segfaulted on the second inspector connection.
test('Can repeatedly reconnect the inspector and profiling still works', async () => {
for (let i = 0; i < 4; ++i) {
let inspectorClient = await connectInspector(
await workerd.getListenInspectorPort()
);
await profileAndExpectDeriveBitsFrames(inspectorClient);
await inspectorClient.close();
}
});
Loading