Skip to content

Commit

Permalink
Add Workflow entrypoint and types
Browse files Browse the repository at this point in the history
Includes abstract class Workflow in cloudflare:workers, ensures Workflow entrypoint class is only registered if workerd is running with the experimental compatibility flag
  • Loading branch information
sidharthachatterjee committed Aug 14, 2024
1 parent eaa4a50 commit 75ca532
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 3 deletions.
7 changes: 7 additions & 0 deletions src/cloudflare/internal/workers.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ export class WorkerEntrypoint {
public env: unknown;
}

export class Workflow {
public constructor(ctx: unknown, env: unknown);

public ctx: unknown;
public env: unknown;
}

export class RpcStub {
public constructor(server: object);
}
Expand Down
1 change: 1 addition & 0 deletions src/cloudflare/workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ export const WorkerEntrypoint = entrypoints.WorkerEntrypoint;
export const DurableObject = entrypoints.DurableObject;
export const RpcStub = entrypoints.RpcStub;
export const RpcTarget = entrypoints.RpcTarget;
export const Workflow = entrypoints.Workflow;
1 change: 1 addition & 0 deletions src/node/internal/workers.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
declare namespace _default {
class WorkerEntrypoint {}
class Workflow {}
class DurableObject {}
class RpcPromise {}
class RpcProperty {}
Expand Down
14 changes: 14 additions & 0 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1831,4 +1831,18 @@ jsg::Ref<DurableObjectBase> DurableObjectBase::constructor(
return jsg::alloc<DurableObjectBase>();
}

jsg::Ref<Workflow> Workflow::constructor(
const v8::FunctionCallbackInfo<v8::Value>& args,
jsg::Ref<ExecutionContext> ctx, jsg::JsObject env) {
// HACK: We take `FunctionCallbackInfo` mostly so that we can set properties directly on
// `This()`. There ought to be a better way to get access to `this` in a constructor.
// We *also* declare `ctx` and `env` params more explicitly just for the sake of type checking.
jsg::Lock& js = jsg::Lock::from(args.GetIsolate());

jsg::JsObject self(args.This());
self.set(js, "ctx", jsg::JsValue(args[0]));
self.set(js, "env", jsg::JsValue(args[1]));
return jsg::alloc<Workflow>();
}

}; // namespace workerd::api
23 changes: 23 additions & 0 deletions src/workerd/api/worker-rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,27 @@ class DurableObjectBase: public jsg::Object {
JSG_RESOURCE_TYPE(DurableObjectBase) {}
};

// Base class for Workflows
//
// When the worker's top-level module exports a class that extends this class, it means that it
// is a Workflow.
//
// import {Workflow} from "cloudflare:workers";
// export class MyWorkflow extends Workflow {
// async run(batch, fns) { ... }
// }
//
// `env` and `ctx` are automatically available as `this.env` and `this.ctx`, without the need to
// define a constructor.
class Workflow: public jsg::Object {
public:
static jsg::Ref<Workflow> constructor(
const v8::FunctionCallbackInfo<v8::Value>& args,
jsg::Ref<ExecutionContext> ctx, jsg::JsObject env);

JSG_RESOURCE_TYPE(Workflow) {}
};

// The "cloudflare:workers" module, which exposes the WorkerEntrypoint and DurableObject types
// for extending.
class EntrypointsModule: public jsg::Object {
Expand All @@ -487,6 +508,7 @@ class EntrypointsModule: public jsg::Object {

JSG_RESOURCE_TYPE(EntrypointsModule) {
JSG_NESTED_TYPE(WorkerEntrypoint);
JSG_NESTED_TYPE(Workflow);
JSG_NESTED_TYPE_NAMED(DurableObjectBase, DurableObject);
JSG_NESTED_TYPE_NAMED(JsRpcPromise, RpcPromise);
JSG_NESTED_TYPE_NAMED(JsRpcProperty, RpcProperty);
Expand All @@ -501,6 +523,7 @@ class EntrypointsModule: public jsg::Object {
api::JsRpcStub, \
api::JsRpcTarget, \
api::WorkerEntrypoint, \
api::Workflow, \
api::DurableObjectBase, \
api::EntrypointsModule

Expand Down
6 changes: 6 additions & 0 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1539,6 +1539,7 @@ Worker::Worker(kj::Own<const Script> scriptParam,

auto& api = script->isolate->getApi();
auto handlers = api.unwrapExports(lock, ns);
auto features = api.getFeatureFlags();
auto entrypointClasses = api.getEntrypointClasses(lock);

for (auto& handler: handlers.fields) {
Expand All @@ -1563,6 +1564,11 @@ Worker::Worker(kj::Own<const Script> scriptParam,
} else if (handle == entrypointClasses.workerEntrypoint) {
impl->statelessClasses.insert(kj::mv(handler.name), kj::mv(cls));
return;
} else if (handle == entrypointClasses.workflow) {
if (features.getWorkerdExperimental()) {
impl->statelessClasses.insert(kj::mv(handler.name), kj::mv(cls));
}
return;
}

handle = KJ_UNWRAP_OR(handle.getPrototype(js).tryCast<jsg::JsObject>(), {
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ class Worker::Api {

// Class constructor for DurableObject (aka api::DurableObjectBase).
jsg::JsObject durableObject;

// Class constructor for Workflow.
jsg::JsObject workflow;
};

// Get the constructors for classes from which entrypoint classes may inherit.
Expand Down
2 changes: 2 additions & 0 deletions src/workerd/server/workerd-api.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// https://opensource.org/licenses/Apache-2.0

#include "workerd-api.h"
#include "workerd/api/worker-rpc.h"

#include <workerd/jsg/jsg.h>
#include <workerd/jsg/modules.h>
Expand Down Expand Up @@ -240,6 +241,7 @@ WorkerdApi::EntrypointClasses WorkerdApi::getEntrypointClasses(jsg::Lock& lock)
return {
.workerEntrypoint = typedLock.getConstructor<api::WorkerEntrypoint>(lock.v8Context()),
.durableObject = typedLock.getConstructor<api::DurableObjectBase>(lock.v8Context()),
.workflow = typedLock.getConstructor<api::Workflow>(lock.v8Context()),
};
}
const jsg::TypeHandler<Worker::Api::ErrorInterface>&
Expand Down
50 changes: 47 additions & 3 deletions types/defines/rpc.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ declare namespace Rpc {
export const __RPC_TARGET_BRAND: "__RPC_TARGET_BRAND";
export const __WORKER_ENTRYPOINT_BRAND: "__WORKER_ENTRYPOINT_BRAND";
export const __DURABLE_OBJECT_BRAND: "__DURABLE_OBJECT_BRAND";
export const __WORKFLOW_BRAND: "__WORKFLOW_BRAND";
export interface RpcTargetBranded {
[__RPC_TARGET_BRAND]: never;
}
Expand All @@ -19,9 +20,13 @@ declare namespace Rpc {
export interface DurableObjectBranded {
[__DURABLE_OBJECT_BRAND]: never;
}
export interface WorkflowBranded {
[__WORKFLOW_BRAND]: never;
}
export type EntrypointBranded =
| WorkerEntrypointBranded
| DurableObjectBranded;
| DurableObjectBranded
| WorkflowBranded;

// Types that can be used through `Stub`s
export type Stubable = RpcTargetBranded | ((...args: any[]) => any);
Expand Down Expand Up @@ -150,7 +155,8 @@ declare module "cloudflare:workers" {
// `protected` fields don't appear in `keyof`s, so can't be accessed over RPC

export abstract class WorkerEntrypoint<Env = unknown>
implements Rpc.WorkerEntrypointBranded {
implements Rpc.WorkerEntrypointBranded
{
[Rpc.__WORKER_ENTRYPOINT_BRAND]: never;

protected ctx: ExecutionContext;
Expand All @@ -166,7 +172,8 @@ declare module "cloudflare:workers" {
}

export abstract class DurableObject<Env = unknown>
implements Rpc.DurableObjectBranded {
implements Rpc.DurableObjectBranded
{
[Rpc.__DURABLE_OBJECT_BRAND]: never;

protected ctx: DurableObjectState;
Expand All @@ -187,4 +194,41 @@ declare module "cloudflare:workers" {
): void | Promise<void>;
webSocketError?(ws: WebSocket, error: unknown): void | Promise<void>;
}

export type DurationLabel =
| "second"
| "minute"
| "hour"
| "day"
| "week"
| "month"
| "year";
export type SleepDuration = `${number} ${DurationLabel}${"s" | ""}` | number;

type WorkflowStep = {
do: <T extends Rpc.Serializable>(
name: string,
callback: () => T
) => T | Promise<T>;
sleep: (name: string, duration: SleepDuration) => void | Promise<void>;
};

export abstract class Workflow<
Env = unknown,
T extends Rpc.Serializable | unknown = unknown,
> implements Rpc.WorkflowBranded
{
[Rpc.__WORKFLOW_BRAND]: never;

protected ctx: ExecutionContext;
protected env: Env;

run(
events: Array<{
payload: T;
timestamp: Date;
}>,
step: WorkflowStep
): unknown | Promise<unknown>;
}
}

0 comments on commit 75ca532

Please sign in to comment.