Stream

Source: src/AWS/Kinesis/Stream.ts

An Amazon Kinesis Data Stream.

Stream owns the stream’s lifecycle and mutable control-plane configuration, including retention, encryption, monitoring, warm throughput, record size, tags, and stream resource policy. A stream name is auto-generated from the app, stage, and logical ID unless you provide one explicitly.

On-Demand Stream

import * as Kinesis from "alchemy/AWS/Kinesis";
const stream = yield* Kinesis.Stream("OrdersStream");

Provisioned Stream

const stream = yield* Kinesis.Stream("AnalyticsStream", {
  streamMode: "PROVISIONED",
  shardCount: 2,
  retentionPeriodHours: 48,
});
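
When choosing a shardCount for a provisioned stream, a rough capacity estimate can help. The sketch below applies the per-shard limits AWS publishes for provisioned Kinesis streams (1 MB/s or 1,000 records/s for writes, 2 MB/s for shared reads). The helper `provisionedCapacity` is hypothetical and is not part of alchemy or the AWS SDK.

```typescript
// Hypothetical sizing helper; not part of alchemy or the AWS SDK.
// Per-shard limits for provisioned Kinesis streams, as published by AWS.
const WRITE_MB_PER_SEC_PER_SHARD = 1;
const WRITE_RECORDS_PER_SEC_PER_SHARD = 1000;
const READ_MB_PER_SEC_PER_SHARD = 2;

interface StreamCapacity {
  writeMBPerSec: number;
  writeRecordsPerSec: number;
  readMBPerSec: number;
}

// Multiply the per-shard limits by the number of shards.
function provisionedCapacity(shardCount: number): StreamCapacity {
  if (!Number.isInteger(shardCount) || shardCount < 1) {
    throw new RangeError("shardCount must be a positive integer");
  }
  return {
    writeMBPerSec: shardCount * WRITE_MB_PER_SEC_PER_SHARD,
    writeRecordsPerSec: shardCount * WRITE_RECORDS_PER_SEC_PER_SHARD,
    readMBPerSec: shardCount * READ_MB_PER_SEC_PER_SHARD,
  };
}

// Capacity of the two-shard stream from the example above.
const capacity = provisionedCapacity(2);
console.log(capacity);
```

These figures are aggregate ceilings. A single hot partition key still maps to one shard, so the per-shard limit applies to it regardless of shardCount.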

Encrypted Stream

const stream = yield* Kinesis.Stream("SecureStream", {
  encryption: true,
  kmsKeyId: "alias/my-key",
});

Bind producer operations in the init phase and use them in runtime handlers.

// init
const putRecord = yield* Kinesis.PutRecord.bind(stream);
return {
  fetch: Effect.gen(function* () {
    // runtime
    yield* putRecord({
      PartitionKey: "order-123",
      Data: new TextEncoder().encode(JSON.stringify({ orderId: "123" })),
    });
    return HttpServerResponse.text("Sent");
  }),
};
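
The PartitionKey and Data fields passed to putRecord can be prepared and validated before the call. The following is a minimal sketch, assuming the Kinesis limits of 1 to 256 characters for a partition key and a default maximum of 1 MiB for the data blob (the stream's record size setting may raise this). The helper `encodeJsonRecord` and its `maxBytes` parameter are hypothetical names, not part of alchemy.

```typescript
// Hypothetical helper; not part of alchemy or the AWS SDK.
// Assumed limits: partition key of 1 to 256 characters, 1 MiB default data size.
const DEFAULT_MAX_RECORD_BYTES = 1024 * 1024;
const MAX_PARTITION_KEY_CHARS = 256;

interface EncodedRecord {
  PartitionKey: string;
  Data: Uint8Array;
}

function encodeJsonRecord(
  partitionKey: string,
  payload: unknown,
  maxBytes: number = DEFAULT_MAX_RECORD_BYTES,
): EncodedRecord {
  // Count Unicode code points rather than UTF-16 code units.
  const keyLength = Array.from(partitionKey).length;
  if (keyLength < 1 || keyLength > MAX_PARTITION_KEY_CHARS) {
    throw new RangeError("partition key must be 1 to 256 characters long");
  }
  // Serialize to JSON, then to UTF-8 bytes, as in the example above.
  const Data = new TextEncoder().encode(JSON.stringify(payload));
  if (Data.byteLength > maxBytes) {
    throw new RangeError(`record is ${Data.byteLength} bytes, limit is ${maxBytes}`);
  }
  return { PartitionKey: partitionKey, Data };
}

// Build the same record as the handler above.
const record = encodeJsonRecord("order-123", { orderId: "123" });
console.log(record.PartitionKey, record.Data.byteLength);
```

The returned object has the same shape as the argument in the example above, so under that assumption it could be passed to putRecord directly.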

Process records from a Kinesis stream using a Lambda event source mapping.

// init
yield* Kinesis.records(stream).process(
  Effect.fn(function* (record) {
    const data = new TextDecoder().decode(record.data);
    yield* Effect.log(`Received: ${data}`);
  }),
);
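
If producers write JSON, as in the PutRecord example, the consumer usually needs to parse the decoded text and handle malformed records. The sketch below assumes `record.data` is a `Uint8Array`, as the `TextDecoder` call above suggests. The helper `decodeJsonRecord` and the `Decoded` type are hypothetical and not part of alchemy.

```typescript
// Hypothetical helper; not part of alchemy or the AWS SDK.
// The result is a tagged union so callers can branch without try/catch.
type Decoded<T> = { ok: true; value: T } | { ok: false; error: string };

function decodeJsonRecord<T>(data: Uint8Array): Decoded<T> {
  try {
    // fatal: true makes invalid UTF-8 throw instead of yielding U+FFFD.
    const text = new TextDecoder("utf-8", { fatal: true }).decode(data);
    // The cast is unchecked; validate the shape separately if it matters.
    return { ok: true, value: JSON.parse(text) as T };
  } catch (cause) {
    const error = cause instanceof Error ? cause.message : String(cause);
    return { ok: false, error };
  }
}

// Example with bytes shaped like the producer's payload.
const bytes = new TextEncoder().encode(JSON.stringify({ orderId: "123" }));
const decoded = decodeJsonRecord<{ orderId: string }>(bytes);
if (decoded.ok) {
  console.log(decoded.value.orderId);
} else {
  console.error(decoded.error);
}
```

Returning a result instead of throwing lets the handler decide whether to log and skip a bad record or fail it so it can be retried.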