PipelineSink
Source:
src/Cloudflare/Pipelines/Sink.ts
A Cloudflare Pipelines sink — the destination of the Pipelines product.
A SQL Pipeline reads events from a PipelineStream and
writes them to a sink, which stores them in R2 either as raw files
(type r2) or as Iceberg tables via the R2 Data Catalog
(type r2_data_catalog).
Sinks have no update API: every property change triggers a
replacement. With engine-generated names this is seamless (the new
sink gets a fresh name before the old one is deleted); with an
explicit name the create-before-delete replacement collides, so
prefer generated names.
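The collision can be illustrated with a toy in-memory model. This is only a sketch of the create-before-delete ordering: FakeSinkStore, replaceSink, and the sink names are hypothetical and are not part of alchemy or the Cloudflare API.

```typescript
// Toy model of a sink registry where names must be unique.
// Hypothetical; NOT the alchemy engine or the Cloudflare API.
class FakeSinkStore {
  private names = new Set<string>();

  create(name: string): void {
    if (this.names.has(name)) {
      throw new Error(`sink "${name}" already exists`);
    }
    this.names.add(name);
  }

  delete(name: string): void {
    this.names.delete(name);
  }

  has(name: string): boolean {
    return this.names.has(name);
  }
}

// Create-before-delete: the new sink is created while the old one still exists.
function replaceSink(store: FakeSinkStore, oldName: string, newName: string): void {
  store.create(newName); // throws when newName is still taken by the old sink
  store.delete(oldName);
}

const store = new FakeSinkStore();

// Generated names: each replacement gets a fresh name, so it succeeds.
store.create("events-sink-a1b2");
replaceSink(store, "events-sink-a1b2", "events-sink-c3d4");

// Explicit name: the replacement reuses the same name and collides.
store.create("my-sink");
let collided = false;
try {
  replaceSink(store, "my-sink", "my-sink");
} catch {
  collided = true;
}
```

In this model the explicit-name replacement fails at the create step and the old sink is left in place, which is why generated names are the safer default.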
Creating a Sink
R2 sink with JSON output
The S3-compatible credentials are derived from a Cloudflare API token: the access key id is the token id and the secret is the SHA-256 hex digest of the token value.
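The derivation described above can be sketched with Node's built-in crypto module. The helper name deriveR2Credentials and the placeholder token values are assumptions for illustration. In practice the resulting values would be stored as secrets, not computed inline.

```typescript
import { createHash } from "node:crypto";

// Shape of the S3-compatible credential pair (field names mirror the sink config).
interface R2Credentials {
  accessKeyId: string;
  secretAccessKey: string;
}

// Hypothetical helper: the access key id is the token id, and the secret
// is the SHA-256 hex digest of the token value.
function deriveR2Credentials(tokenId: string, tokenValue: string): R2Credentials {
  return {
    accessKeyId: tokenId,
    secretAccessKey: createHash("sha256").update(tokenValue, "utf8").digest("hex"),
  };
}

// Placeholder inputs; substitute a real token id and value.
const derived = deriveR2Credentials("example-token-id", "example-token-value");
console.log(derived.accessKeyId);
```

The secret is always a 64-character lowercase hex string, which is a quick sanity check before storing it as R2_SECRET_ACCESS_KEY.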
const bucket = yield* Cloudflare.R2Bucket("events", {});
const sink = yield* Cloudflare.PipelineSink("events-sink", {
  type: "r2",
  config: {
    bucket: bucket.bucketName,
    credentials: {
      accessKeyId: alchemy.secret.env.R2_ACCESS_KEY_ID,
      secretAccessKey: alchemy.secret.env.R2_SECRET_ACCESS_KEY,
    },
    path: "ingest",
    rollingPolicy: { intervalSeconds: 30 },
  },
});
Parquet output
// Assumes `bucket` and a `credentials` object are already in scope.
const sink = yield* Cloudflare.PipelineSink("parquet-sink", {
  type: "r2",
  config: { bucket: bucket.bucketName, credentials },
  format: { type: "parquet", compression: "zstd" },
});
R2 Data Catalog
const sink = yield* Cloudflare.PipelineSink("iceberg-sink", {
  type: "r2_data_catalog",
  config: {
    bucket: bucket.bucketName,
    tableName: "events",
    namespace: "default",
    token: alchemy.secret.env.CATALOG_TOKEN,
  },
});