SQL Server CDC Patterns
The microsoft_sql_server_cdc input captures row-level changes from SQL Server tables using SQL Server’s built-in CDC change tables. Use these patterns to filter, transform, and route SQL Server CDC events to Redpanda, S3, and other destinations.
Use this cookbook to:
- Apply reusable patterns for capturing SQL Server CDC events
- Adapt integration patterns to route CDC data to Redpanda, S3, and other destinations
- Identify patterns for filtering and transforming change events
The microsoft_sql_server_cdc input is in beta. The API is subject to change.
Prerequisites
Before using these patterns, configure the following.
Redpanda CLI
Install the Redpanda CLI (rpk) to run Redpanda Connect. See rpk installation for installation instructions.
SQL Server CDC
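CDC must be enabled on both the database and each table you want to capture. Connect to the target database and run the following. Enabling CDC on the database requires membership in the sysadmin fixed server role, and enabling it on a table requires membership in the db_owner database role (on Azure SQL Database, db_owner is sufficient for both):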
-- Enable CDC on the database
EXEC sys.sp_cdc_enable_db;
-- Enable CDC on a table
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'orders',
    @role_name = NULL;
For cloud-managed SQL Server, see:
Checkpoint storage
By default, the microsoft_sql_server_cdc input creates a table (rpcn.CdcCheckpointCache) and a stored procedure in your SQL Server database to store the last processed Log Sequence Number (LSN). The pipeline user must have CREATE TABLE and CREATE PROCEDURE permissions.
To use an external cache instead, set the checkpoint_cache field to a cache label defined in cache_resources. See the connector reference for details.
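A minimal sketch of an external checkpoint cache, assuming a Redis instance reachable at the hypothetical REDIS_URL environment variable. The label cdc_checkpoints is illustrative; the only requirement stated here is that checkpoint_cache matches a label in cache_resources, so check the connector reference for which cache types are supported.

```yaml
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true
    # Store the last processed LSN in the cache resource below
    # instead of creating rpcn.CdcCheckpointCache in SQL Server.
    checkpoint_cache: cdc_checkpoints
cache_resources:
  # Hypothetical label; must match checkpoint_cache above.
  - label: cdc_checkpoints
    redis:
      # REDIS_URL is a hypothetical environment variable.
      url: ${REDIS_URL}
output:
  stdout:
    codec: lines
```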
Environment variables
The examples in this cookbook use environment variables for configuration:
export MSSQL_CONN="sqlserver://user:password@localhost:1433?database=mydb" (1)
export REDPANDA_BROKERS=localhost:9092 (2)
export S3_BUCKET=cdc-archive (3)
| 1 | The SQL Server connection string in sqlserver://user:password@host:port?database=dbname format. |
| 2 | The Redpanda broker addresses (for Redpanda output examples). |
| 3 | The Amazon S3 bucket name (for S3 output examples). |
Capture CDC events
The simplest pattern captures all change events from a SQL Server table and writes each one to stdout along with its CDC metadata:
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true
pipeline:
  processors:
    - mapping: |
        root.operation = meta("operation")
        root.schema = meta("database_schema")
        root.table = meta("table")
        root.lsn = meta("lsn")
        root.data = this
        root.timestamp = now()
output:
  stdout:
    codec: lines
For details on the CDC event message structure and available metadata fields, see the metadata section in the connector reference.
SQL Server CDC generates two events for each row update: update_before (the previous row values) and update_after (the new row values). Most downstream use cases only need the update_after event.
Filter CDC events
Filter events to process only the current row state after each change, using the operation metadata field:
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true
pipeline:
  processors:
    - mapping: |
        root = if meta("operation") != "insert" && meta("operation") != "update_after" {
          deleted()
        }
    - mapping: |
        root.operation = meta("operation")
        root.table = meta("table")
        root.data = this
        root.timestamp = now()
output:
  stdout:
    codec: lines
This pattern:
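- Keeps only insert and update_after operations, dropping delete, update_before, and read events
- Transforms the event to a simplified format and adds a processing-time timestamp (now() returns the time Redpanda Connect handled the event, not the time the change was committed)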
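If downstream systems also need snapshot rows and deletions, a variant of the first processor can drop only update_before events, so read, insert, update_after, and delete events all pass through. This is a sketch that reuses the input and output from the example above and changes only the filter condition.

```yaml
pipeline:
  processors:
    # Drop only the pre-update row image; every other operation passes through.
    - mapping: |
        root = if meta("operation") == "update_before" {
          deleted()
        }
    - mapping: |
        root.operation = meta("operation")
        root.table = meta("table")
        root.data = this
        root.timestamp = now()
```

Keeping the operation field in the output lets consumers tell a delete apart from an insert or update.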
Route to Redpanda
Stream SQL Server changes to Redpanda for real-time processing:
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
      - dbo.customers
    stream_snapshot: true
pipeline:
  processors:
    - mapping: |
        meta topic = meta("database_schema") + "." + meta("table")
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! json("id") }
    batching:
      count: 100
      period: 1s
This pattern:
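- Uses database_schema.table as the Redpanda topic name (for example, dbo.orders)
- Batches messages for efficient delivery, flushing a batch when it reaches 100 messages or after 1 second, whichever comes first
- Sets the message key to the primary key field (update json("id") to match your table’s primary key column)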
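The example captures both dbo.orders and dbo.customers, which may not share a primary key column name. One way to handle this, sketched below, is to compute the key in the mapping and carry it in metadata. The customer_id column is hypothetical; replace both column names with your tables' actual primary keys.

```yaml
pipeline:
  processors:
    - mapping: |
        meta topic = meta("database_schema") + "." + meta("table")
        # Hypothetical columns: orders keyed by id, customers keyed by customer_id.
        meta key = if meta("table") == "customers" {
          this.customer_id.string()
        } else {
          this.id.string()
        }
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    # Read the per-table key computed in the mapping above.
    key: ${! meta("key") }
    batching:
      count: 100
      period: 1s
```

Because the mapping never assigns root, the message body passes through unchanged and only metadata is added.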
Route to S3
Archive CDC events to S3 for long-term storage and analytics:
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true
pipeline:
  processors:
    - mapping: |
        root.operation = meta("operation")
        root.schema = meta("database_schema")
        root.table = meta("table")
        root.data = this
        root.timestamp = now()
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("database_schema") }/${! meta("table") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: lines
This pattern:
- Organizes files by database schema, table, and hourly time partitions (year/month/day/hour) based on processing time
- Batches up to 1000 events, or 5 minutes of events, and archives each batch as a newline-delimited JSON file
- Uses UUID file names to prevent collisions
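For long-term storage, you may want to compress each archived file. A sketch that adds a compress processor after archive, assuming gzip is acceptable to whatever reads the archive later; the .ndjson.gz suffix is illustrative.

```yaml
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("database_schema") }/${! meta("table") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson.gz
    batching:
      count: 1000
      period: 5m
      processors:
        # Join the batch into one newline-delimited document...
        - archive:
            format: lines
        # ...then gzip the resulting file contents.
        - compress:
            algorithm: gzip
```

Processor order matters here: archive must run first so that the whole batch is compressed as a single file.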
Route by event type
Route different event types to different destinations:
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true
output:
  switch:
    cases:
      - check: meta("operation") == "insert"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: sqlserver.orders.inserts
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: sqlserver.orders.changes
This pattern:
- Routes insert events to sqlserver.orders.inserts, and all other events (read, update_before, update_after, and delete) to sqlserver.orders.changes, because the second case has no check and matches everything else
- Supports specialized downstream consumers per operation type
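When each operation type should get its own topic, a single redpanda output with an interpolated topic can replace the switch. This sketch assumes topics named sqlserver.orders.OPERATION (for example, sqlserver.orders.update_after) already exist or can be auto-created.

```yaml
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
    stream_snapshot: true
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    # Produces to sqlserver.orders.insert, sqlserver.orders.delete, and so on.
    topic: sqlserver.orders.${! meta("operation") }
```

The switch form is still the better fit when different operations need different output settings, such as separate brokers or batching policies.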
Configure replication mode
The microsoft_sql_server_cdc input supports two replication modes, controlled by the stream_snapshot field:
- stream_snapshot: true: Captures a full snapshot of existing table data before streaming live changes. Use this when you need a complete initial load.
- stream_snapshot: false: Skips the snapshot and streams only changes from the current LSN position. Use this when you only need new changes going forward.
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
      - dbo.customers
    stream_snapshot: true
    max_parallel_snapshot_tables: 4 (1)
    snapshot_max_batch_size: 1000 (2)
| 1 | Number of tables to snapshot in parallel. |
| 2 | Number of rows to read per batch during snapshot processing. |
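For the streaming-only mode, a minimal sketch leaves out the snapshot tuning fields, since max_parallel_snapshot_tables and snapshot_max_batch_size only affect snapshot processing.

```yaml
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.orders
      - dbo.customers
    # Skip the initial load and start from the current LSN position.
    stream_snapshot: false
```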
Filter tables with patterns
The include field accepts regular expressions, so you can capture multiple tables without listing each one:
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      - dbo.* (1)
    exclude:
      - dbo.audit_log (2)
    stream_snapshot: false
| 1 | Captures all tables in the dbo schema. |
| 2 | Excludes a specific table matched by the include pattern. |
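Because include entries are regular expressions, the dot in dbo.* matches any character, not only the schema separator. A stricter sketch escapes the dot and anchors each pattern, assuming entries are matched against the schema.table name; the sales schema is hypothetical.

```yaml
input:
  microsoft_sql_server_cdc:
    connection_string: ${MSSQL_CONN}
    include:
      # Single quotes keep the backslash literal in YAML.
      - '^dbo\..*$'
      - '^sales\.orders$'
    exclude:
      - '^dbo\.audit_log$'
    stream_snapshot: false
```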
Troubleshoot common issues
Use these steps to diagnose and fix the most common problems with the microsoft_sql_server_cdc input.
No events received
If no events arrive:
- Verify CDC is enabled on the database and table:
  SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'mydb';
  SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'orders';
- Confirm that the CDC capture and cleanup jobs exist, and that SQL Server Agent is running so the capture job can populate the change tables:
  EXEC sys.sp_cdc_help_jobs;
- Check that the table pattern in include matches the target table, using schema.table format.
Checkpoint table creation fails
If the pipeline fails to start because it cannot create the checkpoint table:
- Grant CREATE TABLE and CREATE PROCEDURE permissions to the pipeline user, or
- Create the schema manually (CREATE SCHEMA rpcn) and grant the user permissions on it, or
- Use an external checkpoint_cache to avoid creating a table in SQL Server.
Duplicate events
The microsoft_sql_server_cdc input provides at-least-once delivery. If the pipeline fails between checkpoints, events may be re-read on restart. To handle duplicates:
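- Use idempotent processing in downstream systems.
- Deduplicate using the lsn metadata field combined with the table name, operation, and primary key. An LSN alone may not identify a single row change, because SQL Server assigns the same commit LSN to every change in a transaction.
- Lower checkpoint_limit to reduce the window of possible duplicates.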
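A sketch of in-pipeline deduplication with the dedupe processor, assuming a Redis cache at the hypothetical REDIS_URL environment variable and a hypothetical id primary key column. The key includes the operation because update_before and update_after events for the same change can share an LSN. Note that the dedupe processor documentation warns that cache-based deduplication voids at-least-once guarantees, because a key can be stored even if the message never reaches the output. Where delivery guarantees matter, idempotent downstream processing is the safer choice.

```yaml
pipeline:
  processors:
    - dedupe:
        cache: dedupe_keys
        # id is a hypothetical primary key column.
        key: ${! meta("table") }-${! meta("lsn") }-${! meta("operation") }-${! json("id") }
cache_resources:
  # Hypothetical label; must match the dedupe cache field above.
  - label: dedupe_keys
    redis:
      url: ${REDIS_URL}
      # Illustrative TTL; it should cover your expected restart and replay window.
      default_ttl: 1h
```

An external cache is used here because an in-memory cache would be lost on restart, which is exactly when duplicates occur.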