Skip to content

Commit

Permalink
Add FuturePromisedWorkerInterface
Browse files Browse the repository at this point in the history
This is a new WorkerInterface class that will be used in the internal
PR, see comments to understand what it does.
  • Loading branch information
danlapid committed Nov 1, 2024
1 parent bbde7e4 commit 5bdfd45
Showing 1 changed file with 96 additions and 0 deletions.
96 changes: 96 additions & 0 deletions src/workerd/io/worker-interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,102 @@ class WorkerInterface: public kj::HttpService {
// for the promise, then invoke the destination object.
kj::Own<WorkerInterface> newPromisedWorkerInterface(kj::Promise<kj::Own<WorkerInterface>> promise);

template <typename Func>
class FuturePromisedWorkerInterface final: public WorkerInterface {
public:
FuturePromisedWorkerInterface(Func func): func(kj::mv(func)) {}

void resolveFunc() {
promise = func()
.then([this](kj::Own<WorkerInterface> result) { worker = kj::mv(result); })
.eagerlyEvaluate(nullptr)
.fork();
}

kj::Promise<void> request(kj::HttpMethod method,
kj::StringPtr url,
const kj::HttpHeaders& headers,
kj::AsyncInputStream& requestBody,
Response& response) override {
resolveFunc();
KJ_IF_SOME(w, worker) {
co_await w->request(method, url, headers, requestBody, response);
} else {
co_await KJ_ASSERT_NONNULL(promise);
co_await KJ_ASSERT_NONNULL(worker)->request(method, url, headers, requestBody, response);
}
}

kj::Promise<void> connect(kj::StringPtr host,
const kj::HttpHeaders& headers,
kj::AsyncIoStream& connection,
ConnectResponse& response,
kj::HttpConnectSettings settings) override {
resolveFunc();
KJ_IF_SOME(w, worker) {
co_await w->connect(host, headers, connection, response, kj::mv(settings));
} else {
co_await KJ_ASSERT_NONNULL(promise);
co_await KJ_ASSERT_NONNULL(worker)->connect(
host, headers, connection, response, kj::mv(settings));
}
}

kj::Promise<void> prewarm(kj::StringPtr url) override {
resolveFunc();
KJ_IF_SOME(w, worker) {
co_return co_await w->prewarm(url);
} else {
co_await KJ_ASSERT_NONNULL(promise);
co_return co_await KJ_ASSERT_NONNULL(worker)->prewarm(url);
}
}

kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
resolveFunc();
KJ_IF_SOME(w, worker) {
co_return co_await w->runScheduled(scheduledTime, cron);
} else {
co_await KJ_ASSERT_NONNULL(promise);
co_return co_await KJ_ASSERT_NONNULL(worker)->runScheduled(scheduledTime, cron);
}
}

kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override {
resolveFunc();
KJ_IF_SOME(w, worker) {
co_return co_await w->runAlarm(scheduledTime, retryCount);
} else {
co_await KJ_ASSERT_NONNULL(promise);
co_return co_await KJ_ASSERT_NONNULL(worker)->runAlarm(scheduledTime, retryCount);
}
}

kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override {
resolveFunc();
KJ_IF_SOME(w, worker) {
co_return co_await w->customEvent(kj::mv(event));
} else {
co_await KJ_ASSERT_NONNULL(promise);
co_return co_await KJ_ASSERT_NONNULL(worker)->customEvent(kj::mv(event));
}
}

private:
Func func;
kj::Maybe<kj::ForkedPromise<void>> promise;
kj::Maybe<kj::Own<WorkerInterface>> worker;
};
// Similar to newPromisedWorkerInterface but receives a function that returns a Promise for a
// WorkerInterface. This is useful when you are not sure if the worker will be used or not and
// you don't want it to be created in case it isn't used. If you just create a
// PromisedWorkerInterface then the async loop might run the promise before it is eventually
// destroyed even if it was never used.
template <typename Func>
kj::Own<WorkerInterface> newFuturePromisedWorkerInterface(Func func) {
return kj::heap<FuturePromisedWorkerInterface<Func>>(kj::mv(func));
}

// Adapts WorkerInterface to HttpClient, including taking ownership.
//
// (Use kj::newHttpClient() if you don't want to take ownership.)
Expand Down

0 comments on commit 5bdfd45

Please sign in to comment.