Skip to content

Commit

Permalink
Merge pull request #2677 from cloudflare/sidharthachatterjee/workflow…
Browse files Browse the repository at this point in the history
…-binding

Add Workflow binding module
  • Loading branch information
sidharthachatterjee authored Sep 11, 2024
2 parents 6a236a1 + fd2c74e commit e2d40ae
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 11 deletions.
7 changes: 7 additions & 0 deletions src/cloudflare/internal/test/workflows/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
load("//:build/wd_test.bzl", "wd_test")

wd_test(
src = "workflows-api-test.wd-test",
args = ["--experimental"],
data = glob(["*.js"]),
)
21 changes: 21 additions & 0 deletions src/cloudflare/internal/test/workflows/workflows-api-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import * as assert from 'node:assert';

export const tests = {
async test(_, env) {
{
// Test create instance
const instance = await env.workflow.create('foo', { bar: 'baz' });
assert.deepStrictEqual(instance.id, 'foo');
}

{
// Test get instance
const instance = await env.workflow.get('bar');
assert.deepStrictEqual(instance.id, 'bar');
}
},
};
36 changes: 36 additions & 0 deletions src/cloudflare/internal/test/workflows/workflows-api-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "workflows-api-test",
worker = (
modules = [
(name = "worker", esModule = embed "workflows-api-test.js")
],
compatibilityDate = "2024-09-02",
compatibilityFlags = ["experimental", "nodejs_compat"],
bindings = [
(
name = "workflow",
wrapped = (
moduleName = "cloudflare-internal:workflows-api",
innerBindings = [(
name = "fetcher",
service = "workflows-mock"
)],
)
)
],
)
),
( name = "workflows-mock",
worker = (
compatibilityDate = "2024-09-02",
compatibilityFlags = ["experimental", "nodejs_compat"],
modules = [
(name = "worker", esModule = embed "workflows-mock.js")
],
)
)
]
);
41 changes: 41 additions & 0 deletions src/cloudflare/internal/test/workflows/workflows-mock.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

export default {
async fetch(request, env, ctx) {
const data = await request.json();

if (request.url.includes('/get') && request.method === 'POST') {
return Response.json(
{
result: {
instanceId: data.id,
},
},
{
status: 200,
headers: {
'content-type': 'application/json',
},
}
);
}

if (request.url.includes('/create') && request.method === 'POST') {
return Response.json(
{
result: {
instanceId: data.id,
},
},
{
status: 201,
headers: {
'content-type': 'application/json',
},
}
);
}
},
};
117 changes: 113 additions & 4 deletions src/cloudflare/internal/workflows-api.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,121 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

export class NonRetryableError extends Error {
// `__brand` is needed for Workflows' engine to validate if the user returned a NonRetryableError
// this provides better DX because they can just extend NonRetryableError for their own Errors
// and override name.
// This needs to be a public field because it's serialized over RPC to the Workflows' engine
// `__brand` is how engine validates that the user returned a `NonRetryableError`
// imported from "cloudflare:workflows"
// This enables them to extend NonRetryableError for their own Errors
// as well by overriding name
// Private fields are not serialized over RPC
public readonly __brand: string = 'NonRetryableError';

public constructor(message: string, name = 'NonRetryableError') {
super(message);
this.name = name;
}
}

interface Fetcher {
fetch: typeof fetch;
}

async function callFetcher<T>(
fetcher: Fetcher,
path: string,
body: object
): Promise<T> {
const res = await fetcher.fetch(`http://workflow-binding.local${path}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Version': '1',
},
body: JSON.stringify(body),
});

const response = (await res.json()) as {
result: T;
error?: WorkflowError;
};

if (res.ok) {
return response.result;
} else {
throw new Error(response.error?.message);
}
}

class InstanceImpl implements Instance {
private readonly fetcher: Fetcher;
public readonly id: string;

public constructor(id: string, fetcher: Fetcher) {
this.id = id;
this.fetcher = fetcher;
}

public async pause(): Promise<void> {
await callFetcher(this.fetcher, '/pause', {
id: this.id,
});
}
public async resume(): Promise<void> {
await callFetcher(this.fetcher, '/resume', {
id: this.id,
});
}

public async abort(): Promise<void> {
await callFetcher(this.fetcher, '/abort', {
id: this.id,
});
}

public async restart(): Promise<void> {
await callFetcher(this.fetcher, '/restart', {
id: this.id,
});
}

public async status(): Promise<InstanceStatus> {
const result = await callFetcher<InstanceStatus>(this.fetcher, '/status', {
id: this.id,
});
return result;
}
}

class WorkflowImpl {
private readonly fetcher: Fetcher;

public constructor(fetcher: Fetcher) {
this.fetcher = fetcher;
}

public async get(id: string): Promise<Instance> {
const result = await callFetcher<{ instanceId: string }>(
this.fetcher,
'/get',
{ id }
);

return new InstanceImpl(result.instanceId, this.fetcher);
}

public async create(id: string, params: object): Promise<Instance> {
const result = await callFetcher<{ instanceId: string }>(
this.fetcher,
'/create',
{ id, params }
);

return new InstanceImpl(result.instanceId, this.fetcher);
}
}

export function makeBinding(env: { fetcher: Fetcher }): Workflow {
return new WorkflowImpl(env.fetcher);
}

export default makeBinding;
87 changes: 87 additions & 0 deletions src/cloudflare/internal/workflows.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) 2022-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

/*****************************
*
* NOTE: this is copy & pasted from the types/ folder, as when bazel
* runs it doesn't have access to that directly and thusly is sad.
* TODO: come up with a better system for this.
*
****************************** /
/**
* NonRetryableError allows for a user to throw a fatal error
* that makes a Workflow instance fail immediately without triggering a retry
*/
declare abstract class NonRetryableError extends Error {
/**
* `__brand` is used to differentiate between `NonRetryableError` and `Error`
* and is omitted from the constructor because users should not set it
*/
public constructor(message: string, name?: string);
}

declare abstract class Workflow {
/**
* Get a handle to an existing instance of the Workflow.
* @param id Id for the instance of this Workflow
* @returns A promise that resolves with a handle for the Instance
*/
public get(id: string): Promise<Instance>;

/**
* Create a new instance and return a handle to it. If a provided id exists, an error will be thrown.
* @param id Id to create the instance of this Workflow with
* @param params The payload to send over to this instance
* @returns A promise that resolves with a handle for the Instance
*/
public create(id: string, params: object): Promise<Instance>;
}

type InstanceStatus = {
status:
| 'queued'
| 'running'
| 'paused'
| 'errored'
| 'terminated'
| 'complete'
| 'unknown';
error?: string;
output?: object;
};

interface WorkflowError {
code?: number;
message: string;
}

declare abstract class Instance {
public id: string;

/**
* Pause the instance.
*/
public pause(): Promise<void>;

/**
* Resume the instance. If it is already running, an error will be thrown.
*/
public resume(): Promise<void>;

/**
* Abort the instance. If it is errored, terminated or complete, an error will be thrown.
*/
public abort(): Promise<void>;

/**
* Restart the instance.
*/
public restart(): Promise<void>;

/**
* Returns the current status of the instance.
*/
public status(): Promise<InstanceStatus>;
}
4 changes: 4 additions & 0 deletions src/cloudflare/workflows.ts
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

export { NonRetryableError } from 'cloudflare-internal:workflows-api';
Loading

0 comments on commit e2d40ae

Please sign in to comment.