Workflow

Source: src/Cloudflare/Workers/Workflow.ts

A Cloudflare Workflow that orchestrates durable, multi-step tasks with automatic retries and at-least-once delivery.

A Workflow follows the same two-phase pattern as Workers and Durable Objects. The outer Effect.gen resolves shared dependencies. The inner Effect.fn is the workflow body — a function from a typed input payload to an Effect that runs steps using task, sleep, and sleepUntil.

Effect.gen(function* () {
  // Phase 1: resolve dependencies
  const notifier = yield* NotificationService;
  return Effect.fn(function* (input: { orderId: string }) {
    // Phase 2: workflow body (durable steps)
    const result = yield* Cloudflare.task("process", doWork(input.orderId));
    yield* Cloudflare.sleep("cooldown", "10 seconds");
    return result;
  });
})

A workflow is declared as a class that wraps an effect of this shape:

export default class MyWorkflow extends Cloudflare.Workflow<MyWorkflow>()(
  "MyWorkflow",
  Effect.gen(function* () {
    return Effect.fn(function* (input: { name: string }) {
      return { received: input.name };
    });
  }),
) {}

Running a named task

const result = yield* Cloudflare.task(
  "process-order",
  Effect.succeed({ orderId: "abc", total: 42 }),
);
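
The first argument names the step. Cloudflare Workflows persist each step's result and use the step name as the cache key, so a retried or replayed instance does not redo work that already completed. The sketch below models that idea with an in-memory map. `StepLog` and `runStep` are hypothetical names used only for illustration; they are not part of alchemy or the Workflows runtime, and the retry count is an arbitrary assumption.

```typescript
// Hypothetical in-memory model of durable step results (illustration only).
class StepLog {
  private readonly results = new Map<string, unknown>();

  // Run `work` at most `maxAttempts` times; store the first successful result by name.
  runStep<T>(name: string, work: () => T, maxAttempts = 3): T {
    if (this.results.has(name)) {
      return this.results.get(name) as T; // replay: skip work that already completed
    }
    let lastError: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        const value = work();
        this.results.set(name, value);
        return value;
      } catch (error) {
        lastError = error; // keep retrying until attempts are exhausted
      }
    }
    throw lastError;
  }
}

// A step that fails once, then succeeds.
let calls = 0;
const flaky = (): { orderId: string; total: number } => {
  calls += 1;
  if (calls === 1) throw new Error("transient failure");
  return { orderId: "abc", total: 42 };
};

const log = new StepLog();
const first = log.runStep("process-order", flaky); // two attempts: fail, then succeed
const replay = log.runStep("process-order", flaky); // served from the log, no new call
```

Because a step body may run more than once before it succeeds, side effects inside a task are best kept idempotent.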

Sleeping between steps

yield* Cloudflare.sleep("cooldown", "30 seconds");

Accessing env bindings inside a task

Bind a resource (e.g. KVNamespace, R2Bucket) in the workflow’s outer init phase to get a typed Effect-native client, then use it directly inside task. task threads the binding’s service requirement (WorkerEnvironment) through automatically so the inner Effect needs no extra plumbing.

Effect.gen(function* () {
  const kv = yield* Cloudflare.KVNamespace.bind(KV);
  return Effect.fn(function* (input: { roomId: string; message: string }) {
    const { roomId, message } = input;
    const stored = yield* Cloudflare.task(
      "kv-roundtrip",
      Effect.gen(function* () {
        const key = `workflow:${roomId}`;
        yield* kv.put(key, message);
        return yield* kv.get(key);
      }).pipe(Effect.orDie),
    );
    return stored;
  });
});

Creating an instance from a Worker

const workflow = yield* MyWorkflow;
const instance = yield* workflow.create({ orderId: "abc" });

Checking instance status

const workflow = yield* MyWorkflow;
const handle = yield* workflow.get(instanceId);
const status = yield* handle.status();
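
A status check usually comes down to asking whether the instance has finished. The sketch below assumes the value returned by `handle.status()` carries a `status` field using the values listed in Cloudflare's `InstanceStatus` type; whether alchemy surfaces it in exactly this shape is an assumption, and `isTerminal` is a hypothetical helper.

```typescript
// Status values as listed in Cloudflare's InstanceStatus type (assumed to be
// what handle.status() reports).
type WorkflowStatus =
  | "queued"
  | "running"
  | "paused"
  | "errored"
  | "terminated"
  | "complete"
  | "waiting"
  | "waitingForPause"
  | "unknown";

// Hypothetical helper: true once the instance can no longer make progress.
function isTerminal(status: WorkflowStatus): boolean {
  switch (status) {
    case "complete":
    case "errored":
    case "terminated":
      return true;
    default:
      return false;
  }
}
```

Treating `terminated` as terminal alongside `complete` and `errored` keeps a poller from waiting on an instance that was stopped manually.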

Wire the workflow into HTTP routes so callers can fire instances and poll for completion.

src/worker.ts
const workflow = yield* MyWorkflow;
return {
  fetch: Effect.gen(function* () {
    const request = yield* HttpServerRequest;
    if (request.url.startsWith("/workflow/start/")) {
      // The last path segment is passed to the workflow as its input.
      const id = request.url.split("/").pop()!;
      const instance = yield* workflow.create({ id });
      return HttpServerResponse.json({ instanceId: instance.id });
    }
    if (request.url.startsWith("/workflow/status/")) {
      // The last path segment is the instance id returned by the start route.
      const id = request.url.split("/").pop()!;
      const instance = yield* workflow.get(id);
      return HttpServerResponse.json(yield* instance.status());
    }
    return HttpServerResponse.text("Not Found", { status: 404 });
  }),
};
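
Taking the id with `split("/").pop()` returns whatever follows the last slash, so a trailing slash yields an empty string and a query string ends up inside the id. If that matters for your routes, a stricter extractor is one option. This is only a sketch: `idAfterPrefix` is a hypothetical helper, and it assumes `request.url` is a path such as `/workflow/status/abc`.

```typescript
// Hypothetical helper: return the single path segment after `prefix`, or undefined.
function idAfterPrefix(url: string, prefix: string): string | undefined {
  if (!url.startsWith(prefix)) return undefined;
  // Drop any query string or fragment before inspecting the remainder.
  const rest = url.slice(prefix.length).split(/[?#]/)[0] ?? "";
  // Reject empty ids and anything that contains further path segments.
  if (rest === "" || rest.includes("/")) return undefined;
  return rest;
}
```

A route handler could then answer with a 404 when the helper returns `undefined` instead of passing an empty id to `create` or `get`.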

Workflows run asynchronously, so tests start an instance and poll until it reaches a terminal status. A simple recipe with alchemy/Test/Bun:

test(
  "workflow completes",
  Effect.gen(function* () {
    const { url } = yield* stack;
    const start = yield* HttpClient.post(`${url}/workflow/start/x`);
    const { instanceId } = (yield* start.json) as { instanceId: string };
    let status: { status: string } | undefined;
    const deadline = Date.now() + 60_000;
    while (Date.now() < deadline) {
      const res = yield* HttpClient.get(
        `${url}/workflow/status/${instanceId}`,
      );
      status = (yield* res.json) as { status: string };
      if (status.status === "complete" || status.status === "errored") {
        break;
      }
      yield* Effect.sleep("2 seconds");
    }
    expect(status?.status).toBe("complete");
  }),
  { timeout: 120_000 },
);
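
If several tests need the same loop, it can be factored into a small helper. The following is a minimal sketch in plain TypeScript using promises rather than Effect; `pollUntil`, its options, and the injectable `sleep` function are hypothetical names and are not part of alchemy or Effect.

```typescript
// Hypothetical helper: call `check` until `done` accepts its result or the deadline passes.
async function pollUntil<T>(
  check: () => Promise<T>,
  done: (value: T) => boolean,
  options: {
    timeoutMs: number;
    intervalMs: number;
    sleep?: (ms: number) => Promise<void>; // injectable so tests need not wait
  },
): Promise<T | undefined> {
  const sleep =
    options.sleep ??
    ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));
  const deadline = Date.now() + options.timeoutMs;
  let last: T | undefined;
  while (Date.now() < deadline) {
    last = await check();
    if (done(last)) return last;
    await sleep(options.intervalMs);
  }
  // Deadline passed: hand back the last observation (undefined if none was made).
  return last;
}
```

Returning the last observation rather than throwing on timeout mirrors the test above, where the final assertion on `status` decides whether the run passed.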