TigerBeetle CDC Patterns
The tigerbeetle_cdc input streams financial ledger events from a TigerBeetle cluster in real time. Use these patterns to filter, transform, and route transfer events to Redpanda, S3, and other destinations.
Use this cookbook to:
- Apply reusable patterns for capturing TigerBeetle CDC events
- Adapt integration patterns to route financial ledger events to Redpanda and S3
- Identify patterns for filtering and transforming financial transfer events
The tigerbeetle_cdc input is in beta. The API is subject to change.
Prerequisites
Before using these patterns, configure the following.
Cgo-enabled binary
The tigerbeetle_cdc input requires a cgo-enabled build of Redpanda Connect. The rpk CLI and the standard Docker image do not include the tigerbeetle_cdc input. To get a cgo-enabled binary:
- Build Redpanda Connect from source with cgo enabled.
TigerBeetle cluster
The tigerbeetle_cdc input requires a TigerBeetle cluster running version 0.16.57 or later. The Redpanda Connect TigerBeetle client version must not be newer than the cluster version.
For a local cluster, see the TigerBeetle documentation.
Progress cache
The tigerbeetle_cdc input requires a cache to store the last acknowledged event timestamp between restarts. The examples in this cookbook use a Redis cache:
export REDIS_URL=redis://localhost:6379
For production use, any persistent cache supported by Redpanda Connect works, such as aws_dynamodb or sql.
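A minimal sketch of swapping the Redis cache for an aws_dynamodb cache, showing only the relevant parts of the config. The table name and region are placeholders, and the sketch assumes the table already exists with id as its partition key. Check the aws_dynamodb cache reference for the credential fields your environment needs.

```yaml
input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: dynamodb_cache # points at the cache resource below

cache_resources:
  - label: dynamodb_cache
    aws_dynamodb:
      table: tigerbeetle-cdc-progress # hypothetical table name; the table must already exist
      hash_key: id                    # assumed partition key attribute of the table
      data_key: value                 # attribute that stores the cached progress value
      region: us-east-1               # placeholder; set to the table's region
```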
Environment variables
The examples in this cookbook use environment variables for configuration:
export TB_CLUSTER_ID="1" (1)
export TB_REPLICA_1="192.168.1.10:3000" (2)
export TB_REPLICA_2="192.168.1.11:3000"
export TB_REPLICA_3="192.168.1.12:3000"
export REDIS_URL=redis://localhost:6379 (3)
export REDPANDA_BROKERS=localhost:9092 (4)
export S3_BUCKET=cdc-archive (5)
| 1 | The unique 128-bit TigerBeetle cluster ID. Small integers are valid (for example, 1 represents the 128-bit value 1). |
| 2 | The address of each replica in host:port format. The examples in this cookbook use a three-replica cluster. |
| 3 | The Redis URL for progress tracking. |
| 4 | The Redpanda broker addresses (for Redpanda output examples). |
| 5 | The Amazon S3 bucket name (for S3 output examples). |
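For a quick local test against a single-replica cluster, the environment variable references can carry fallback values using the ${VAR:default} interpolation syntax. This is a sketch: the defaults shown (cluster ID 0, one replica on 127.0.0.1:3000, and a local Redis) are assumptions and must match how your local cluster was formatted and started.

```yaml
input:
  tigerbeetle_cdc:
    # Falls back to 0 when TB_CLUSTER_ID is unset; must match the cluster ID
    # used when the local data file was formatted
    cluster_id: ${TB_CLUSTER_ID:0}
    addresses:
      - ${TB_REPLICA_1:127.0.0.1:3000} # single local replica (assumed address)
    progress_cache: redis_cache

output:
  stdout:
    codec: lines

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL:redis://localhost:6379}
```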
Capture CDC events
The simplest pattern captures all events from a TigerBeetle cluster and outputs them with metadata:
input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache
pipeline:
  processors:
    - mapping: |
        root.event_type = meta("event_type")
        root.ledger = meta("ledger")
        root.transfer_code = meta("transfer_code")
        root.timestamp_ms = meta("timestamp_ms")
        root.transfer = this.transfer
        root.debit_account = this.debit_account
        root.credit_account = this.credit_account
output:
  stdout:
    codec: lines
cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
For details on the event structure and available metadata fields, see the metadata section in the connector reference.
Each event contains a full snapshot of the transfer and both the debit and credit accounts at the time of the event.
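If downstream consumers need the whole snapshot rather than selected fields, a mapping along these lines keeps the event body and adds metadata next to it. The event_time field name is hypothetical, and the conversion assumes timestamp_ms holds Unix milliseconds, as its name suggests.

```yaml
pipeline:
  processors:
    - mapping: |
        # Keep the full snapshot (transfer, debit_account, credit_account)
        root = this
        root.event_type = meta("event_type")
        root.timestamp_ms = meta("timestamp_ms")
        # Convert milliseconds to seconds, then format as RFC 3339 in UTC
        root.event_time = (meta("timestamp_ms").number() / 1000).ts_format("2006-01-02T15:04:05.000Z07:00", "UTC")
```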
Filter CDC events
Filter events to process only settled transfers (single_phase for immediate transfers and two_phase_posted for completed two-phase transfers) using the event_type metadata field:
input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache
pipeline:
  processors:
    - mapping: |
        root = if meta("event_type") != "single_phase" && meta("event_type") != "two_phase_posted" {
          deleted()
        }
    - mapping: |
        root.event_type = meta("event_type")
        root.ledger = meta("ledger")
        root.transfer_id = this.transfer.id
        root.amount = this.transfer.amount
        root.debit_account_id = this.debit_account.id
        root.credit_account_id = this.credit_account.id
        root.timestamp_ms = meta("timestamp_ms")
output:
  stdout:
    codec: lines
cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
This pattern:
- Keeps only single_phase and two_phase_posted events, dropping two_phase_pending, two_phase_voided, and two_phase_expired events
- Flattens each event to key transfer and account fields plus the millisecond timestamp
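The same filter can also be written as an allow-list, which may be easier to extend if you later decide to keep more event types. This sketch could replace the first mapping processor in the pattern; it keeps the body and metadata of allowed events unchanged and deletes the rest.

```yaml
pipeline:
  processors:
    - mapping: |
        # Allow-list of settled event types; edit this array to keep more types
        root = if ["single_phase", "two_phase_posted"].contains(meta("event_type")) {
          this
        } else {
          deleted()
        }
```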
Route to Redpanda
Stream TigerBeetle transfer events to Redpanda, with a separate topic for each ledger:
input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache
pipeline:
  processors:
    - mapping: |
        meta topic = "transfers." + meta("ledger")
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! json("transfer.id") }
    batching:
      count: 100
      period: 1s
cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
This pattern:
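- Routes each event to a per-ledger topic (for example, transfers.1 and transfers.2). These topics must already exist unless automatic topic creation is enabled on the Redpanda cluster.
- Batches messages for efficient delivery, flushing at 100 messages or every second, whichever comes first
- Sets the message key to the transfer ID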
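If you prefer a single topic instead of one topic per ledger, you can key messages by ledger so that all events for a ledger land on the same partition. This is a sketch that assumes a hypothetical transfers topic and the output's default key-based partitioning; no mapping processor is needed for it.

```yaml
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: transfers # hypothetical single topic for all ledgers
    # Messages with the same key hash to the same partition, so events for
    # one ledger keep their relative order within that partition
    key: ${! meta("ledger") }
    batching:
      count: 100
      period: 1s
```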
Route to S3
Archive transfer events to Amazon S3 for long-term storage and analytics:
input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache
pipeline:
  processors:
    - mapping: |
        root.event_type = meta("event_type")
        root.ledger = meta("ledger")
        root.transfer = this.transfer
        root.debit_account = this.debit_account
        root.credit_account = this.credit_account
        root.timestamp_ms = meta("timestamp_ms")
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/ledger/${! meta("ledger") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: lines
cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
This pattern:
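- Organizes files by ledger and by hourly time partitions (year/month/day/hour), based on the time each file is written rather than the event time
- Batches up to 1,000 events or 5 minutes of events, whichever comes first, and archives each batch as newline-delimited JSON
- Uses UUID file names to prevent collisions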
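The batching policy groups events by count and time, not by ledger, so one batch can contain events from several ledgers while the path is interpolated once per archived file. One way to keep each file to a single ledger is to split the batch with a group_by_value processor before archiving. This is a sketch that, like the pattern above, assumes each archived message carries the ledger metadata of the events in its group; verify the resulting object layout in a test bucket first.

```yaml
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/ledger/${! meta("ledger") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson
    batching:
      count: 1000
      period: 5m
      processors:
        # Split the flushed batch into one group per ledger
        - group_by_value:
            value: ${! meta("ledger") }
        # Archive each group into its own newline-delimited message
        - archive:
            format: lines
```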
Route by event type
Route single-phase and two-phase transfers to separate Redpanda topics:
input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache
output:
  switch:
    cases:
      - check: meta("event_type") == "single_phase"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: tigerbeetle.single_phase
      - check: meta("event_type") == "two_phase_posted"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: tigerbeetle.two_phase
      - output:
          drop: {}
cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
This pattern:
- Routes single_phase events (immediate settlements) to one topic
- Routes two_phase_posted events (settled two-phase transfers) to another topic
- Drops the remaining two-phase events (two_phase_pending, two_phase_voided, and two_phase_expired), which do not represent a completed settlement
- Supports specialized consumers for each settlement model
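Instead of dropping unsettled two-phase events, you can send them to a separate topic, for example to track pending transfers. Because switch cases are evaluated in order and, by default, the first matching case wins, a has_prefix check placed after the two_phase_posted case only catches the remaining two-phase event types. The tigerbeetle.two_phase_lifecycle topic name is hypothetical.

```yaml
output:
  switch:
    cases:
      - check: meta("event_type") == "single_phase"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: tigerbeetle.single_phase
      - check: meta("event_type") == "two_phase_posted"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: tigerbeetle.two_phase
      # Reached only by two-phase events that did not match the case above
      - check: meta("event_type").has_prefix("two_phase_")
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: tigerbeetle.two_phase_lifecycle # hypothetical topic name
      # Fallback for any other event type
      - output:
          drop: {}
```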
Resume from a timestamp
By default, the tigerbeetle_cdc input streams all events available in the cluster. To start from a specific point in time, set timestamp_initial to a TigerBeetle nanosecond timestamp:
input:
  tigerbeetle_cdc:
    cluster_id: ${TB_CLUSTER_ID}
    addresses:
      - ${TB_REPLICA_1}
      - ${TB_REPLICA_2}
      - ${TB_REPLICA_3}
    progress_cache: redis_cache
    timestamp_initial: "1745328372758695656" (1)
cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
| 1 | Start from events at or after this nanosecond timestamp. Ignored if progress_cache already contains a more recent acknowledged timestamp. |
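To choose a value for timestamp_initial, it can help to record the nanosecond timestamp metadata of the events you process. A sketch using the log processor at debug level; logging every event is noisy, so this suits troubleshooting rather than steady-state production.

```yaml
logger:
  level: DEBUG # required for the DEBUG log lines below to appear

pipeline:
  processors:
    - log:
        level: DEBUG
        # Prints each event's type and nanosecond timestamp
        message: 'tigerbeetle_cdc event_type=${! meta("event_type") } timestamp=${! meta("timestamp") }'
```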
Troubleshoot common issues
Use these steps to diagnose and fix common problems with the tigerbeetle_cdc input.
Component not available
If the pipeline fails with an error that tigerbeetle_cdc is not a recognized input:
- Confirm you are using a cgo-enabled build of Redpanda Connect, not rpk or the standard Docker image.
- Verify that the binary is version 4.65.0 or later, the version that introduced tigerbeetle_cdc.
Cannot connect to cluster
If the pipeline fails to connect to TigerBeetle:
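- Verify that the addresses list contains the correct IP address and port for each replica.
- Confirm that cluster_id matches the ID used when creating the cluster.
- Confirm that the cluster runs TigerBeetle 0.16.57 or later and is not older than the TigerBeetle client version in Redpanda Connect.
- Check network access between Redpanda Connect and the TigerBeetle replicas.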
Duplicate events after restart
The tigerbeetle_cdc input provides at-least-once delivery. During crash recovery, the pipeline may replay events that were already delivered. To handle duplicates:
- Use idempotent processing in downstream systems.
- Deduplicate using the timestamp metadata field, which is unique per event and has nanosecond precision.
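One way to make a downstream write idempotent is to key it on the timestamp metadata field. The sketch below assumes a PostgreSQL table with a unique constraint on an event_timestamp column; the table name, columns, and POSTGRES_DSN variable are hypothetical. Replayed events hit the ON CONFLICT clause and are skipped. A dedupe processor backed by a persistent cache is a weaker fit for crash recovery, because it can record an event as seen before the output acknowledges it, which risks losing that event if the pipeline crashes.

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: ${POSTGRES_DSN}     # hypothetical connection string variable
    table: tigerbeetle_events # hypothetical table; event_timestamp must be UNIQUE
    columns: [ event_timestamp, event_type, ledger, transfer_id, amount ]
    args_mapping: |
      root = [
        meta("timestamp"),
        meta("event_type"),
        meta("ledger"),
        this.transfer.id,
        this.transfer.amount
      ]
    # A replayed event has the same timestamp, so the insert becomes a no-op
    suffix: ON CONFLICT (event_timestamp) DO NOTHING
```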