MySQL CDC Patterns

The mysql_cdc input captures row-level changes from MySQL tables using the binary log (binlog). Use these patterns to filter, transform, and route MySQL CDC events to Redpanda, S3, and other destinations.

Use this cookbook to:

  • Apply reusable patterns for capturing MySQL CDC events

  • Adapt integration patterns to route CDC data to Redpanda and S3

  • Identify patterns for filtering and transforming change events

Prerequisites

Before using these patterns, configure the following.

Redpanda CLI

Install the Redpanda CLI (rpk) to run Redpanda Connect. See the rpk installation guide for instructions.

MySQL binlog

The source MySQL database must have binary logging enabled with row-based replication:

-- Verify binary logging is enabled
SHOW VARIABLES LIKE 'log_bin';

-- Verify row-based format
SHOW VARIABLES LIKE 'binlog_format';

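Both queries must report the expected values: log_bin must be ON and binlog_format must be ROW. For cloud-managed MySQL, see your provider's documentation on enabling binary logging.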

Checkpoint cache

The mysql_cdc input requires a cache to store the binlog position between restarts. The examples in this cookbook use a Redis cache:

export REDIS_URL=redis://localhost:6379

For production use, any persistent cache supported by Redpanda Connect works, such as aws_dynamodb or postgres.
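
As a sketch of swapping the checkpoint store, the following cache resource uses aws_dynamodb in place of Redis. The table name cdc_checkpoints and the attribute names id and content are hypothetical. The sketch assumes the table already exists with id as its string partition key, and that AWS credentials and region come from the environment.

```yaml
cache_resources:
  - label: dynamodb_cache
    aws_dynamodb:
      table: cdc_checkpoints   # hypothetical table name
      hash_key: id             # partition key attribute (assumed name)
      data_key: content        # attribute that holds the stored binlog position (assumed name)
```

To use it, point the input at this resource with checkpoint_cache: dynamodb_cache.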

Environment variables

The examples in this cookbook use environment variables for configuration:

# MySQL DSN in user:password@tcp(host:port)/database format
export MYSQL_DSN="user:password@tcp(localhost:3306)/mydb"
# Comma-separated list of tables to capture, in database.table format
export MYSQL_TABLES="mydb.orders,mydb.customers"
# Redis URL for checkpoint storage
export REDIS_URL=redis://localhost:6379
# Redpanda broker addresses (for Redpanda output examples)
export REDPANDA_BROKERS=localhost:9092
# S3 bucket name (for S3 output examples)
export S3_BUCKET=cdc-archive

Capture CDC events

The simplest pattern captures all change events from MySQL tables and outputs them with metadata:

input:
  mysql_cdc:
    dsn: ${MYSQL_DSN}
    tables:
      - mydb.orders
    checkpoint_cache: redis_cache
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        root.operation = meta("operation")
        root.table = meta("table")
        root.binlog_position = meta("binlog_position")
        root.data = this
        root.timestamp = now()

output:
  stdout:
    codec: lines

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}

For details on the CDC event message structure and available metadata fields, see the metadata section in the connector reference.

Filter CDC events

Filter events to process only specific change types using the operation metadata field:

input:
  mysql_cdc:
    dsn: ${MYSQL_DSN}
    tables:
      - mydb.orders
    checkpoint_cache: redis_cache
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        # Drop delete events, pass through inserts and updates
        root = if meta("operation") == "delete" {
          deleted()
        } else {
          {
            "operation": meta("operation"),
            "table": meta("table"),
            "data": this,
            "timestamp": now()
          }
        }

output:
  stdout:
    codec: lines

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}

This pattern:

  • Filters out delete events, passing through insert and update operations

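  • Wraps each remaining event in a simplified envelope and adds a timestamp. Because the timestamp comes from now(), it records when the pipeline processed the event, not when the change was committed in MySQL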
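
The same approach can filter on row content as well as on the operation type. The following is a minimal sketch that assumes the table has a status column (a hypothetical name) and drops rows marked cancelled in addition to delete events. Only the pipeline section changes; the input, output, and cache_resources sections stay as shown above.

```yaml
pipeline:
  processors:
    - mapping: |
        # Assumption: mydb.orders has a `status` column.
        # Drop delete events and cancelled rows, pass everything else through unchanged.
        root = if meta("operation") == "delete" || this.status == "cancelled" {
          deleted()
        } else {
          this
        }
```

If a row has no status column, the comparison evaluates to false and the event passes through.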

Route to Redpanda

Stream MySQL changes to Redpanda for real-time processing:

input:
  mysql_cdc:
    dsn: ${MYSQL_DSN}
    tables:
      - mydb.orders
      - mydb.customers
    checkpoint_cache: redis_cache
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        root = this
        meta topic = meta("table")

output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! json("id") }
    batching:
      count: 100
      period: 1s

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}

This pattern:

  • Uses the table name as the Redpanda topic

  • Batches messages for efficient delivery

  • Sets the message key to the row’s primary key field (update json("id") to match your table’s primary key column)
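
If bare table names are too generic for topic names, the mapping can build a namespaced topic instead. The following sketch uses a hypothetical mysql.<table> naming scheme and assumes the target topics already exist or that the cluster creates topics automatically. The input, output, and cache_resources sections stay as shown above.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Hypothetical naming scheme: prefix each table name with "mysql."
        meta topic = "mysql." + meta("table")
```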

Route to S3

Archive CDC events to S3 for long-term storage and analytics:

input:
  mysql_cdc:
    dsn: ${MYSQL_DSN}
    tables:
      - mydb.orders
    checkpoint_cache: redis_cache
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        root.operation = meta("operation")
        root.table = meta("table")
        root.data = this
        root.timestamp = now()

output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("table") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: lines

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}

This pattern:

  • Organizes files by table and time-based partitions (year/month/day/hour), using the time the file is written rather than the time of the change

  • Batches events and archives them as newline-delimited JSON

  • Uses UUID file names to prevent collisions
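
For large archives, each batch can also be compressed before upload. The following is a sketch of the same output with a compress processor added after archive; the .ndjson.gz suffix is a naming choice for illustration, not a requirement.

```yaml
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("table") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson.gz
    batching:
      count: 1000
      period: 5m
      processors:
        # Join the batch into one newline-delimited document
        - archive:
            format: lines
        # Gzip the archived document before it is uploaded
        - compress:
            algorithm: gzip
```

Downstream readers then need to decompress each object before parsing it as newline-delimited JSON.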

Route by event type

Route different event types to different destinations:

input:
  mysql_cdc:
    dsn: ${MYSQL_DSN}
    tables:
      - mydb.orders
    checkpoint_cache: redis_cache
    stream_snapshot: true

output:
  switch:
    cases:
      - check: meta("operation") == "insert"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: mysql.orders.inserts
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: mysql.orders.changes

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}

This pattern:

  • Routes insert events to one Redpanda topic and all other change events to another

  • Supports specialized downstream consumers per operation type
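
The switch can be extended with one case per operation. The sketch below adds separate topics for snapshot rows and deletes. It assumes that snapshot rows carry the operation value read (check the connector reference for the exact values in your version), and the topic names are hypothetical.

```yaml
output:
  switch:
    cases:
      # Assumption: rows emitted during the initial snapshot have operation "read"
      - check: meta("operation") == "read"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: mysql.orders.snapshot
      - check: meta("operation") == "delete"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: mysql.orders.deletes
      # Fallback: everything that did not match an earlier case
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: mysql.orders.changes
```

Cases are evaluated in order, so the final case without a check acts as the catch-all.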

Configure replication mode

The mysql_cdc input supports two replication modes controlled by the stream_snapshot field:

  • stream_snapshot: true: Captures a full snapshot of existing table data before streaming live changes. Use this when you need a complete initial load.

  • stream_snapshot: false: Skips the snapshot and streams only changes from the current binlog position. Use this when you only need new changes going forward.

input:
  mysql_cdc:
    dsn: ${MYSQL_DSN}
    tables:
      - mydb.orders
    checkpoint_cache: redis_cache
    stream_snapshot: true
    snapshot_max_batch_size: 1000  # Number of rows to read per batch during snapshot processing

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
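
For comparison, a streaming-only configuration might look like the following sketch. It omits snapshot_max_batch_size because that setting applies only during snapshot processing.

```yaml
input:
  mysql_cdc:
    dsn: ${MYSQL_DSN}
    tables:
      - mydb.orders
    checkpoint_cache: redis_cache
    stream_snapshot: false  # skip the initial snapshot and stream only new changes

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
```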

Troubleshoot common issues

Use these steps to diagnose and fix the most common problems with the mysql_cdc input.

No events received

If you’re not receiving events:

  1. Verify binary logging is enabled:

    SHOW VARIABLES LIKE 'log_bin';
    SHOW VARIABLES LIKE 'binlog_format';
  2. Confirm the user has the required MySQL privileges:

    GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'your_user'@'%';
    GRANT SELECT ON mydb.* TO 'your_user'@'%';
  3. Check that the table names in tables use the database.table format.

Pipeline restarts lose position

If the pipeline restarts and replays events from the beginning:

  • Verify the checkpoint cache is persistent and accessible.

  • Check that checkpoint_key is consistent across restarts (default: mysql_binlog_position).

  • Use a durable cache backend such as Redis with persistence enabled, aws_dynamodb, or postgres.
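
One way to make the checkpoint location explicit is to set checkpoint_key yourself and namespace the cache keys. The key name and the Redis prefix in this sketch are hypothetical; the point is that they stay identical across restarts and differ between pipelines that share a cache.

```yaml
input:
  mysql_cdc:
    dsn: ${MYSQL_DSN}
    tables:
      - mydb.orders
    checkpoint_cache: redis_cache
    checkpoint_key: mysql_binlog_position_orders  # hypothetical per-pipeline key
    stream_snapshot: true

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
      prefix: "cdc:"  # hypothetical namespace to avoid key collisions
```

Changing the key or prefix on an existing pipeline means the previously stored position is no longer found, so choose the values before the first run.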

Duplicate events

The mysql_cdc input provides at-least-once delivery. If the pipeline fails between checkpoints, events may be re-read on restart. To handle duplicates:

  • Use idempotent processing in downstream systems.

  • Deduplicate using the binlog_position metadata field.

  • Lower checkpoint_limit to reduce the window of possible duplicates.
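
The last two suggestions can be combined in one configuration. The following sketch lowers checkpoint_limit to an illustrative value and adds a dedupe processor backed by a second Redis cache, so that entries survive a pipeline restart. The deduplication key is an assumption: it combines the table, the binlog position, and a primary key column named id, which may need adjusting for your schema. The cache label, prefix, and TTL are hypothetical.

```yaml
input:
  mysql_cdc:
    dsn: ${MYSQL_DSN}
    tables:
      - mydb.orders
    checkpoint_cache: redis_cache
    checkpoint_limit: 256  # illustrative value; lower means a smaller replay window

pipeline:
  processors:
    # Drop any message whose key has already been seen
    - dedupe:
        cache: dedupe_cache
        # Assumption: table + binlog position + primary key identifies one change
        key: '${! meta("table") }:${! meta("binlog_position") }:${! json("id") }'

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
  - label: dedupe_cache
    redis:
      url: ${REDIS_URL}
      prefix: "dedupe:"  # hypothetical namespace, kept apart from the checkpoint key
      default_ttl: 1h    # keep seen keys long enough to cover a restart
```

Rows read during the initial snapshot may not carry a binlog position, so treat this key as a starting point rather than a guarantee of uniqueness.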