Stream data to BigQuery with the Storage Write API
The gcp_bigquery_write_api output streams messages into BigQuery using the Storage Write API, which offers higher throughput and lower latency than the legacy streaming API or load jobs. This cookbook provides reusable patterns for streaming inserts, change data capture (CDC) upserts and deletes, automatic table creation, multi-table routing, and throughput tuning.
Use this cookbook to:
- Find patterns for streaming JSON into BigQuery
- Look up CDC upsert and delete patterns for BigQuery
- Identify auto-create, multi-table routing, and throughput-tuning patterns
Prerequisites
Before using these patterns, ensure you have the following configured:
Redpanda CLI
Install the Redpanda CLI (rpk) to run Redpanda Connect. See Get Started with Redpanda Connect using rpk for installation instructions.
Enterprise license
The gcp_bigquery_write_api output is an enterprise component. To use it, you need a Redpanda Connect Enterprise license. See Enterprise Licensing.
BigQuery dataset and permissions
You need a Google Cloud project with a BigQuery dataset, and a service account with permission to write to (and, for the auto-create pattern, create) tables in that dataset. The BigQuery Data Editor (roles/bigquery.dataEditor) role covers both.
The output loads credentials from the standard Application Default Credentials chain. You can also supply a service account key directly with the credentials_json field.
Environment variables
The examples in this cookbook use environment variables to keep configuration separate from credentials.
export GCP_PROJECT=my-project            # (1)
export BQ_DATASET=analytics              # (2)
export BQ_TABLE=events                   # (3)
export REDPANDA_BROKERS=localhost:9092   # (4)
export SOURCE_TOPIC=events               # (5)
| 1 | The Google Cloud project ID that owns the dataset. |
| 2 | The BigQuery dataset that holds the destination tables. |
| 3 | The destination table for single-table examples. |
| 4 | The Redpanda broker addresses for the source topic. |
| 5 | The Redpanda topic to consume from. |
Stream JSON to BigQuery
The simplest pattern consumes JSON messages from a Redpanda topic and streams them into a single table using the default stream, which provides at-least-once delivery with the lowest latency:
input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink
pipeline:
  processors:
    # BigQuery's proto3 JSON mapping encodes int64/uint64 as strings, so
    # convert integer columns to strings before writing.
    - mapping: |
        root = this
        root.id = this.id.string()
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: default_stream
    batching:
      count: 1000
      period: 5s
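The proto3 JSON mapping encodes int64 and uint64 values as strings, so integer columns must be sent as JSON strings, for example "id": "42".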
For the full list of message formats and connection options, see the connector reference.
Guarantee exactly-once delivery per batch
To commit each batch atomically with exactly-once semantics, set write_mode to pending_stream. Each batch is written to a per-batch pending stream that commits as a single unit:
input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink
pipeline:
  processors:
    # Project only the columns the table expects and convert the integer id to
    # a string for BigQuery's proto3 JSON mapping.
    - mapping: |
        root.id = this.id.string()
        root.event = this.event
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: pending_stream
    batching:
      count: 5000
      period: 10s
Exactly-once applies within a single committed batch. Tune batch size to balance latency against commit overhead. See Message Batching.
Create the destination table automatically
Set auto_create_table to true to have Redpanda Connect create the table if it does not exist, using the supplied schema. You can also define partitioning and clustering for the new table:
input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink
pipeline:
  processors:
    # Shape each message to match the table schema. The id column is INTEGER,
    # so send it as a string for BigQuery's proto3 JSON mapping.
    - mapping: |
        root.id = this.id.string()
        root.name = this.name
        root.created_at = this.created_at
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    auto_create_table: true
    schema:
      - name: id
        type: INTEGER
        mode: REQUIRED
      - name: name
        type: STRING
      - name: created_at
        type: TIMESTAMP
        mode: REQUIRED
    time_partitioning:
      type: DAY
      field: created_at
    clustering:
      - id
    batching:
      count: 1000
      period: 5s
Each schema column accepts a name, type, and optional mode (NULLABLE, REQUIRED, or REPEATED). Partitioning and clustering apply only when the table is created.
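To illustrate the three modes side by side, here is a sketch of a schema fragment. The tags and note columns are hypothetical and are not part of the example table above; replace them with your own columns.

```yaml
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    auto_create_table: true
    schema:
      # REQUIRED: every row must supply a value.
      - name: id
        type: INTEGER
        mode: REQUIRED
      # REPEATED: the column holds an array of values (hypothetical column).
      - name: tags
        type: STRING
        mode: REPEATED
      # NULLABLE: the value may be absent (hypothetical column).
      - name: note
        type: STRING
        mode: NULLABLE
```

With this schema, a message such as {"id": "1", "tags": ["a", "b"]} would be expected to load with note left empty.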
Apply CDC upserts
To keep a BigQuery table in sync with a changing source, use change data capture (CDC) upserts. Set write_mode to upsert, declare the primary_keys, and provide a change_type expression. BigQuery injects the _CHANGE_TYPE pseudo-column for each row:
input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink
pipeline:
  processors:
    # Stringify the integer primary key for BigQuery's proto3 JSON mapping.
    - mapping: |
        root = this
        root.id = this.id.string()
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: upsert
    primary_keys:
      - id
    change_type: '${! "UPSERT" }'
    auto_create_table: true
    schema:
      - name: id
        type: INTEGER
        mode: REQUIRED
      - name: name
        type: STRING
      - name: updated_at
        type: TIMESTAMP
    batching:
      count: 1000
      period: 5s
In upsert mode, change_type must resolve to UPSERT for every row. The target table must have a primary key: when auto_create_table is true, primary_keys defines it; for a pre-existing table, the output falls back to the PRIMARY KEY declared on the table. A primary key can span up to 16 columns, and composite keys are matched in the order listed.
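As a sketch of a composite key, the fragment below declares two key columns. The tenant_id column is hypothetical and was chosen only for illustration; the order of the entries is the order in which the key columns are matched.

```yaml
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: upsert
    # Hypothetical composite key: tenant_id first, then id.
    primary_keys:
      - tenant_id
      - id
    change_type: '${! "UPSERT" }'
    auto_create_table: true
    schema:
      - name: tenant_id
        type: STRING
        mode: REQUIRED
      - name: id
        type: INTEGER
        mode: REQUIRED
      - name: name
        type: STRING
```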
CDC modes write through the default stream, as required by BigQuery, so they provide at-least-once delivery rather than the per-batch exactly-once semantics of pending_stream.
Handle inserts, updates, and deletes
To propagate deletes as well as upserts, set write_mode to upsert_delete and resolve change_type to either UPSERT or DELETE per row (case-insensitive). The following example derives the change type from a source operation field in a mapping processor, stores it in metadata, and references it from the output. It also uses change_sequence_number to control ordering when rows for the same key can arrive out of order:
input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink
pipeline:
  processors:
    # Derive the CDC change type and sequence number from the source event.
    # This example assumes a Debezium-style envelope with an "op" field
    # ("c"/"u" for upsert, "d" for delete), the row in "after", and a log
    # sequence number in "lsn".
    - mapping: |
        root = this.after
        root.id = this.after.id.string()
        meta change_type = if this.op == "d" { "DELETE" } else { "UPSERT" }
        meta change_seq = this.lsn.string()
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: upsert_delete
    primary_keys:
      - id
    change_type: '${! metadata("change_type") }'
    # Order changes by a monotonic sequence number so a late-arriving older
    # change cannot overwrite a newer row.
    change_sequence_number: '${! metadata("change_seq") }'
    batching:
      count: 1000
      period: 5s
When change_sequence_number is unset, BigQuery resolves ordering by arrival time. Set it from a monotonic source value (such as a log sequence number) so that a late-arriving older change cannot overwrite a newer one.
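In a standard Debezium envelope, delete events set after to null and carry the deleted row in before, so a mapping that reads only after has no primary key to send for deletes. The sketch below is one way to handle that case. It assumes your source emits a before image for deletes and otherwise reuses the field names from the example above (op, after, lsn).

```yaml
pipeline:
  processors:
    - mapping: |
        # Deletes carry the row in "before"; creates and updates in "after".
        let row = if this.op == "d" { this.before } else { this.after }
        root = $row
        # Stringify the integer primary key for the proto3 JSON mapping.
        root.id = $row.id.string()
        meta change_type = if this.op == "d" { "DELETE" } else { "UPSERT" }
        meta change_seq = this.lsn.string()
```

The output section stays the same as in the upsert_delete example, because it reads only the change_type and change_seq metadata.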
Route to multiple tables
The table field supports interpolation, so you can route messages to different tables based on their content or metadata. The table name is resolved from the first message in each batch, so batch by the routing key to keep each batch single-table:
input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink
pipeline:
  processors:
    # Stringify the integer id for BigQuery's proto3 JSON mapping.
    - mapping: |
        root = this
        root.id = this.id.string()
    # Split each incoming batch into single-table batches keyed by event_type.
    # The output resolves the table from the first message in each batch, so
    # every batch must contain rows for one table only.
    - group_by_value:
        value: '${! json("event_type") }'
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: '${! json("event_type") }'
    message_format: json
    # No batching block here: this preserves the single-table batches produced
    # by group_by_value. Adding an output batching policy would re-mix tables.
    max_in_flight: 8
    max_cached_streams: 1024
When routing across many tables, raise max_cached_streams so the output can keep a write stream open per active table.
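If the routing field can contain characters that are awkward in a table name, one option is to compute a normalized table name once, store it in metadata, and use it for both grouping and routing. The following is a sketch under assumptions: the bq_table metadata key, the events_ prefix, and the unknown fallback are all hypothetical choices made for this illustration.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        root.id = this.id.string()
        # Hypothetical naming scheme: lowercase the event type and replace
        # anything outside [a-z0-9_] with an underscore.
        meta bq_table = "events_" + this.event_type.or("unknown").lowercase().re_replace_all("[^a-z0-9_]", "_")
    # Group by the computed table name so each batch targets one table.
    - group_by_value:
        value: '${! metadata("bq_table") }'
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: '${! metadata("bq_table") }'
    message_format: json
    max_in_flight: 8
    max_cached_streams: 1024
```

Grouping on the same value that the table field resolves keeps the batch-to-table relationship one-to-one, which is what the first-message resolution rule requires.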
Tune batching and throughput
Throughput depends on batch size and the number of concurrent in-flight requests. Increase max_in_flight to send more batches in parallel, and size batches with a batching policy:
input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${SOURCE_TOPIC}"]
    consumer_group: bigquery_sink
pipeline:
  processors:
    # Stringify the integer id for BigQuery's proto3 JSON mapping.
    - mapping: |
        root = this
        root.id = this.id.string()
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    write_mode: default_stream
    # Send more batches concurrently for higher throughput.
    max_in_flight: 16
    # Larger batches reduce per-request overhead at the cost of latency.
    batching:
      count: 10000
      byte_size: 5000000
      period: 10s
Troubleshoot
Writes fail with an unmarshalling error
BigQuery rejects integer fields supplied as JSON numbers. Ensure int64 and uint64 values are encoded as strings, for example "count": "42". Use a mapping processor to convert numeric fields to strings before the output if your source emits them as numbers.
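A minimal sketch of such a mapping, assuming two hypothetical integer columns named count and retries (the latter optional in the source data):

```yaml
pipeline:
  processors:
    - mapping: |
        root = this
        # Hypothetical INTEGER columns: replace with your table's fields.
        root.count = this.count.string()
        # Convert optional fields only when present, so a missing value is
        # not turned into the literal string "null".
        root.retries = if this.retries != null { this.retries.string() }
```

With this mapping, an input of {"count": 42} would be expected to reach the output as {"count": "42"}.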
Schema resolution times out
The output fetches table metadata before writing. If you route to many tables or the backend responds slowly, increase schema_resolve_timeout (default 15s). On the auto_create_table path this budget also covers table creation, so allow extra headroom.
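For example, the fragment below raises the timeout. The 60s value is illustrative rather than a recommendation; choose a value based on the latency you observe.

```yaml
output:
  gcp_bigquery_write_api:
    project: ${GCP_PROJECT}
    dataset: ${BQ_DATASET}
    table: ${BQ_TABLE}
    message_format: json
    auto_create_table: true
    # Raised from the 15s default to leave headroom for table creation.
    schema_resolve_timeout: 60s
```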