Skip to content

messages

Source: src/Cloudflare/Queue/QueueEventSource.ts

Subscribe to a Cloudflare Queue with an Effect stream handler.

Mirrors AWS.SQS.messages(queue).subscribe(...) on the Cloudflare side. Wires both halves of the consumer in one call:

  • Runtime: registers a queue event listener on the Worker. Each batch is piped through process as a Stream.Stream.
  • Deploy-time: yields a Cloudflare.QueueConsumer resource so Cloudflare actually dispatches messages from queue to this Worker. No manual QueueConsumer wiring needed in alchemy.run.ts.

Acking semantics: if process succeeds, every message in the batch is ack()ed; if it fails, every message is retry()ed and Cloudflare applies maxRetries / retryDelay from the settings before dead-lettering. Per-message control is still available by calling msg.ack() / msg.retry() inside the handler.

import * as Cloudflare from "alchemy/Cloudflare";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Stream from "effect/Stream";
yield* Cloudflare.messages<MyEvent>(queueResource, {
batchSize: 25,
maxRetries: 3,
maxWaitTime: "5 seconds",
retryDelay: Duration.seconds(30),
}).subscribe((stream) =>
Stream.runForEach(stream, (msg) =>
Effect.log(`event ${msg.body.id}`),
),
);