Google Cloud Spanner CDC Patterns
The gcp_spanner_cdc input captures row-level changes from Google Cloud Spanner tables using Spanner change streams. Use these patterns to filter, transform, and route Spanner change records to Redpanda, Amazon S3, and other destinations.
Use this cookbook to:
- Apply reusable patterns for capturing Google Cloud Spanner CDC events
- Adapt integration patterns to route CDC data to Redpanda and S3
- Identify patterns for filtering and transforming Spanner change records
The gcp_spanner_cdc input is in beta. The API is subject to change.
Prerequisites
Before using these patterns, configure the following.
Redpanda CLI
Install the Redpanda CLI (rpk) to run Redpanda Connect. For instructions, see rpk installation.
Spanner change stream
The gcp_spanner_cdc input reads from a Spanner change stream. The change stream must be created in your Spanner database before running the pipeline. To create a change stream that tracks all tables:
CREATE CHANGE STREAM AllChanges FOR ALL;
To track specific tables:
CREATE CHANGE STREAM OrderChanges FOR orders, customers;
For more information, see the Spanner change streams documentation.
Google Cloud Platform (GCP) authentication
The gcp_spanner_cdc input supports two authentication methods:
- Application Default Credentials (ADC): The recommended approach for GCP-native environments (Cloud Run, GKE, Compute Engine). No configuration needed when running with a service account attached to the instance.
- Service account key: Set the credentials_json field to a base64-encoded service account key JSON file. Use this approach when running outside GCP or when ADC is not available:
  export GCP_CREDENTIALS="$(base64 < /path/to/service-account.json | tr -d '\n')"
  Then reference it in your pipeline config:
  input:
    gcp_spanner_cdc:
      project_id: ${GCP_PROJECT_ID}
      instance_id: ${GCP_INSTANCE_ID}
      database_id: ${GCP_DATABASE_ID}
      stream_id: ${GCP_STREAM_ID}
      credentials_json: ${GCP_CREDENTIALS}
Progress tracking
The gcp_spanner_cdc input stores progress in a Spanner metadata table (cdc_metadata_<stream_id> by default). No external cache is required. The pipeline resumes from the last acknowledged change record after a restart.
Environment variables
The examples in this cookbook use environment variables for configuration:
export GCP_PROJECT_ID="analytics-prod"  # The GCP project ID
export GCP_INSTANCE_ID="primary"        # The Spanner instance ID
export GCP_DATABASE_ID="orders"         # The Spanner database ID
export GCP_STREAM_ID="AllChanges"       # The name of the change stream to read from
export REDPANDA_BROKERS=localhost:9092  # The Redpanda broker addresses (for Redpanda output examples)
export S3_BUCKET=cdc-archive            # The Amazon S3 bucket name (for S3 output examples)
Capture CDC events
The simplest pattern captures every change record from a Spanner change stream and writes it to stdout along with its key metadata fields:
input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}
pipeline:
  processors:
    - mapping: |
        root.mod_type = meta("mod_type")
        root.table_name = meta("table_name")
        root.commit_timestamp = meta("commit_timestamp")
        root.data = this
        root.transaction_id = meta("server_transaction_id")
output:
  stdout:
    codec: lines
For details on the change record structure and available fields, see the Spanner change record reference.
The gcp_spanner_cdc input emits one message per data change. Fields such as mod_type (INSERT, UPDATE, or DELETE), table_name, commit_timestamp, and server_transaction_id are available as message metadata (use meta("field_name")). The message body contains the row data with keys, new_values, and old_values.
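The following mapping is a sketch of how those body fields fit together. It rebuilds a flat row from each change record, and you can use it in place of the mapping in the example above. It assumes that keys, new_values, and old_values are JSON objects and that old_values is populated only for value capture types that record old values. The underscore-prefixed output field names are hypothetical. Check the change record reference before relying on it.

```yaml
pipeline:
  processors:
    - mapping: |
        # Assumption: keys, new_values, and old_values are JSON objects.
        # For deletes, take column values from old_values; otherwise from new_values.
        let values = if meta("mod_type") == "DELETE" { this.old_values } else { this.new_values }
        # Combine the primary key columns with the column values (null becomes an empty object).
        root = this.keys.merge($values.or({}))
        # Hypothetical field names for the change metadata.
        root._table = meta("table_name")
        root._mod_type = meta("mod_type")
        root._commit_timestamp = meta("commit_timestamp")
```

With a value capture type that does not record old values, a DELETE under this sketch produces only the key columns plus the metadata fields.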
Filter CDC events
The gcp_spanner_cdc input supports built-in filtering with the allowed_mod_types field. The input discards records with other modification types before they reach your processors, which is more efficient than filtering them later in the pipeline:
input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}
    allowed_mod_types:
      - INSERT
      - UPDATE
pipeline:
  processors:
    - mapping: |
        root.mod_type = meta("mod_type")
        root.table_name = meta("table_name")
        root.commit_timestamp = meta("commit_timestamp")
        root.new_values = this.new_values
output:
  stdout:
    codec: lines
This pattern:
- Captures only INSERT and UPDATE changes at the input level, ignoring DELETE events entirely
- Extracts new values from each change record
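allowed_mod_types filters by operation only. If the change stream tracks several tables and you want changes from just one, you can drop the other records in a processor, as sketched below. The table name orders is only an example. Alternatively, create a change stream for only the tables you need, as in the CREATE CHANGE STREAM OrderChanges example in the prerequisites.

```yaml
pipeline:
  processors:
    # Example table name: keep only changes to "orders" and drop everything else.
    - mapping: |
        root = if meta("table_name") != "orders" { deleted() } else { this }
```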
Route to Redpanda
Stream Spanner changes to Redpanda, using the table name as the topic:
input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}
pipeline:
  processors:
    - mapping: |
        meta topic = meta("table_name")
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! meta("server_transaction_id") }
    batching:
      count: 100
      period: 1s
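This pattern:
- Uses the table name as the Redpanda topic, so each table's changes go to a separate topic
- Batches messages for efficient delivery, flushing every 100 messages or every second, whichever comes first
- Sets the message key to server_transaction_id, so changes from the same transaction are written to the same partition of each topic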
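Keying by server_transaction_id groups the changes from one transaction. However, later changes to the same row can still land on different partitions. If your consumers care more about per-row ordering, the sketch below keys each message by the row's primary key instead. It assumes the body's keys field is an object, and row_key is a hypothetical metadata name. It replaces the pipeline and output sections of the example above; the input section stays the same.

```yaml
pipeline:
  processors:
    - mutation: |
        meta topic = meta("table_name")
        # Serialize the primary key columns to a JSON string so that
        # every change to the same row shares a message key.
        meta row_key = this.keys.string()
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! meta("row_key") }
    batching:
      count: 100
      period: 1s
```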
Route to S3
Archive change records to Amazon S3 for long-term storage and analytics:
input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}
pipeline:
  processors:
    - mapping: |
        root.mod_type = meta("mod_type")
        root.table_name = meta("table_name")
        root.commit_timestamp = meta("commit_timestamp")
        root.data = this
        root.transaction_id = meta("server_transaction_id")
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("table_name") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: lines
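This pattern:
- Organizes files by table name and hourly time partitions (year/month/day/hour); the partition reflects when the batch is written, not the commit timestamp
- Batches up to 1,000 events or 5 minutes of changes, whichever comes first, and archives each batch as newline-delimited JSON
- Uses UUID file names to prevent collisions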
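The archive processor joins each batch into a single message, and the path is interpolated for that combined message. As a result, when one batch holds changes from several tables, those records may all land under a single table's prefix. One way to avoid this, sketched below, is to split each batch by table with group_by_value before archiving, so each table's records become a separate object. Treat this as a starting point and confirm the behavior against the archive and group_by_value references.

```yaml
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("table_name") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson
    batching:
      count: 1000
      period: 5m
      processors:
        # Split the flushed batch into one group per table...
        - group_by_value:
            value: ${! meta("table_name") }
        # ...then join each group into one newline-delimited message (one S3 object per table).
        - archive:
            format: lines
```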
Route by event type
Route different change types to different Redpanda topics:
input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}
output:
  switch:
    cases:
      - check: meta("mod_type") == "INSERT"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: spanner.inserts
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: spanner.changes
This pattern:
- Routes INSERT events to one Redpanda topic and all other changes (UPDATE, DELETE) to another; the final case has no check, so it catches every event that the earlier case does not match
- Supports specialized downstream consumers per operation type
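If you want one topic per operation type rather than an INSERT topic plus a catch-all topic, a single redpanda output with an interpolated topic is a simpler alternative to switch. The topic names spanner.insert, spanner.update, and spanner.delete are hypothetical; they come from lowercasing mod_type.

```yaml
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    # Resolves to spanner.insert, spanner.update, or spanner.delete.
    topic: spanner.${! meta("mod_type").lowercase() }
```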
Stream a time range
Use start_timestamp and end_timestamp to process a specific window of changes. Both fields accept RFC 3339 timestamps:
input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}
    # Start reading from this timestamp. Defaults to the current time if not set.
    start_timestamp: "2024-01-01T00:00:00Z"
    # Stop reading at this timestamp. Omit to stream continuously.
    end_timestamp: "2024-01-02T00:00:00Z"
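To rerun a window for different dates without editing the file, you can move the bounds into environment variables. Redpanda Connect's ${VAR:default} syntax uses the value after the colon when the variable is unset. CDC_START, CDC_END, and the metadata table name below are hypothetical. Progress is stored in cdc_metadata_<stream_id> by default, so if a continuous pipeline reads the same stream, giving the bounded replay its own metadata_table is a reasonable precaution. The window must also fall within the change stream's retention period.

```yaml
input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}
    # Hypothetical variables; the quoted defaults apply when they are unset.
    start_timestamp: "${CDC_START:2024-01-01T00:00:00Z}"
    end_timestamp: "${CDC_END:2024-01-02T00:00:00Z}"
    # Hypothetical table name so the replay keeps its own progress records.
    metadata_table: cdc_metadata_replay
```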
Troubleshoot common issues
Use these steps to diagnose and fix common problems with the gcp_spanner_cdc input.
No events received
If no events arrive:
- Verify that the change stream exists in the database:
  SELECT * FROM INFORMATION_SCHEMA.CHANGE_STREAMS WHERE CHANGE_STREAM_NAME = 'AllChanges';
- Confirm that the service account has the required IAM roles:
  - roles/spanner.databaseReader on the Spanner database
  - roles/spanner.databaseUser to create, read, and update the metadata table
- Check that stream_id matches the exact name of the change stream.
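If you're not sure whether records reach the pipeline at all, try raising the log level and logging a one-line summary per change record. This is a debugging sketch; remove the log processor once the pipeline works.

```yaml
logger:
  level: DEBUG

pipeline:
  processors:
    # Log one line per change record before any other processing.
    - log:
        level: INFO
        message: 'table=${! meta("table_name") } mod_type=${! meta("mod_type") } commit_timestamp=${! meta("commit_timestamp") }'
```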
Metadata table creation fails
If the pipeline fails because it cannot create the metadata table:
- Confirm that the service account has roles/spanner.databaseUser on the database.
- Use the metadata_table field to specify an existing table if you prefer not to create a new one.
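Setting metadata_table looks like this; the table name is hypothetical. If you point it at a table you created yourself, that table presumably needs the same schema the input would create, so check the gcp_spanner_cdc component reference first.

```yaml
input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}
    # Hypothetical name of a pre-created progress table.
    metadata_table: cdc_progress_orders
```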
Duplicate events after restart
The gcp_spanner_cdc input provides at-least-once delivery. If the pipeline fails mid-stream, it may replay some records on restart. To handle duplicates:
- Use idempotent processing in downstream systems.
- Deduplicate using the server_transaction_id and record_sequence fields from the change record.
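The sketch below deduplicates in the pipeline with the dedupe processor. Duplicates show up after a restart, which would empty an in-memory cache, so it uses a Redis cache instead. REDIS_URL and the 24-hour TTL are assumptions. The key adds the row's keys to server_transaction_id and record_sequence. This rests on two assumptions: that one Spanner change record can contain several row modifications, which the input emits as separate messages, and that record_sequence is available as metadata. Also note that deduplicating before the output acknowledges a message can drop a record if the pipeline fails in between. This approach trades duplicates for a small risk of loss.

```yaml
cache_resources:
  - label: spanner_dedupe
    redis:
      url: ${REDIS_URL}
      default_ttl: 24h

pipeline:
  processors:
    # Drop any message whose key is already stored in the cache.
    - dedupe:
        cache: spanner_dedupe
        key: ${! meta("server_transaction_id") }/${! meta("record_sequence") }/${! this.keys.string() }
```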