Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cloudflare-world.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/world-cloudflare": patch
---

Add Cloudflare Workers world implementation using Durable Objects, D1, and Cloudflare Queues
88 changes: 88 additions & 0 deletions packages/world-cloudflare/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# @workflow/world-cloudflare

A [Workflow DevKit](https://useworkflow.dev) world implementation for Cloudflare Workers, using **Durable Objects** for per-run state, **D1** for cross-run queries, and **Cloudflare Queues** for message dispatch.

## Architecture

- **Durable Objects**: Each workflow run maps to a single Durable Object instance. All run state (events, steps, hooks, waits, stream chunks) lives in the DO's transactional storage.
- **D1 (SQLite)**: A lightweight global index enables cross-run queries like `runs.list()` and `hooks.getByToken()`. The DO writes index updates to D1 after each mutation.
- **Cloudflare Queues**: Workflow and step invocations are dispatched through Cloudflare Queues for reliable, at-least-once delivery.

## Setup

### 1. Install

```bash
pnpm add @workflow/world-cloudflare
```

### 2. Configure bindings

Copy `wrangler.example.toml` and configure your D1 database ID:

```toml
[durable_objects]
bindings = [{ name = "WORKFLOW_RUNS", class_name = "WorkflowRunDO" }]

[[d1_databases]]
binding = "WORKFLOW_DB"
database_name = "workflow"
database_id = "<your-d1-database-id>"

[[queues.producers]]
binding = "WORKFLOW_QUEUE"
queue = "workflow-jobs"

[[queues.consumers]]
queue = "workflow-jobs"
max_batch_size = 10
max_retries = 3
```

### 3. Run D1 migration

```ts
import { migrate } from '@workflow/world-cloudflare/d1';

// Run once during setup or in a migration script
await migrate(env.WORKFLOW_DB);
```

### 4. Create the world

```ts
import { createWorld, WorkflowRunDO } from '@workflow/world-cloudflare';

// Re-export the DO class so Cloudflare can instantiate it
export { WorkflowRunDO };

export default {
async fetch(request, env) {
const world = createWorld({
db: env.WORKFLOW_DB,
runs: env.WORKFLOW_RUNS,
queue: env.WORKFLOW_QUEUE,
});
// Use world with your workflow runtime...
},

async queue(batch, env) {
const world = createWorld({
db: env.WORKFLOW_DB,
runs: env.WORKFLOW_RUNS,
queue: env.WORKFLOW_QUEUE,
});
await world.handleQueueBatch(batch);
},
};
```

## Limitations

- `events.listByCorrelationId()` requires the `runId` context (inherent to DO-per-run architecture)
- `steps.get()` requires `runId` (cannot look up steps without knowing which DO to query)
- `readFromStream()` expects stream names in `runId:streamName` format

## License

Apache-2.0
61 changes: 61 additions & 0 deletions packages/world-cloudflare/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
{
"name": "@workflow/world-cloudflare",
"version": "0.1.0",
"description": "Cloudflare Workers world implementation for Workflow DevKit using Durable Objects, D1, and Cloudflare Queues",
"type": "module",
"main": "dist/index.js",
"exports": {
".": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
},
"./durable-object": {
"types": "./dist/durable-object.d.ts",
"default": "./dist/durable-object.js"
},
"./d1": {
"types": "./dist/d1.d.ts",
"default": "./dist/d1.js"
}
},
"files": [
"dist"
],
"publishConfig": {
"access": "public"
},
"license": "Apache-2.0",
"repository": {
"type": "git",
"url": "https://github.com/vercel/workflow.git",
"directory": "packages/world-cloudflare"
},
"scripts": {
"build": "tsc",
"dev": "tsc --watch",
"clean": "tsc --build --clean && rm -rf dist",
"test": "vitest run",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@workflow/errors": "workspace:*",
"@workflow/world": "workspace:*",
"@workflow/world-local": "workspace:*",
"ulid": "catalog:",
"zod": "catalog:"
},
"devDependencies": {
"@cloudflare/workers-types": "4.20250312.0",
"@types/node": "catalog:",
"@workflow/tsconfig": "workspace:*",
"vitest": "catalog:"
},
"keywords": [
"cloudflare",
"workers",
"durable-objects",
"workflow"
],
"author": "",
"packageManager": "pnpm@10.15.1"
}
10 changes: 10 additions & 0 deletions packages/world-cloudflare/src/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export interface CloudflareWorldConfig {
/** D1 database binding for the global index */
db: D1Database;
/** Durable Object namespace binding for WorkflowRunDO */
runs: DurableObjectNamespace;
/** Cloudflare Queue binding for message dispatch */
queue: globalThis.Queue;
/** Optional: port for local executor (same pattern as world-postgres) */
port?: number;
}
71 changes: 71 additions & 0 deletions packages/world-cloudflare/src/d1.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { describe, expect, it, vi } from 'vitest';
import {
deleteHookIndex,
deleteHooksForRunIndex,
insertHookIndex,
upsertRunIndex,
} from './d1.js';

function createMockDb() {
const run = vi.fn().mockResolvedValue({ success: true });
const bind = vi.fn().mockReturnValue({ run });
const prepare = vi.fn().mockReturnValue({ bind });
const exec = vi.fn().mockResolvedValue({ success: true });
return { prepare, exec, run, bind } as unknown as D1Database & {
run: ReturnType<typeof vi.fn>;
bind: ReturnType<typeof vi.fn>;
};
}

describe('d1 index helpers', () => {
it('upserts a run index row', async () => {
const db = createMockDb();
await upsertRunIndex(db, {
runId: 'wrun_01ABC',
workflowName: 'my-workflow',
status: 'running',
deploymentId: 'cloudflare',
specVersion: 2,
createdAt: '2024-01-01T00:00:00.000Z',
updatedAt: '2024-01-01T00:00:01.000Z',
startedAt: '2024-01-01T00:00:00.500Z',
});
expect(db.prepare).toHaveBeenCalledWith(
expect.stringContaining('INSERT INTO workflow_runs_index')
);
});

it('inserts a hook index row', async () => {
const db = createMockDb();
await insertHookIndex(db, {
hookId: 'hook_01ABC',
runId: 'wrun_01ABC',
token: 'tok_secret',
ownerId: 'owner1',
projectId: 'proj1',
environment: 'production',
createdAt: '2024-01-01T00:00:00.000Z',
isWebhook: true,
specVersion: 2,
});
expect(db.prepare).toHaveBeenCalledWith(
expect.stringContaining('INSERT OR IGNORE INTO workflow_hooks_index')
);
});

it('deletes a hook index row', async () => {
const db = createMockDb();
await deleteHookIndex(db, 'hook_01ABC');
expect(db.prepare).toHaveBeenCalledWith(
expect.stringContaining('DELETE FROM workflow_hooks_index WHERE hook_id')
);
});

it('deletes all hooks for a run from the index', async () => {
const db = createMockDb();
await deleteHooksForRunIndex(db, 'wrun_01ABC');
expect(db.prepare).toHaveBeenCalledWith(
expect.stringContaining('DELETE FROM workflow_hooks_index WHERE run_id')
);
});
});
150 changes: 150 additions & 0 deletions packages/world-cloudflare/src/d1.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* D1 index schema for cross-run queries.
*
* The D1 database stores lightweight index data only (no input/output/event payloads).
* The source of truth for all run data lives in Durable Objects.
*/

export const MIGRATION_SQL = `
CREATE TABLE IF NOT EXISTS workflow_runs_index (
run_id TEXT PRIMARY KEY,
workflow_name TEXT NOT NULL,
status TEXT NOT NULL,
deployment_id TEXT NOT NULL,
spec_version INTEGER,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
started_at TEXT,
completed_at TEXT,
expired_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_runs_name ON workflow_runs_index(workflow_name);
CREATE INDEX IF NOT EXISTS idx_runs_status ON workflow_runs_index(status);

CREATE TABLE IF NOT EXISTS workflow_hooks_index (
hook_id TEXT PRIMARY KEY,
run_id TEXT NOT NULL,
token TEXT NOT NULL UNIQUE,
owner_id TEXT NOT NULL,
project_id TEXT NOT NULL,
environment TEXT NOT NULL,
created_at TEXT NOT NULL,
is_webhook INTEGER DEFAULT 1,
spec_version INTEGER
);
CREATE INDEX IF NOT EXISTS idx_hooks_run ON workflow_hooks_index(run_id);
CREATE INDEX IF NOT EXISTS idx_hooks_token ON workflow_hooks_index(token);
`;

/**
* Run the D1 index migration. Safe to call multiple times (uses IF NOT EXISTS).
*/
export async function migrate(db: D1Database): Promise<void> {
const statements = MIGRATION_SQL.split(';')
.map((s) => s.trim())
.filter(Boolean);

for (const sql of statements) {
await db.exec(`${sql};`);
}
}

/** Upsert a run's index row in D1 */
export async function upsertRunIndex(
db: D1Database,
run: {
runId: string;
workflowName: string;
status: string;
deploymentId: string;
specVersion?: number;
createdAt: string;
updatedAt: string;
startedAt?: string;
completedAt?: string;
expiredAt?: string;
}
): Promise<void> {
await db
.prepare(
`INSERT INTO workflow_runs_index
(run_id, workflow_name, status, deployment_id, spec_version, created_at, updated_at, started_at, completed_at, expired_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(run_id) DO UPDATE SET
status = excluded.status,
updated_at = excluded.updated_at,
started_at = COALESCE(excluded.started_at, started_at),
completed_at = COALESCE(excluded.completed_at, completed_at),
expired_at = COALESCE(excluded.expired_at, expired_at)`
)
.bind(
run.runId,
run.workflowName,
run.status,
run.deploymentId,
run.specVersion ?? null,
run.createdAt,
run.updatedAt,
run.startedAt ?? null,
run.completedAt ?? null,
run.expiredAt ?? null
)
.run();
}

/** Insert a hook into the D1 index */
export async function insertHookIndex(
db: D1Database,
hook: {
hookId: string;
runId: string;
token: string;
ownerId: string;
projectId: string;
environment: string;
createdAt: string;
isWebhook?: boolean;
specVersion?: number;
}
): Promise<void> {
await db
.prepare(
`INSERT OR IGNORE INTO workflow_hooks_index
(hook_id, run_id, token, owner_id, project_id, environment, created_at, is_webhook, spec_version)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
)
.bind(
hook.hookId,
hook.runId,
hook.token,
hook.ownerId,
hook.projectId,
hook.environment,
hook.createdAt,
hook.isWebhook !== false ? 1 : 0,
hook.specVersion ?? null
)
.run();
}

/** Delete a hook from the D1 index */
export async function deleteHookIndex(
db: D1Database,
hookId: string
): Promise<void> {
await db
.prepare('DELETE FROM workflow_hooks_index WHERE hook_id = ?')
.bind(hookId)
.run();
}

/** Delete all hooks for a run from the D1 index */
export async function deleteHooksForRunIndex(
db: D1Database,
runId: string
): Promise<void> {
await db
.prepare('DELETE FROM workflow_hooks_index WHERE run_id = ?')
.bind(runId)
.run();
}
Loading