Skip to content

PipelineSink

Source: src/Cloudflare/Pipelines/Sink.ts

A Cloudflare Pipelines sink — the destination of the Pipelines product. A SQL {@link Pipeline} reads events from a {@link PipelineStream} and writes them to a sink, which stores them in R2 either as raw files (r2) or as Iceberg tables via the R2 Data Catalog (r2_data_catalog).

Sinks have no update API: every property change triggers a replacement. With engine-generated names this is seamless (the new sink gets a fresh name before the old one is deleted); with an explicit name the create-before-delete replacement collides, so prefer generated names.

R2 sink with JSON output

The S3-compatible credentials are derived from a Cloudflare API token: the access key id is the token id and the secret is the SHA-256 hex digest of the token value.

const bucket = yield* Cloudflare.R2Bucket("events", {});
const sink = yield* Cloudflare.PipelineSink("events-sink", {
type: "r2",
config: {
bucket: bucket.bucketName,
credentials: {
accessKeyId: alchemy.secret.env.R2_ACCESS_KEY_ID,
secretAccessKey: alchemy.secret.env.R2_SECRET_ACCESS_KEY,
},
path: "ingest",
rollingPolicy: { intervalSeconds: 30 },
},
});

Parquet output

const sink = yield* Cloudflare.PipelineSink("parquet-sink", {
type: "r2",
config: { bucket: bucket.bucketName, credentials },
format: { type: "parquet", compression: "zstd" },
});
const sink = yield* Cloudflare.PipelineSink("iceberg-sink", {
type: "r2_data_catalog",
config: {
bucket: bucket.bucketName,
tableName: "events",
namespace: "default",
token: alchemy.secret.env.CATALOG_TOKEN,
},
});