doris_stream_load
Writes batches of messages into Apache Doris using Stream Load.
Introduced in version 4.96.0.
Each output batch is encoded into a single Doris Stream Load request. The output first contacts a Doris frontend (FE) node and follows the Stream Load redirect to a backend (BE) node before uploading the batch body. A batch is only acknowledged when Doris reports success.
Performance
This output benefits from sending multiple messages in parallel. You can tune the maximum number of in-flight messages (or message batches) with the max_in_flight field.
This output also benefits from sending messages as a batch. Batches can be formed at both the input and output level. You can find out more in this doc.
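As a rough illustration, the following sketch combines both settings. The numbers are arbitrary assumptions chosen for illustration, not recommendations, and the host, database, table, and credentials are placeholders.

```yaml
output:
  doris_stream_load:
    fe_urls:
      - "http://fe1:8030"
    database: test_db
    table: events
    username: root
    password: secret
    max_in_flight: 16   # allow up to 16 batches in flight at once
    batching:
      count: 1000       # flush once 1000 messages have accumulated
      period: 5s        # or after 5 seconds, whichever comes first
```

Because each batch becomes one Stream Load request, larger batches mean fewer requests, at the cost of holding messages for longer before they are written.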
Common
Advanced
outputs:
  label: ""
  doris_stream_load:
    fe_urls: []
    database: "" # No default (required)
    table: "" # No default (required)
    username: "" # No default (required)
    password: "" # No default (required)
    format: json
    read_json_by_line: true
    strip_outer_array: false
    columns: []
    label_prefix: redpanda_connect
    timeout: 30s
    max_in_flight: 8
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
outputs:
  label: ""
  doris_stream_load:
    url: "" # No default (optional)
    fe_urls: []
    database: "" # No default (required)
    table: "" # No default (required)
    username: "" # No default (required)
    password: "" # No default (required)
    query_port: 9030
    format: json
    read_json_by_line: true
    strip_outer_array: false
    jsonpaths: ""
    json_root: ""
    columns: []
    where: ""
    column_separator: ,
    line_delimiter:
    group_commit: ""
    max_filter_ratio: 0
    partitions: []
    temporary_partitions: []
    skip_lines: 0
    empty_field_as_null: false
    trim_double_quotes: false
    num_as_string: false
    fuzzy_parse: false
    label_prefix: redpanda_connect
    headers: {}
    strict_mode: false
    timeout: 30s
    max_in_flight: 8
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
Fields
batching
Allows you to configure a batching policy.
Type: object
# Examples:
batching:
  byte_size: 5000
  count: 0
  period: 1s
# ---
batching:
  count: 10
  period: 1s
# ---
batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
batching.byte_size
The number of bytes at which the batch should be flushed. Set to 0 to disable size-based batching.
Type: int
Default: 0
batching.check
A Bloblang query that should return a boolean value indicating whether a message should end a batch.
Type: string
Default: ""
# Examples:
check: this.type == "end_of_transaction"
batching.count
The number of messages at which the batch should be flushed. Set to 0 to disable count-based batching.
Type: int
Default: 0
batching.period
A period in which an incomplete batch should be flushed regardless of its size.
Type: string
Default: ""
# Examples:
period: 1s
# ---
period: 1m
# ---
period: 500ms
batching.processors[]
A list of processors to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
Type: processor
# Examples:
processors:
  - archive:
      format: concatenate
# ---
processors:
  - archive:
      format: lines
# ---
processors:
  - archive:
      format: json_array
columns[]
Optional Doris columns header. When set, the values are joined with commas.
Type: array
Default: []
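For example, a minimal sketch assuming a target table with the columns id, name, and ts (placeholder names). Going by the description above, this list would reach Doris as the header value id,name,ts.

```yaml
doris_stream_load:
  columns:
    - id
    - name
    - ts
```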
empty_field_as_null
Whether Doris should treat empty input fields as NULL.
Type: bool
Default: false
fe_urls[]
A list of Doris FE HTTP URLs. The sink will try these FE endpoints in order with per-request failover, and the starting FE is rotated across requests.
Type: array
Default: []
# Examples:
fe_urls:
- "http://fe1:8030"
- "http://fe2:8030"
group_commit
Optional Doris group_commit mode. Valid values are sync_mode, async_mode, and off_mode. When omitted, Doris uses the server default behavior.
Type: string
Default: ""
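A minimal sketch that selects one of the listed modes explicitly. The choice of async_mode here is only illustrative; which mode suits a workload depends on the Doris deployment.

```yaml
doris_stream_load:
  group_commit: async_mode   # one of sync_mode, async_mode, off_mode
```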
headers
Optional additional static Stream Load headers. Reserved Doris headers configured by this component take precedence.
Type: string
Default: {}
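A sketch of a static header, under the assumption that timezone is a Stream Load header that this component does not set itself. The header name and value are illustrative, so check the Doris Stream Load documentation for the headers your Doris version supports.

```yaml
doris_stream_load:
  headers:
    timezone: Asia/Shanghai   # assumed example header, sent unchanged on every request
```

Headers that the component derives from its own fields, such as columns or partitions, are better set through those fields, since they take precedence over entries in this map.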
partitions[]
Optional Doris partitions header. Values are joined with commas.
Type: array
Default: []
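For example, a sketch that restricts a load to two partitions. The names p202401 and p202402 are hypothetical. Going by the description above, they would be sent as the header value p202401,p202402.

```yaml
doris_stream_load:
  partitions:
    - p202401
    - p202402
```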
password
Doris password.
This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.
Type: string
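One way to keep the value out of the configuration file is environment variable interpolation. This is a sketch only: the variable names DORIS_USERNAME and DORIS_PASSWORD are hypothetical and must be set in the environment of the running process.

```yaml
doris_stream_load:
  username: "${DORIS_USERNAME}"
  password: "${DORIS_PASSWORD}"   # resolved from the environment when the config is loaded
```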
query_port
Doris FE MySQL query port, used by ConnectionTest to verify that the target database and table exist. Set to 0 to disable query-port checks.
Type: int
Default: 9030
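If the MySQL query port is not reachable from where the pipeline runs, for example because only the HTTP port is exposed, the check can be switched off as described above:

```yaml
doris_stream_load:
  query_port: 0   # disable the database and table existence check
```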
read_json_by_line
Encode a JSON batch as newline-delimited JSON and set the Doris header read_json_by_line=true.
Type: bool
Default: true
strip_outer_array
Encode a JSON batch as a single JSON array and set the Doris header strip_outer_array=true.
Type: bool
Default: false
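A sketch of switching from the default newline-delimited encoding to a single JSON array per batch. It assumes the two options are intended as alternatives. This reference does not state how the component behaves when both are enabled.

```yaml
doris_stream_load:
  format: json
  read_json_by_line: false   # do not send newline-delimited JSON
  strip_outer_array: true    # send the batch as one JSON array instead
```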
temporary_partitions[]
Optional Doris temporary_partitions header. Values are joined with commas.
Type: array
Default: []
trim_double_quotes
Whether Doris should trim double quotes in CSV/text imports.
Type: bool
Default: false
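A sketch of a CSV-oriented configuration. It assumes that the format field accepts the value csv, which this reference does not confirm (only json appears as a value), so treat it as illustrative.

```yaml
doris_stream_load:
  format: csv                 # assumed value, not confirmed by this reference
  column_separator: ","
  trim_double_quotes: true    # ask Doris to trim double quotes around fields
  empty_field_as_null: true   # ask Doris to load empty fields as NULL
```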
url
Backward-compatible single Doris FE HTTP URL, for example http://fe_host:8030. When fe_urls is provided it takes precedence.
Type: string
# Examples:
url: http://127.0.0.1:8030
Examples
JSON lines from stdin
Read newline-delimited JSON messages and write them into Doris using one Stream Load request per batch.
input:
  stdin:
    scanner:
      lines: {}
output:
  doris_stream_load:
    url: http://127.0.0.1:8030
    database: test_db
    table: events
    username: root
    password: secret
    format: json
    read_json_by_line: true
    columns: [id, name, ts]