messages
Source:
src/Cloudflare/Queue/QueueEventSource.ts
Subscribe to a Cloudflare Queue with an Effect stream handler.
Mirrors AWS.SQS.messages(queue).subscribe(...) on the
Cloudflare side. Wires both halves of the consumer in one call:
- Runtime: registers a queue event listener on the Worker. Each batch is piped through process as a Stream.Stream.
- Deploy-time: yields a Cloudflare.QueueConsumer resource so Cloudflare actually dispatches messages from queue to this Worker. No manual QueueConsumer wiring needed in alchemy.run.ts.
Acking semantics: if process succeeds, every message in the
batch is ack()ed; if it fails, every message is retry()ed
and Cloudflare applies maxRetries / retryDelay from the
settings before dead-lettering. Per-message control is still
available by calling msg.ack() / msg.retry() inside the
handler.
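The acking rules above can be modeled with a small standalone sketch. This is not the library's implementation: FakeMessage and settleBatch are hypothetical names, the handler is a plain synchronous function rather than an Effect stream, and the "first explicit call wins" rule for a message that was already acked or retried is an assumption made for illustration.

```typescript
// Hypothetical stand-in for a queue message. Only ack() and retry()
// are taken from the text above; everything else is illustrative.
type Outcome = "pending" | "acked" | "retried";

class FakeMessage<T> {
  outcome: Outcome = "pending";
  body: T;

  constructor(body: T) {
    this.body = body;
  }

  // Assumption: the first explicit call wins, later calls are ignored.
  ack(): void {
    if (this.outcome === "pending") this.outcome = "acked";
  }

  retry(): void {
    if (this.outcome === "pending") this.outcome = "retried";
  }
}

// Runs the handler over the whole batch, then settles every message
// the handler did not settle itself: ack on success, retry on failure.
function settleBatch<T>(
  batch: FakeMessage<T>[],
  process: (batch: FakeMessage<T>[]) => void,
): Outcome[] {
  try {
    process(batch);
    batch.forEach((m) => m.ack());
  } catch {
    batch.forEach((m) => m.retry());
  }
  return batch.map((m) => m.outcome);
}

// The handler acks "a" explicitly, then fails: "a" stays acked,
// while the untouched "b" is retried.
const result = settleBatch(
  [new FakeMessage("a"), new FakeMessage("b")],
  (msgs) => {
    for (const m of msgs) if (m.body === "a") m.ack();
    throw new Error("handler failed");
  },
);
console.log(result); // [ 'acked', 'retried' ]
```

In this model, batch-level settlement only fills in for messages the handler left pending, which is one way to read "per-message control is still available".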
Examples
import * as Cloudflare from "alchemy/Cloudflare";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Stream from "effect/Stream";

yield* Cloudflare.messages<MyEvent>(queueResource, {
  batchSize: 25,
  maxRetries: 3,
  maxWaitTime: "5 seconds",
  retryDelay: Duration.seconds(30),
}).subscribe((stream) =>
  Stream.runForEach(stream, (msg) =>
    Effect.log(`event ${msg.body.id}`),
  ),
);