diff --git a/src/workerd/io/worker-interface.h b/src/workerd/io/worker-interface.h index 0048cdb3d6a..d5322b0a8ea 100644 --- a/src/workerd/io/worker-interface.h +++ b/src/workerd/io/worker-interface.h @@ -151,6 +151,102 @@ class WorkerInterface: public kj::HttpService { // for the promise, then invoke the destination object. kj::Own newPromisedWorkerInterface(kj::Promise> promise); +template +class FuturePromisedWorkerInterface final: public WorkerInterface { +public: + FuturePromisedWorkerInterface(Func func): func(kj::mv(func)) {} + + void resolveFunc() { + promise = func() + .then([this](kj::Own result) { worker = kj::mv(result); }) + .eagerlyEvaluate(nullptr) + .fork(); + } + + kj::Promise request(kj::HttpMethod method, + kj::StringPtr url, + const kj::HttpHeaders& headers, + kj::AsyncInputStream& requestBody, + Response& response) override { + resolveFunc(); + KJ_IF_SOME(w, worker) { + co_await w->request(method, url, headers, requestBody, response); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_await KJ_ASSERT_NONNULL(worker)->request(method, url, headers, requestBody, response); + } + } + + kj::Promise connect(kj::StringPtr host, + const kj::HttpHeaders& headers, + kj::AsyncIoStream& connection, + ConnectResponse& response, + kj::HttpConnectSettings settings) override { + resolveFunc(); + KJ_IF_SOME(w, worker) { + co_await w->connect(host, headers, connection, response, kj::mv(settings)); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_await KJ_ASSERT_NONNULL(worker)->connect( + host, headers, connection, response, kj::mv(settings)); + } + } + + kj::Promise prewarm(kj::StringPtr url) override { + resolveFunc(); + KJ_IF_SOME(w, worker) { + co_return co_await w->prewarm(url); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_return co_await KJ_ASSERT_NONNULL(worker)->prewarm(url); + } + } + + kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { + resolveFunc(); + KJ_IF_SOME(w, worker) { + co_return co_await w->runScheduled(scheduledTime, cron); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_return co_await KJ_ASSERT_NONNULL(worker)->runScheduled(scheduledTime, cron); + } + } + + kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override { + resolveFunc(); + KJ_IF_SOME(w, worker) { + co_return co_await w->runAlarm(scheduledTime, retryCount); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_return co_await KJ_ASSERT_NONNULL(worker)->runAlarm(scheduledTime, retryCount); + } + } + + kj::Promise customEvent(kj::Own event) override { + resolveFunc(); + KJ_IF_SOME(w, worker) { + co_return co_await w->customEvent(kj::mv(event)); + } else { + co_await KJ_ASSERT_NONNULL(promise); + co_return co_await KJ_ASSERT_NONNULL(worker)->customEvent(kj::mv(event)); + } + } + +private: + Func func; + kj::Maybe> promise; + kj::Maybe> worker; +}; +// Similar to newPromisedWorkerInterface but receives a function that returns a Promise for a +// WorkerInterface. This is useful when you are not sure if the worker will be used or not and +// you don't want it to be created in case it isn't used. If you just create a +// PromisedWorkerInterface then the async loop might run the promise before it is eventually +// destroyed even if it was never used. +template +kj::Own newFuturePromisedWorkerInterface(Func func) { + return kj::heap>(kj::mv(func)); +} + // Adapts WorkerInterface to HttpClient, including taking ownership. // // (Use kj::newHttpClient() if you don't want to take ownership.)