RpcWorker
Source:
src/Cloudflare/Workers/RpcWorker.ts
RpcWorker is a thin sugar over {@link Worker} for the common case
where a worker’s entire fetch surface is a typed Effect RpcGroup.
It takes the rpc schema directly in props alongside main, and
accepts an init Effect that returns the already-piped
RpcServer.toHttpEffect(...)-producing Effect (no { fetch }
wrapper) — the wrapper plugs it into the worker’s fetch for you.
Functionally identical to writing Cloudflare.Worker(...) with
return { fetch: RpcServer.toHttpEffect(schema).pipe(...) }; use
whichever style you prefer.
The class form (class X extends Cloudflare.RpcWorker<X>()(...))
carries Self through the result type as Rpc<Self>, so other
workers binding to this one see the rpc shape pinned to Self.
Defining the rpc group
Section titled “Defining the rpc group”The rpc group and its schemas live outside any worker so both the
server (RpcWorker) and any consumers (RpcClient.make /
RpcDurableObjectNamespace) import the same value.
import * as Schema from "effect/Schema";import { Rpc, RpcGroup } from "effect/unstable/rpc";
export class TaskNotFound extends Schema.TaggedClass<TaskNotFound>()( "TaskNotFound", { id: Schema.String },) {}
const getTask = Rpc.make("getTask", { payload: { id: Schema.String }, success: Schema.String, error: TaskNotFound,});
export class TaskRpcs extends RpcGroup.make(getTask) {}Implementing the worker
Section titled “Implementing the worker”Class form (recommended)
Mirrors Cloudflare.Worker<Self>()(...) — class X extends ...
works the same. The init Effect builds a handlers Layer from the
group and returns the RpcServer.toHttpEffect(schema)-piped Effect
directly.
import * as Cloudflare from "alchemy/Cloudflare";import * as Effect from "effect/Effect";import * as Layer from "effect/Layer";import { RpcSerialization, RpcServer } from "effect/unstable/rpc";import { TaskRpcs } from "./rpcs.ts";
export default class Worker extends Cloudflare.RpcWorker<Worker>()( "Worker", { main: import.meta.filename, schema: TaskRpcs }, Effect.gen(function* () { const handlers = TaskRpcs.toLayer({ getTask: ({ id }) => Effect.succeed(`task-${id}`), }); return RpcServer.toHttpEffect(TaskRpcs).pipe( Effect.provide(Layer.mergeAll(handlers, RpcSerialization.layerJson)), ); }),) {}NDJSON for streaming rpcs
If any rpc in the group is a streaming rpc, the wire serialization
must be RpcSerialization.layerNdjson — streaming rpcs need
newline framing on the wire.
return RpcServer.toHttpEffect(ChatRpcs).pipe( Effect.provide(handlers), Effect.provide(RpcSerialization.layerNdjson),);Modular form: separate the class from its runtime
Section titled “Modular form: separate the class from its runtime”The inline class form above bundles the runtime into the class
declaration. The two-arg form (id, props) declares the class
as a pure tagged identifier; provide the runtime separately via
WorkerClass.make(impl) so consumers can import the class for
binding without pulling the host’s runtime into their bundle.
export class TaskWorker extends Cloudflare.RpcWorker<TaskWorker>()( "TaskWorker", { main: import.meta.filename, schema: TaskRpcs },) {}
// Only the host script imports this default export; consumers// import the class above for `RpcWorker.bind(TaskWorker)`.export default TaskWorker.make( Effect.gen(function* () { const handlers = TaskRpcs.toLayer({ getTask: ({ id }) => Effect.succeed(`task-${id}`), }); return RpcServer.toHttpEffect(TaskRpcs).pipe( Effect.provide(Layer.mergeAll(handlers, RpcSerialization.layerJson)), ); }),);Hosting a Durable Object for cross-script binding
Section titled “Hosting a Durable Object for cross-script binding”The optional second type argument Deps mirrors
Cloudflare.Worker<Self, Bindings, Deps> — it declares the DOs
this Worker publishes for cross-script binding. With Counter
named in Deps, any other Worker can write
Counter.from(TaskWorker) and have it type-check.
import { Counter } from "./counter.ts";
export class TaskWorker extends Cloudflare.RpcWorker<TaskWorker, Counter>()( "TaskWorker", { main: import.meta.filename, schema: TaskRpcs },) {}See {@link RpcDurableObjectNamespace} for the consumer side
(Counter.from(TaskWorker)).
Binding it from another worker
Section titled “Binding it from another worker”Inside another worker’s init, RpcWorker.bind(WorkerClass)
registers the service binding on the surrounding worker and returns
a typed RpcClient you can call directly from any per-request
handler. Internally each method invocation builds a fresh underlying
client (because Cloudflare rejects cross-request reuse of the
stub I/O), but that’s hidden behind a Proxy so the consumer sees a
normal RpcClient.
import TaskWorker from "./task-worker.ts";
export default class Caller extends Cloudflare.RpcWorker<Caller>()( "Caller", { main: import.meta.filename, schema: CallerRpcs }, Effect.gen(function* () { // INIT: register binding, get the typed client const tasks = yield* Cloudflare.RpcWorker.bind(TaskWorker);
const handlers = CallerRpcs.toLayer({ // PER-REQUEST: just call methods directly proxyGetTask: ({ id }) => tasks.getTask({ id }), }); return RpcServer.toHttpEffect(CallerRpcs).pipe( Effect.provide(Layer.mergeAll(handlers, RpcSerialization.layerJson)), ); }),) {}Driving it from a test
Section titled “Driving it from a test”The same RpcGroup drives a typed client. Test.make deploys the
stack once for the file; each test yields the deploy handle for its
URL and calls procedures directly.
import { expect } from "@effect/vitest";import * as Cloudflare from "alchemy/Cloudflare";import * as Test from "alchemy/Test/Vitest";import * as Effect from "effect/Effect";import * as Layer from "effect/Layer";import * as Schedule from "effect/Schedule";import * as FetchHttpClient from "effect/unstable/http/FetchHttpClient";import * as RpcClient from "effect/unstable/rpc/RpcClient";import * as RpcSerialization from "effect/unstable/rpc/RpcSerialization";import Stack from "../alchemy.run.ts";import { TaskRpcs } from "../src/rpcs.ts";
const { test, beforeAll, afterAll, deploy, destroy } = Test.make({ providers: Cloudflare.providers(),});const stack = beforeAll(deploy(Stack));afterAll.skipIf(!!process.env.NO_DESTROY)(destroy(Stack));
const layer = (url: string) => RpcClient.layerProtocolHttp({ url }).pipe( Layer.provide(FetchHttpClient.layer), Layer.provide( Layer.succeed(RpcSerialization.RpcSerialization, RpcSerialization.json), ), );
test( "getTask", Effect.gen(function* () { const { url } = yield* stack; yield* Effect.gen(function* () { const client = yield* RpcClient.make(TaskRpcs); const result = yield* client .getTask({ id: "abc" }) .pipe(Effect.retry({ schedule: Schedule.exponential("500 millis"), times: 5 })); expect(result).toBe("task-abc"); }).pipe(Effect.scoped, Effect.provide(layer(url))); }),);Yielding the surrounding worker from inside the impl
Section titled “Yielding the surrounding worker from inside the impl”Mirrors yield* DurableObjectNamespace — yield the tag to access
the surrounding worker.
Effect.gen(function* () { const self = yield* Cloudflare.RpcWorker;});