Pipeline

Source: src/Cloudflare/Pipelines/Pipeline.ts

A Cloudflare SQL Pipeline: the transform step of the Pipelines product. A pipeline is a single SQL statement that reads events from a PipelineStream and writes them to a PipelineSink, both referenced by name.

The SQL is fixed at creation: changing it (or the name) triggers a replacement. Nothing references a pipeline downstream, so replacements are cheap.
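To make the replacement rule concrete, here is a minimal sketch of the decision it implies. The `PipelineProps` interface and the `requiresReplacement` function are hypothetical names introduced for illustration; this is not the resource's actual diff logic, only the rule as stated above (a change to the SQL or the name means replace).

```typescript
// Hypothetical shape: the two properties whose change forces a replacement.
interface PipelineProps {
  name: string;
  sql: string;
}

// Returns true when the pipeline cannot be updated in place,
// i.e. when either the name or the SQL statement differs.
function requiresReplacement(prev: PipelineProps, next: PipelineProps): boolean {
  return prev.name !== next.name || prev.sql !== next.sql;
}

// Placeholder names and SQL, for illustration only.
const before: PipelineProps = {
  name: "etl",
  sql: "INSERT INTO events_sink SELECT * FROM events",
};
const after: PipelineProps = {
  ...before,
  sql: `${before.sql} WHERE level = 'error'`,
};

console.log(requiresReplacement(before, before)); // false
console.log(requiresReplacement(before, after)); // true
```

Because nothing downstream holds a reference to the pipeline, a `true` result here has no knock-on effect on other resources.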

Stream → Sink passthrough

// `bucket` and `credentials` are assumed to be defined earlier in the program.
const stream = yield* Cloudflare.PipelineStream("events", {});
const sink = yield* Cloudflare.PipelineSink("events-sink", {
  type: "r2",
  config: { bucket: bucket.bucketName, credentials },
});
// Copy every event from the stream into the sink unchanged.
const pipeline = yield* Cloudflare.Pipeline("etl", {
  sql: Output.interpolate`INSERT INTO ${sink.name} SELECT * FROM ${stream.name}`,
});

Filtering transform

const pipeline = yield* Cloudflare.Pipeline("errors-only", {
  sql: Output.interpolate`INSERT INTO ${sink.name} SELECT * FROM ${stream.name} WHERE level = 'error'`,
});
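
Both examples above share one statement shape: `INSERT INTO <sink> SELECT * FROM <stream>`, with an optional `WHERE` clause. As a rough sketch of that shape, the hypothetical helper below (`buildPipelineSql` is not part of the library) assembles the statement from plain strings. With real resources, where names are resolved values, `Output.interpolate` as shown above is still the way to build the SQL. The sink and stream names used here are placeholders.

```typescript
// Hypothetical helper: builds the single SQL statement a pipeline runs.
// `sink` and `stream` are plain name strings; `where` is an optional filter.
function buildPipelineSql(sink: string, stream: string, where?: string): string {
  const base = `INSERT INTO ${sink} SELECT * FROM ${stream}`;
  return where ? `${base} WHERE ${where}` : base;
}

// Passthrough: no filter.
const passthroughSql = buildPipelineSql("events_sink", "events_stream");
// Filtering transform: keep only error-level events.
const errorsOnlySql = buildPipelineSql("events_sink", "events_stream", "level = 'error'");

console.log(passthroughSql);
console.log(errorsOnlySql);
```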