Google Cloud Spanner CDC Patterns

The gcp_spanner_cdc input captures row-level changes from Google Cloud Spanner tables using Spanner change streams. Use these patterns to filter, transform, and route Spanner change records to Redpanda, Amazon S3, and other destinations.

Use this cookbook to:

  • Apply reusable patterns for capturing Google Cloud Spanner CDC events

  • Adapt integration patterns to route CDC data to Redpanda and S3

  • Identify patterns for filtering and transforming Spanner change records

The gcp_spanner_cdc input is in beta. The API is subject to change.

Prerequisites

Before using these patterns, configure the following.

Redpanda CLI

Install the Redpanda CLI (rpk) to run Redpanda Connect. For instructions, see the rpk installation documentation.

Spanner change stream

The gcp_spanner_cdc input reads from a Spanner change stream. The change stream must be created in your Spanner database before running the pipeline. To create a change stream that tracks all tables:

CREATE CHANGE STREAM AllChanges FOR ALL;

To track specific tables:

CREATE CHANGE STREAM OrderChanges FOR orders, customers;

For more information, see the Spanner change streams documentation.

Google Cloud Platform (GCP) authentication

The gcp_spanner_cdc input supports two authentication methods:

  • Application Default Credentials (ADC): The recommended approach for GCP-native environments (Cloud Run, GKE, Compute Engine). No credential configuration is needed when the pipeline runs under an attached service account.

  • Service account key: Set the credentials_json field to a base64-encoded service account key JSON file. Use this approach when running outside GCP or when ADC is not available:

    export GCP_CREDENTIALS="$(base64 < /path/to/service-account.json | tr -d '\n')"

    Then reference it in your pipeline config:

    input:
      gcp_spanner_cdc:
        project_id: ${GCP_PROJECT_ID}
        instance_id: ${GCP_INSTANCE_ID}
        database_id: ${GCP_DATABASE_ID}
        stream_id: ${GCP_STREAM_ID}
        credentials_json: ${GCP_CREDENTIALS}

Progress tracking

The gcp_spanner_cdc input stores progress in a Spanner metadata table (cdc_metadata_<stream_id> by default). No external cache is required. The pipeline resumes from the last acknowledged change record after a restart.

Environment variables

The examples in this cookbook use environment variables for configuration:

export GCP_PROJECT_ID="analytics-prod"   # The GCP project ID.
export GCP_INSTANCE_ID="primary"         # The Spanner instance ID.
export GCP_DATABASE_ID="orders"          # The Spanner database ID.
export GCP_STREAM_ID="AllChanges"        # The name of the change stream to read from.
export REDPANDA_BROKERS="localhost:9092" # The Redpanda broker addresses (for Redpanda output examples).
export S3_BUCKET="cdc-archive"           # The Amazon S3 bucket name (for S3 output examples).

Capture CDC events

The simplest pattern captures all change records from a Spanner change stream and writes each one to stdout, with its key metadata fields alongside the row data:

input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}

pipeline:
  processors:
    - mapping: |
        root.mod_type = meta("mod_type")
        root.table_name = meta("table_name")
        root.commit_timestamp = meta("commit_timestamp")
        root.data = this
        root.transaction_id = meta("server_transaction_id")

output:
  stdout:
    codec: lines

For details on the change record structure and available fields, see the Spanner change record reference.

The gcp_spanner_cdc input emits one message per data change. Fields such as mod_type (INSERT, UPDATE, or DELETE), table_name, commit_timestamp, and server_transaction_id are available as message metadata (use meta("field_name")). The message body contains the row data with keys, new_values, and old_values.
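
As a rough illustration of how the metadata and body fields fit together, the following mapping sketch reshapes each message into a before/after envelope. The output field names (op, table, committed_at, key, before, after) are hypothetical. The sketch also assumes that a DELETE record carries nothing meaningful in new_values, and that old_values is populated only when the change stream's value capture type records old values:

```yaml
pipeline:
  processors:
    - mapping: |
        # Metadata fields described above.
        root.op = meta("mod_type")
        root.table = meta("table_name")
        root.committed_at = meta("commit_timestamp")

        # Body fields described above. The envelope names are illustrative only.
        root.key = this.keys
        root.before = this.old_values
        # Assumption: new_values holds nothing meaningful for a DELETE.
        root.after = if meta("mod_type") == "DELETE" { null } else { this.new_values }
```

Downstream consumers can then branch on op and read before and after without knowing the Spanner-specific field names.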

Filter CDC events

The gcp_spanner_cdc input supports built-in filtering with the allowed_mod_types field. The input discards non-matching events before they reach your pipeline processors, which is more efficient than filtering them in a mapping afterward:

input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}
    allowed_mod_types:
      - INSERT
      - UPDATE

pipeline:
  processors:
    - mapping: |
        root.mod_type = meta("mod_type")
        root.table_name = meta("table_name")
        root.commit_timestamp = meta("commit_timestamp")
        root.new_values = this.new_values

output:
  stdout:
    codec: lines

This pattern:

  • Captures only INSERT and UPDATE changes at the input level, ignoring DELETE events entirely

  • Extracts new values from each change record
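
The allowed_mod_types field filters by operation type. If you also need to filter by table and cannot narrow the change stream itself to specific tables, one option is to drop unwanted messages in a mapping, at the cost of ingesting them first. A minimal sketch, assuming the hypothetical table names orders and customers:

```yaml
pipeline:
  processors:
    - mapping: |
        # Keep only changes from the listed tables; the names are placeholders.
        let wanted = ["orders", "customers"]
        root = if !$wanted.contains(meta("table_name")) { deleted() }
```

Messages from any other table are deleted and never reach the output. Messages from the listed tables pass through unchanged.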

Route to Redpanda

Stream Spanner changes to Redpanda, using the table name as the topic:

input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}

pipeline:
  processors:
    - mapping: |
        meta topic = meta("table_name")

output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! meta("server_transaction_id") }
    batching:
      count: 100
      period: 1s

This pattern:

  • Uses the table name as the Redpanda topic

  • Batches messages for efficient delivery

  • Sets the message key to server_transaction_id, grouping related changes together
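
Keying by server_transaction_id keeps a transaction's changes together, but successive changes to the same row may then land on different partitions. If per-row ordering matters more, one alternative is to key on the row's primary key instead. A sketch, assuming the keys field in the message body is a JSON object holding the primary key columns, with row_key as a hypothetical metadata name:

```yaml
pipeline:
  processors:
    - mapping: |
        meta topic = meta("table_name")
        # Serialize the primary key object to a JSON string to use as the record key.
        meta row_key = this.keys.string()

output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! meta("row_key") }
```

Which key is preferable depends on whether consumers need to see a transaction as a unit or a row's history in order.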

Route to S3

Archive change records to Amazon S3 for long-term storage and analytics:

input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}

pipeline:
  processors:
    - mapping: |
        root.mod_type = meta("mod_type")
        root.table_name = meta("table_name")
        root.commit_timestamp = meta("commit_timestamp")
        root.data = this
        root.transaction_id = meta("server_transaction_id")

output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("table_name") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: lines

This pattern:

  • Organizes files by table name and time-based partitions (year/month/day/hour)

  • Batches events and archives them as newline-delimited JSON

  • Uses UUID file names to prevent collisions
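
For larger archives, it may be worth compressing each batch before upload. The following sketch extends the batching processors above with a compress step. The choice of gzip and the .ndjson.gz suffix are assumptions, not part of the original pattern:

```yaml
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("table_name") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson.gz
    batching:
      count: 1000
      period: 5m
      processors:
        # Join the batch into one newline-delimited payload, then gzip it.
        - archive:
            format: lines
        - compress:
            algorithm: gzip
```

The order matters: archiving first produces a single payload per batch, so each uploaded object is one gzip file rather than a concatenation of separately compressed records.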

Route by event type

Route different change types to different Redpanda topics:

input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}

output:
  switch:
    cases:
      - check: meta("mod_type") == "INSERT"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: spanner.inserts
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: spanner.changes

This pattern:

  • Routes INSERT events to one Redpanda topic and all other changes (UPDATE, DELETE) to another

  • Supports specialized downstream consumers per operation type
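
The same switch can be extended with one case per operation. The sketch below also separates DELETE events, for example to feed an audit consumer. The topic names spanner.deletes and spanner.updates are hypothetical:

```yaml
output:
  switch:
    cases:
      - check: meta("mod_type") == "INSERT"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: spanner.inserts
      - check: meta("mod_type") == "DELETE"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: spanner.deletes
      # No check: catches everything not matched above (UPDATE).
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: spanner.updates
```

Cases are evaluated in order, so the final case without a check acts as the fallback.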

Stream a time range

Use start_timestamp and end_timestamp to process a specific window of changes. Both fields accept RFC 3339 timestamps:

input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}
    # Start reading from this timestamp. Defaults to the current time if not set.
    start_timestamp: "2024-01-01T00:00:00Z"
    # Stop reading at this timestamp. Omit to stream continuously.
    end_timestamp: "2024-01-02T00:00:00Z"

Troubleshoot common issues

Use these steps to diagnose and fix common problems with the gcp_spanner_cdc input.

No events received

If no events arrive:

  1. Verify the change stream exists in the database:

    SELECT * FROM INFORMATION_SCHEMA.CHANGE_STREAMS
    WHERE CHANGE_STREAM_NAME = 'AllChanges';

  2. Confirm the service account has the required IAM roles:

    • roles/spanner.databaseReader on the Spanner database

    • roles/spanner.databaseUser to create and read the metadata table

  3. Check that stream_id matches the exact name of the change stream.

Metadata table creation fails

If the pipeline fails because it cannot create the metadata table:

  • Confirm the service account has roles/spanner.databaseUser on the database.

  • Use the metadata_table field to specify an existing table if you prefer not to create a new one.
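
A sketch of the second option, where cdc_progress is a hypothetical table name. The schema the input expects from an existing table is not covered here, so check the input reference before pointing it at a table you created yourself:

```yaml
input:
  gcp_spanner_cdc:
    project_id: ${GCP_PROJECT_ID}
    instance_id: ${GCP_INSTANCE_ID}
    database_id: ${GCP_DATABASE_ID}
    stream_id: ${GCP_STREAM_ID}
    # Hypothetical table name; replace with a table your service account can use.
    metadata_table: cdc_progress
```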

Duplicate events after restart

The gcp_spanner_cdc input provides at-least-once delivery. If the pipeline fails mid-stream, it may replay some records on restart. To handle duplicates:

  • Use idempotent processing in downstream systems.

  • Deduplicate using the server_transaction_id and record_sequence fields from the change record.
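
One way to deduplicate inside the pipeline is the dedupe processor backed by a cache. The sketch below makes several assumptions: record_sequence is exposed as message metadata like the other fields, a Redis instance is reachable at the hypothetical URL shown, and a one-hour TTL covers the replay window. It uses a persistent cache because an in-memory cache would be empty after the restart that causes the replay:

```yaml
cache_resources:
  - label: cdc_dedupe
    redis:
      url: redis://localhost:6379   # hypothetical address
      default_ttl: 1h               # assumed replay window

pipeline:
  processors:
    - dedupe:
        cache: cdc_dedupe
        # Transaction ID plus record sequence, as suggested above.
        key: ${! meta("server_transaction_id") }-${! meta("record_sequence") }
```

If a single change record can produce several messages, this key would drop legitimate messages, so extend it with a value that is unique per row, such as the serialized keys field.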