MongoDB CDC Patterns
The mongodb_cdc input captures document-level changes from MongoDB collections using change streams. Use these patterns to filter, transform, and route MongoDB CDC events to Redpanda, S3, and other destinations.
Use this cookbook to:
- Apply reusable patterns for capturing MongoDB CDC events
- Adapt patterns to route CDC data to Redpanda, S3, and other destinations
- Identify patterns for filtering and transforming change events
Prerequisites
Before using these patterns, configure the following.
Redpanda CLI
Install the Redpanda CLI (rpk) to run Redpanda Connect. See rpk installation for installation instructions.
MongoDB replica set
The mongodb_cdc input requires a MongoDB database running as a replica set or in a sharded cluster. Change streams are not available on standalone MongoDB instances.
To verify that your deployment is running as a replica set, run the following in the MongoDB shell:
```javascript
rs.status()
```
If you use cloud-managed MongoDB, MongoDB Atlas clusters run as replica sets by default.
Checkpoint cache
The mongodb_cdc input requires a cache to store the oplog position between restarts. The examples in this cookbook use a Redis cache:
```shell
export REDIS_URL=redis://localhost:6379
```
For production use, any persistent cache supported by Redpanda Connect works, such as aws_dynamodb or postgres.
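As a sketch, assuming an existing DynamoDB table whose partition key attribute is named id, a DynamoDB-backed checkpoint cache could look like the following. The table name, attribute names, region, and the dynamodb_cache label are hypothetical placeholders. Only the checkpoint_cache reference on the input has to match the resource label.

```yaml
input:
  mongodb_cdc:
    url: ${MONGODB_URL}
    database: ${MONGODB_DB}
    collections:
      - orders
    # Must match the label of the cache resource below.
    checkpoint_cache: dynamodb_cache

cache_resources:
  - label: dynamodb_cache
    aws_dynamodb:
      table: cdc-checkpoints # hypothetical table name
      hash_key: id # hypothetical partition key attribute
      data_key: data # hypothetical attribute that stores the checkpoint value
      region: us-east-1 # assumption: adjust to your region
```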
Environment variables
The examples in this cookbook use environment variables for configuration:
```shell
export MONGODB_URL="mongodb://user:password@localhost:27017" # (1)
export MONGODB_DB="mydb" # (2)
export REDIS_URL=redis://localhost:6379 # (3)
export REDPANDA_BROKERS=localhost:9092 # (4)
export S3_BUCKET=cdc-archive # (5)
```
| 1 | The MongoDB connection string. |
| 2 | The name of the database to capture changes from. |
| 3 | The Redis URL for checkpoint storage. |
| 4 | The Redpanda broker addresses (for Redpanda output examples). |
| 5 | The S3 bucket name (for S3 output examples). |
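Redpanda Connect also accepts a fallback value in the ${VAR:default} form, which is convenient for local testing. A minimal sketch, assuming a local Redis instance is the fallback you want:

```yaml
cache_resources:
  - label: redis_cache
    redis:
      # Falls back to a local Redis instance when REDIS_URL is not set.
      url: ${REDIS_URL:redis://localhost:6379}
```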
Capture CDC events
The simplest pattern captures all change events from MongoDB collections and outputs them with metadata:
```yaml
input:
  mongodb_cdc:
    url: ${MONGODB_URL}
    database: ${MONGODB_DB}
    collections:
      - orders
    checkpoint_cache: redis_cache
    # Read existing documents before streaming live changes.
    stream_snapshot: true

pipeline:
  processors:
    # Wrap each change event with its CDC metadata.
    - mapping: |
        root.operation = meta("operation")
        root.collection = meta("collection")
        root.operation_time = meta("operation_time")
        root.data = this
        root.timestamp = now()

output:
  stdout:
    codec: lines

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
```
For details on the CDC event message structure and available metadata fields, see the metadata section in the connector reference.
Filter CDC events
Filter events to process only specific change types using the operation metadata field:
```yaml
input:
  mongodb_cdc:
    url: ${MONGODB_URL}
    database: ${MONGODB_DB}
    collections:
      - orders
    checkpoint_cache: redis_cache
    stream_snapshot: false

pipeline:
  processors:
    # Drop delete and read events.
    - mapping: |
        root = if meta("operation") == "delete" || meta("operation") == "read" {
          deleted()
        }
    # Reshape the remaining events.
    - mapping: |
        root.operation = meta("operation")
        root.collection = meta("collection")
        root.data = this
        root.timestamp = now()

output:
  stdout:
    codec: lines

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
```
This pattern:
- Keeps only insert, update, and replace operations, dropping delete and read events
- Transforms each remaining event into a simplified format with a timestamp
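The same filter can be written as an allow-list, so that any operation type not explicitly named is dropped. The following sketch would replace the first mapping processor in the example above; it assumes insert, update, and replace are the only operations you want to keep.

```yaml
pipeline:
  processors:
    # Keep the event unchanged if its operation is allow-listed, otherwise drop it.
    - mapping: |
        root = if ["insert", "update", "replace"].contains(meta("operation")) {
          this
        } else {
          deleted()
        }
```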
Route to Redpanda
Stream MongoDB changes to Redpanda for real-time processing:
```yaml
input:
  mongodb_cdc:
    url: ${MONGODB_URL}
    database: ${MONGODB_DB}
    collections:
      - orders
      - customers
    checkpoint_cache: redis_cache
    stream_snapshot: true

pipeline:
  processors:
    # Copy the source collection name into a metadata field used as the topic.
    - mapping: |
        meta topic = meta("collection")

output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    topic: ${! meta("topic") }
    key: ${! json("_id") }
    batching:
      count: 100
      period: 1s

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
```
This pattern:
- Uses the collection name as the Redpanda topic
- Batches messages for efficient delivery
- Sets the message key to the document’s _id field
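If you prefer namespaced topic names, a variant can interpolate the collection name directly in the topic field and skip the intermediate topic metadata mapping. The mongodb. prefix is an illustrative choice, not a convention required by Redpanda Connect.

```yaml
output:
  redpanda:
    seed_brokers:
      - ${REDPANDA_BROKERS}
    # Produces to mongodb.orders, mongodb.customers, and so on.
    topic: mongodb.${! meta("collection") }
    key: ${! json("_id") }
    batching:
      count: 100
      period: 1s
```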
Route to S3
Archive CDC events to Amazon S3 for long-term storage and analytics:
```yaml
input:
  mongodb_cdc:
    url: ${MONGODB_URL}
    database: ${MONGODB_DB}
    collections:
      - orders
    checkpoint_cache: redis_cache
    stream_snapshot: true

pipeline:
  processors:
    - mapping: |
        root.operation = meta("operation")
        root.collection = meta("collection")
        root.data = this
        root.timestamp = now()

output:
  aws_s3:
    bucket: ${S3_BUCKET}
    path: >-
      cdc/${! meta("collection") }/${! timestamp_unix().format_timestamp("2006/01/02/15") }/${! uuid_v4() }.ndjson
    batching:
      count: 1000
      period: 5m
      processors:
        # Join each batch into one newline-delimited file.
        - archive:
            format: lines

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
```
This pattern:
- Organizes files by collection and time-based partitions (year/month/day/hour)
- Batches events and archives them as newline-delimited JSON
- Uses UUID file names to prevent collisions
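A variant of the output, sketched under two assumptions: that you want hour partitions computed in UTC rather than the host's local time zone, and that you want gzip-compressed objects. The ts_format method takes an optional time zone argument, and the compress processor runs after archive so that each batch is compressed as a single file.

```yaml
output:
  aws_s3:
    bucket: ${S3_BUCKET}
    # Hour partitions are computed in UTC.
    path: >-
      cdc/${! meta("collection") }/${! now().ts_format("2006/01/02/15", "UTC") }/${! uuid_v4() }.ndjson.gz
    batching:
      count: 1000
      period: 5m
      processors:
        - archive:
            format: lines
        # Compress the archived batch before upload.
        - compress:
            algorithm: gzip
```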
Route by event type
Route different event types to different destinations:
```yaml
input:
  mongodb_cdc:
    url: ${MONGODB_URL}
    database: ${MONGODB_DB}
    collections:
      - orders
    checkpoint_cache: redis_cache
    stream_snapshot: false

output:
  switch:
    cases:
      # Insert events go to a dedicated topic.
      - check: meta("operation") == "insert"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: mongodb.orders.inserts
      # A case without a check receives every event not matched above.
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: mongodb.orders.changes

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
```
This pattern:
- Routes insert events to one Redpanda topic and all other change events to another
- Supports specialized downstream consumers per operation type
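To separate a third operation type, add another case before the catch-all. Cases are evaluated in order and the first match wins, so the case without a check only receives events that no earlier case matched. The mongodb.orders.deletes topic name is hypothetical.

```yaml
output:
  switch:
    cases:
      - check: meta("operation") == "insert"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: mongodb.orders.inserts
      # Hypothetical topic for delete events.
      - check: meta("operation") == "delete"
        output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: mongodb.orders.deletes
      # Catch-all for every remaining operation.
      - output:
          redpanda:
            seed_brokers:
              - ${REDPANDA_BROKERS}
            topic: mongodb.orders.changes
```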
Configure replication mode
The mongodb_cdc input supports two replication modes controlled by the stream_snapshot field:
- stream_snapshot: true: Captures a full snapshot of existing collection data before streaming live changes. Use this when you need a complete initial load.
- stream_snapshot: false: Skips the snapshot and streams only changes from the current oplog position. Use this when you only need new changes going forward.
```yaml
input:
  mongodb_cdc:
    url: ${MONGODB_URL}
    database: ${MONGODB_DB}
    collections:
      - orders
    checkpoint_cache: redis_cache
    stream_snapshot: true
    snapshot_parallelism: 4 # (1)

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
```
| 1 | Number of parallel connections to use when reading the snapshot. |
If the pipeline restarts during a snapshot, Redpanda Connect must restart the snapshot from the beginning so that it can record a fresh oplog position in the checkpoint cache.
Troubleshoot common issues
Use these steps to diagnose and fix the most common problems with the mongodb_cdc input.
No events received
If no events arrive:
- Verify that the MongoDB deployment is a replica set by running rs.status().
- Grant the required privileges to the user:
  ```javascript
  db.grantRolesToUser("your_user", [
    { role: "read", db: "mydb" },
    { role: "clusterMonitor", db: "admin" }
  ])
  ```
- Check that every collection listed in the collections field exists in the database set by the database field.
Pipeline restarts lose position
If the pipeline restarts and replays events from the beginning:
- Verify that the checkpoint cache is persistent and accessible.
- Check that checkpoint_key is consistent across restarts (default: mongodb_cdc_checkpoint).
- Use a durable cache backend such as Redis with persistence enabled, aws_dynamodb, or postgres.
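A minimal sketch of pinning the checkpoint key explicitly. The key name orders_cdc_checkpoint is hypothetical; the point is that it stays the same across restarts and differs between pipelines that share one cache.

```yaml
input:
  mongodb_cdc:
    url: ${MONGODB_URL}
    database: ${MONGODB_DB}
    collections:
      - orders
    checkpoint_cache: redis_cache
    # Hypothetical key name: keep it stable across restarts and
    # unique per pipeline when several pipelines share one cache.
    checkpoint_key: orders_cdc_checkpoint
    stream_snapshot: false

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
```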
Duplicate events
The mongodb_cdc input provides at-least-once delivery. If the pipeline fails between checkpoints, events may be re-read on restart. To handle duplicates:
- Use idempotent processing in downstream systems.
- Deduplicate using the operation_time metadata field.
- Lower checkpoint_limit to reduce the window of possible duplicates.
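The last two suggestions can be combined in one config. This is a sketch under several assumptions: each event body includes the _id field, operation_time is set on every event you want to deduplicate, and a 24-hour window is long enough to cover a replay after a restart. The dedupe_cache label, key prefix, and TTL are hypothetical values, and the checkpoint_limit value is illustrative. The dedupe cache must itself be persistent, because an in-memory cache would be empty after the restart that causes the duplicates.

```yaml
input:
  mongodb_cdc:
    url: ${MONGODB_URL}
    database: ${MONGODB_DB}
    collections:
      - orders
    checkpoint_cache: redis_cache
    # Illustrative value: a lower limit shrinks the replay window after a failure.
    checkpoint_limit: 100
    stream_snapshot: false

pipeline:
  processors:
    # Drop events whose collection, _id, operation, and operation_time were already seen.
    - dedupe:
        cache: dedupe_cache
        key: ${! meta("collection") }-${! json("_id") }-${! meta("operation") }-${! meta("operation_time") }

cache_resources:
  - label: redis_cache
    redis:
      url: ${REDIS_URL}
  - label: dedupe_cache
    redis:
      url: ${REDIS_URL}
      prefix: "cdc-dedupe:" # hypothetical prefix that keeps dedupe keys apart from checkpoints
      default_ttl: 24h # assumption: how long a seen key is remembered
```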