Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <workerd/io/compatibility-date.h>
#include <workerd/io/features.h>
#include <workerd/io/io-context.h>
#include <workerd/io/tracer.h>
#include <workerd/jsg/async-context.h>
#include <workerd/jsg/ser.h>
#include <workerd/jsg/util.h>
Expand Down Expand Up @@ -396,7 +397,12 @@ void ServiceWorkerGlobalScope::startScheduled(kj::Date scheduledTime,
KJ_IF_SOME(h, exportedHandler) {
KJ_IF_SOME(f, h.scheduled) {
auto promise =
f(lock, js.alloc<ScheduledController>(event.addRef()), h.env.addRef(isolate), h.getCtx());
f(lock, js.alloc<ScheduledController>(event.addRef()), h.env.addRef(isolate), h.getCtx())
.then([&context]() {
KJ_IF_SOME(t, context.getWorkerTracer()) {
t.setReturn(context.now());
}
});
event->waitUntil(kj::mv(promise));
} else {
lock.logWarningOnce(
Expand Down Expand Up @@ -623,7 +629,9 @@ kj::Promise<void> ServiceWorkerGlobalScope::setHibernatableEventTimeout(
return event;
}

void ServiceWorkerGlobalScope::sendHibernatableWebSocketMessage(
// TODO(cleanup): the hibernatable websocket handler functions here are largely identical – consider
// folding them.
void ServiceWorkerGlobalScope::sendHibernatableWebSocketMessage(IoContext& context,
kj::OneOf<kj::String, kj::Array<byte>> message,
kj::Maybe<uint32_t> eventTimeoutMs,
kj::String websocketId,
Expand All @@ -637,13 +645,19 @@ void ServiceWorkerGlobalScope::sendHibernatableWebSocketMessage(
KJ_IF_SOME(h, exportedHandler) {
KJ_IF_SOME(handler, h.webSocketMessage) {
event->waitUntil(setHibernatableEventTimeout(
handler(lock, kj::mv(websocket), kj::mv(message)), eventTimeoutMs));
handler(lock, kj::mv(websocket), kj::mv(message)), eventTimeoutMs)
.then([&context]() {
KJ_IF_SOME(t, context.getWorkerTracer()) {
t.setReturn(context.now());
}
}));
}
// We want to deliver a message, but if no webSocketMessage handler is exported, we shouldn't fail
}
}

void ServiceWorkerGlobalScope::sendHibernatableWebSocketClose(HibernatableSocketParams::Close close,
void ServiceWorkerGlobalScope::sendHibernatableWebSocketClose(IoContext& context,
HibernatableSocketParams::Close close,
kj::Maybe<uint32_t> eventTimeoutMs,
kj::String websocketId,
Worker::Lock& lock,
Expand All @@ -663,13 +677,19 @@ void ServiceWorkerGlobalScope::sendHibernatableWebSocketClose(HibernatableSocket
KJ_IF_SOME(handler, h.webSocketClose) {
event->waitUntil(setHibernatableEventTimeout(
handler(lock, kj::mv(websocket), close.code, kj::mv(close.reason), close.wasClean),
eventTimeoutMs));
eventTimeoutMs)
.then([&context]() {
KJ_IF_SOME(t, context.getWorkerTracer()) {
t.setReturn(context.now());
}
}));
}
// We want to deliver close, but if no webSocketClose handler is exported, we shouldn't fail
}
}

void ServiceWorkerGlobalScope::sendHibernatableWebSocketError(kj::Exception e,
void ServiceWorkerGlobalScope::sendHibernatableWebSocketError(IoContext& context,
kj::Exception e,
kj::Maybe<uint32_t> eventTimeoutMs,
kj::String websocketId,
Worker::Lock& lock,
Expand All @@ -689,7 +709,12 @@ void ServiceWorkerGlobalScope::sendHibernatableWebSocketError(kj::Exception e,
KJ_IF_SOME(h, exportedHandler) {
KJ_IF_SOME(handler, h.webSocketError) {
event->waitUntil(setHibernatableEventTimeout(
handler(js, kj::mv(websocket), js.exceptionToJs(kj::mv(e))), eventTimeoutMs));
handler(js, kj::mv(websocket), js.exceptionToJs(kj::mv(e))), eventTimeoutMs)
.then([&context]() {
KJ_IF_SOME(t, context.getWorkerTracer()) {
t.setReturn(context.now());
}
}));
}
// We want to deliver an error, but if no webSocketError handler is exported, we shouldn't fail
}
Expand Down
9 changes: 6 additions & 3 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,19 +501,22 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
kj::Promise<void> setHibernatableEventTimeout(
kj::Promise<void> event, kj::Maybe<uint32_t> eventTimeoutMs);

void sendHibernatableWebSocketMessage(kj::OneOf<kj::String, kj::Array<byte>> message,
void sendHibernatableWebSocketMessage(IoContext& context,
kj::OneOf<kj::String, kj::Array<byte>> message,
kj::Maybe<uint32_t> eventTimeoutMs,
kj::String websocketId,
Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler);

void sendHibernatableWebSocketClose(HibernatableSocketParams::Close close,
void sendHibernatableWebSocketClose(IoContext& context,
HibernatableSocketParams::Close close,
kj::Maybe<uint32_t> eventTimeoutMs,
kj::String websocketId,
Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler);

void sendHibernatableWebSocketError(kj::Exception e,
void sendHibernatableWebSocketError(IoContext& context,
kj::Exception e,
kj::Maybe<uint32_t> eventTimeoutMs,
kj::String websocketId,
Worker::Lock& lock,
Expand Down
14 changes: 8 additions & 6 deletions src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -109,22 +109,24 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve
props = kj::mv(props)](Worker::Lock& lock) mutable {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
KJ_CASE_ONEOF(text, HibernatableSocketParams::Text) {
return lock.getGlobalScope().sendHibernatableWebSocketMessage(kj::mv(text.message),
eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock,
return lock.getGlobalScope().sendHibernatableWebSocketMessage(context,
kj::mv(text.message), eventParameters.eventTimeoutMs,
kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()));
}
KJ_CASE_ONEOF(data, HibernatableSocketParams::Data) {
return lock.getGlobalScope().sendHibernatableWebSocketMessage(kj::mv(data.message),
eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock,
return lock.getGlobalScope().sendHibernatableWebSocketMessage(context,
kj::mv(data.message), eventParameters.eventTimeoutMs,
kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()));
}
KJ_CASE_ONEOF(close, HibernatableSocketParams::Close) {
return lock.getGlobalScope().sendHibernatableWebSocketClose(kj::mv(close),
return lock.getGlobalScope().sendHibernatableWebSocketClose(context, kj::mv(close),
eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()));
}
KJ_CASE_ONEOF(e, HibernatableSocketParams::Error) {
return lock.getGlobalScope().sendHibernatableWebSocketError(kj::mv(e.error),
return lock.getGlobalScope().sendHibernatableWebSocketError(context, kj::mv(e.error),
eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()));
}
Expand Down
8 changes: 6 additions & 2 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ struct StartQueueEventResponse {
};

StartQueueEventResponse startQueueEvent(EventTarget& globalEventTarget,
IoContext& context,
kj::OneOf<rpc::EventDispatcher::QueueParams::Reader, QueueEvent::Params> params,
IoPtr<QueueEventResult> result,
Worker::Lock& lock,
Expand All @@ -493,8 +494,11 @@ StartQueueEventResponse startQueueEvent(EventTarget& globalEventTarget,
KJ_IF_SOME(f, queueHandler.queue) {
auto promise = f(lock, js.alloc<QueueController>(event.addRef()),
jsg::JsValue(h.env.getHandle(js)).addRef(js), h.getCtx())
.then([event = event.addRef()]() mutable {
.then([event = event.addRef(), &context]() mutable {
event->setCompletionStatus(QueueEvent::CompletedSuccessfully{});
KJ_IF_SOME(t, context.getWorkerTracer()) {
t.setReturn(context.now());
}
}, [event = event.addRef()](kj::Exception&& e) mutable {
event->setCompletionStatus(QueueEvent::CompletedWithError{kj::cp(e)});
return kj::mv(e);
Expand Down Expand Up @@ -573,7 +577,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

auto& typeHandler = lock.getWorker().getIsolate().getApi().getQueueTypeHandler(lock);
auto startResp = startQueueEvent(lock.getGlobalScope(), kj::mv(params),
auto startResp = startQueueEvent(lock.getGlobalScope(), context, kj::mv(params),
context.addObject(result), lock,
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), typeHandler);
queueEvent->event = kj::mv(startResp.event);
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/tail-worker-test-receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const test = {
// Number of traces based on how often main tail worker is invoked from previous tests
let numTraces = 26;
let basicTrace =
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"trace","traces":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}';
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"trace","traces":[]}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}';
assert.deepStrictEqual(
Array.from(resposeMap.values()),
Array(numTraces).fill(basicTrace)
Expand Down
12 changes: 6 additions & 6 deletions src/workerd/api/tail-worker-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ export const test = {
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"fetch","spanId":"0000000000000001"}{"type":"attributes","info":[{"name":"network.protocol.name","value":"http"},{"name":"network.protocol.version","value":"HTTP/1.1"},{"name":"http.request.method","value":"POST"},{"name":"url.full","value":"http://placeholder/body-length"},{"name":"http.request.body.size","value":"3"},{"name":"http.response.status_code","value":"200"},{"name":"http.response.body.size","value":"22"}]}{"type":"spanClose","outcome":"ok"}{"type":"spanOpen","name":"fetch","spanId":"0000000000000002"}{"type":"attributes","info":[{"name":"network.protocol.name","value":"http"},{"name":"network.protocol.version","value":"HTTP/1.1"},{"name":"http.request.method","value":"POST"},{"name":"url.full","value":"http://placeholder/body-length"},{"name":"http.response.status_code","value":"200"},{"name":"http.response.body.size","value":"31"}]}{"type":"spanClose","outcome":"ok"}{"type":"spanOpen","name":"scheduled","spanId":"0000000000000003"}{"type":"spanClose","outcome":"ok"}{"type":"spanOpen","name":"scheduled","spanId":"0000000000000004"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","headers":[]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","headers":[]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"scheduled","scheduledTime":"1970-01-01T00:00:00.000Z","cron":""}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"scheduled","scheduledTime":"1970-01-01T00:00:00.000Z","cron":"* * * * 30"}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"scheduled","scheduledTime":"1970-01-01T00:00:00.000Z","cron":""}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"scheduled","scheduledTime":"1970-01-01T00:00:00.000Z","cron":"* * * * 30"}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"cacheMode","scriptTags":[],"info":{"type":"custom"}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"GET","url":"http://placeholder/not-found","headers":[]}}{"type":"return","info":{"type":"fetch","statusCode":404}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"GET","url":"http://placeholder/web-socket","headers":[{"name":"upgrade","value":"websocket"}]}}{"type":"exception","name":"Error","message":"The Workers runtime canceled this request because it detected that your Worker\'s code had hung and would never generate a response. Refer to: https://developers.cloudflare.com/workers/observability/errors/"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
Expand All @@ -100,11 +100,11 @@ export const test = {
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"POST","url":"https://fake-host/message","headers":[{"name":"content-type","value":"application/octet-stream"},{"name":"x-msg-fmt","value":"json"}]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"POST","url":"https://fake-host/message","headers":[{"name":"content-type","value":"application/octet-stream"},{"name":"x-msg-fmt","value":"v8"}]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"fetch","method":"POST","url":"https://fake-host/batch","headers":[{"name":"cf-queue-batch-bytes","value":"31"},{"name":"cf-queue-batch-count","value":"4"},{"name":"cf-queue-largest-msg","value":"13"},{"name":"content-type","value":"application/json"},{"name":"x-msg-delay-secs","value":"2"}]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"queue","queueName":"test-queue","batchSize":5}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"queue","queueName":"test-queue","batchSize":5}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',

// actor-alarms-test.js: alarm events
'{"type":"onset","executionModel":"durableObject","spanId":"0000000000000000","entrypoint":"DurableObjectExample","scriptTags":[],"info":{"type":"fetch","method":"GET","url":"http://foo/test","headers":[]}}{"type":"return","info":{"type":"fetch","statusCode":200}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"durableObject","spanId":"0000000000000000","entrypoint":"DurableObjectExample","scriptTags":[],"info":{"type":"alarm","scheduledTime":"1970-01-01T00:00:00.000Z"}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"durableObject","spanId":"0000000000000000","entrypoint":"DurableObjectExample","scriptTags":[],"info":{"type":"alarm","scheduledTime":"1970-01-01T00:00:00.000Z"}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',

// legacy tail worker, triggered via alarm test. It would appear that these being recorded
// after the onsets above is not guaranteed, but since the streaming tail worker is invoked
Expand All @@ -116,8 +116,8 @@ export const test = {
// tests/websocket-hibernation.js: hibernatableWebSocket events
'{"type":"onset","executionModel":"durableObject","spanId":"0000000000000000","entrypoint":"DurableObjectExample","scriptTags":[],"info":{"type":"fetch","method":"GET","url":"http://example.com/","headers":[{"name":"upgrade","value":"websocket"}]}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"durableObject","spanId":"0000000000000000","entrypoint":"DurableObjectExample","scriptTags":[],"info":{"type":"fetch","method":"GET","url":"http://example.com/hibernation","headers":[{"name":"upgrade","value":"websocket"}]}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"durableObject","spanId":"0000000000000000","entrypoint":"DurableObjectExample","scriptTags":[],"info":{"type":"hibernatableWebSocket","info":{"type":"message"}}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"durableObject","spanId":"0000000000000000","entrypoint":"DurableObjectExample","scriptTags":[],"info":{"type":"hibernatableWebSocket","info":{"type":"close","code":1000,"wasClean":true}}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"durableObject","spanId":"0000000000000000","entrypoint":"DurableObjectExample","scriptTags":[],"info":{"type":"hibernatableWebSocket","info":{"type":"message"}}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"durableObject","spanId":"0000000000000000","entrypoint":"DurableObjectExample","scriptTags":[],"info":{"type":"hibernatableWebSocket","info":{"type":"close","code":1000,"wasClean":true}}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',

// tail-worker-test-jsrpc: Regression test for EW-9282 (missing onset event with
// JsRpcSessionCustomEventImpl). This is derived from tests/js-rpc-test.js.
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -711,8 +711,8 @@ auto TraceCustomEventImpl::run(kj::Own<IoContext::IncomingRequest> incomingReque
waitUntilTasks.add(sendTracesToExportedHandler(
kj::mv(incomingRequest), entrypointNamePtr, kj::mv(props), traces));

// Reporting a proper return code here would be nice, but for that we'd need to await running the
// tail handler...
// Reporting a proper outcome and return event here would be nice, but for that we'd need to await
// running the tail handler...
return Result{
.outcome = EventOutcome::OK,
};
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1958,6 +1958,9 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEventImpl::r
// and server as part of this session.
co_await donePromise.exclusiveJoin(ioctx.onAbort());

KJ_IF_SOME(t, ioctx.getWorkerTracer()) {
t.setReturn(ioctx.now());
}
co_return WorkerInterface::CustomEvent::Result{.outcome = EventOutcome::OK};
} catch (...) {
// Make sure the top-level capability is revoked with the same exception that `run()` is
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/io/trace-stream.c++
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,9 @@ kj::Promise<WorkerInterface::CustomEvent::Result> TailStreamCustomEventImpl::run
kj::throwRecoverableException(kj::mv(e));
KJ_UNREACHABLE;
});
KJ_IF_SOME(t, ioContext.getWorkerTracer()) {
t.setReturn(ioContext.now());
}

co_return WorkerInterface::CustomEvent::Result{.outcome = eventOutcome};
}
Expand Down
Loading
Loading