Workflow
Source: src/Cloudflare/Workers/Workflow.ts
A Cloudflare Workflow that orchestrates durable, multi-step tasks with automatic retries and at-least-once delivery.
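One common way to model durable steps with retries is a per-step result log: a step that has already completed is read back from the log instead of being run again. The sketch below is a conceptual, in-memory illustration in plain TypeScript, not the implementation used by this library or by Cloudflare. `StepLog`, `runStep`, and `demo` are hypothetical names introduced only for this example.

```typescript
// Conceptual model only: an in-memory stand-in for a durable step log.
// `StepLog` and `runStep` are hypothetical names, not library APIs.
type StepLog = Map<string, unknown>;

async function runStep<T>(
  log: StepLog,
  name: string,
  body: () => Promise<T>,
  maxAttempts = 3,
): Promise<T> {
  // A step that already completed is replayed from the log, not re-executed.
  if (log.has(name)) return log.get(name) as T;

  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const result = await body();
      log.set(name, result); // record the result before moving on
      return result;
    } catch (error) {
      lastError = error; // keep retrying until attempts are exhausted
    }
  }
  throw lastError;
}

// Demo: the first attempt fails, the second succeeds, and running the same
// step again reuses the stored result instead of calling the body.
async function demo(): Promise<{ calls: number; first: string; replayed: string }> {
  const log: StepLog = new Map();
  let calls = 0;
  const charge = async (): Promise<string> => {
    calls++;
    if (calls === 1) throw new Error("transient failure");
    return `charged:${calls}`;
  };
  const first = await runStep(log, "charge", charge);
  const replayed = await runStep(log, "charge", charge);
  return { calls, first, replayed };
}
```

In this model a step body can run more than once before its result is recorded, which is what at-least-once delivery implies in practice. Step bodies with external side effects are therefore best written to be idempotent.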
A Workflow follows the same two-phase pattern as Workers and Durable
Objects. The outer Effect.gen resolves shared dependencies. The inner
Effect.fn is the workflow body — a function from a typed input
payload to an Effect that runs steps using task, sleep, and
sleepUntil.
```typescript
Effect.gen(function* () {
  // Phase 1: resolve dependencies
  const notifier = yield* NotificationService;

  return Effect.fn(function* (input: { orderId: string }) {
    // Phase 2: workflow body (durable steps)
    const result = yield* Cloudflare.task("process", doWork(input.orderId));
    yield* Cloudflare.sleep("cooldown", "10 seconds");
    return result;
  });
})
```

Defining a Workflow
```typescript
export default class MyWorkflow extends Cloudflare.Workflow<MyWorkflow>()(
  "MyWorkflow",
  Effect.gen(function* () {
    return Effect.fn(function* (input: { name: string }) {
      return { received: input.name };
    });
  }),
) {}
```

Step Primitives
Running a named task
```typescript
const result = yield* Cloudflare.task(
  "process-order",
  Effect.succeed({ orderId: "abc", total: 42 }),
);
```

Sleeping between steps
```typescript
yield* Cloudflare.sleep("cooldown", "30 seconds");
```

Accessing env bindings inside a task
Bind a resource (e.g. KVNamespace, R2Bucket) in the workflow’s
outer init phase to get a typed Effect-native client, then use it
directly inside task. task threads the binding’s service
requirement (WorkerEnvironment) through automatically so the inner
Effect needs no extra plumbing.
```typescript
Effect.gen(function* () {
  const kv = yield* Cloudflare.KVNamespace.bind(KV);

  return Effect.fn(function* (input: { roomId: string; message: string }) {
    const { roomId, message } = input;

    const stored = yield* Cloudflare.task(
      "kv-roundtrip",
      Effect.gen(function* () {
        const key = `workflow:${roomId}`;
        yield* kv.put(key, message);
        return yield* kv.get(key);
      }).pipe(Effect.orDie),
    );

    return stored;
  });
});
```

Starting and Monitoring Instances
Creating an instance from a Worker
```typescript
const workflow = yield* MyWorkflow;
const instance = yield* workflow.create({ orderId: "abc" });
```

Checking instance status
```typescript
const workflow = yield* MyWorkflow;
const handle = yield* workflow.get(instanceId);
const status = yield* handle.status();
```

Triggering from a Worker
Wire the workflow into HTTP routes so callers can fire instances and poll for completion.
```typescript
const notifier = yield* MyWorkflow;

return {
  fetch: Effect.gen(function* () {
    const request = yield* HttpServerRequest;

    if (request.url.startsWith("/workflow/start/")) {
      const id = request.url.split("/").pop()!;
      const instance = yield* notifier.create({ id });
      return HttpServerResponse.json({ instanceId: instance.id });
    }

    if (request.url.startsWith("/workflow/status/")) {
      const id = request.url.split("/").pop()!;
      const instance = yield* notifier.get(id);
      return HttpServerResponse.json(yield* instance.status());
    }

    return HttpServerResponse.text("Not Found", { status: 404 });
  }),
};
```

Testing Workflows
Workflows run asynchronously, so tests start an instance and
poll until it reaches a terminal status. A simple recipe with
alchemy/Test/Bun:
```typescript
test(
  "workflow completes",
  Effect.gen(function* () {
    const { url } = yield* stack;

    const start = yield* HttpClient.post(`${url}/workflow/start/x`);
    const { instanceId } = (yield* start.json) as { instanceId: string };

    let status: { status: string } | undefined;
    const deadline = Date.now() + 60_000;
    while (Date.now() < deadline) {
      const res = yield* HttpClient.get(
        `${url}/workflow/status/${instanceId}`,
      );
      status = (yield* res.json) as { status: string };
      if (status.status === "complete" || status.status === "errored") {
        break;
      }
      yield* Effect.sleep("2 seconds");
    }

    expect(status?.status).toBe("complete");
  }),
  { timeout: 120_000 },
);
```
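The polling loop in the test recipe can also be factored into a small reusable helper. The following is a minimal sketch in plain TypeScript with Promises rather than Effect, so it runs without any of the libraries used here. `pollUntilTerminal`, `InstanceStatus`, `TERMINAL`, and `fakeSource` are hypothetical names, and treating "terminated" as terminal alongside "complete" and "errored" is an assumption of this sketch.

```typescript
// Hypothetical helper, not part of this library: polls a status source
// until it reports a terminal status or the deadline passes.
type InstanceStatus = { status: string };

// Assumption: statuses treated as terminal. The recipe checks "complete"
// and "errored"; "terminated" is added here as a further assumption.
const TERMINAL = new Set(["complete", "errored", "terminated"]);

async function pollUntilTerminal(
  fetchStatus: () => Promise<InstanceStatus>,
  options: { timeoutMs: number; intervalMs: number },
): Promise<InstanceStatus | undefined> {
  const deadline = Date.now() + options.timeoutMs;
  let last: InstanceStatus | undefined;
  while (Date.now() < deadline) {
    last = await fetchStatus();
    if (TERMINAL.has(last.status)) return last;
    // Wait before the next poll.
    await new Promise((resolve) => setTimeout(resolve, options.intervalMs));
  }
  return last; // timed out: the caller sees the last non-terminal status
}

// Fake status source for demonstration: reports "complete" on the third poll.
function fakeSource(): () => Promise<InstanceStatus> {
  const sequence = ["queued", "running", "complete"];
  let i = 0;
  return async () => ({
    status: sequence[Math.min(i++, sequence.length - 1)] ?? "complete",
  });
}
```

A test would pass a function that fetches the status route as `fetchStatus` and then assert on the returned status. Returning the last observed status on timeout, instead of throwing, keeps the failure message informative when the assertion fails.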