salesforce_sink

Writes messages to Salesforce, routing each Kafka topic to its own sObject configuration.

Introduced in version 4.85.0.

Consumes batches of messages and writes them to Salesforce. Each message must have a topic field (set by the per-topic processor) and a data field containing the Salesforce record fields. The topic is used to look up the correct topic_mappings entry which defines the sObject, operation, and write mode.

Realtime mode uses the sObject Collections REST API (synchronous, up to 200 records/call). Bulk mode uses the Bulk API 2.0 (asynchronous, polls until complete).

  • Common

  • Advanced

outputs:
  label: ""
  salesforce_sink:
    org_url: "" # No default (required)
    client_id: "" # No default (required)
    client_secret: "" # No default (required)
    api_version: v65.0
    bulk_batch_size: 1000
    max_concurrent_bulk_jobs: 10
    bulk_poll_interval: 5s
    batch_period: 5s
    max_in_flight: 1
    topic_mappings: [] # No default (required)
outputs:
  label: ""
  salesforce_sink:
    org_url: "" # No default (required)
    client_id: "" # No default (required)
    client_secret: "" # No default (required)
    api_version: v65.0
    bulk_batch_size: 1000
    max_concurrent_bulk_jobs: 10
    bulk_poll_interval: 5s
    batch_period: 5s
    max_in_flight: 1
    topic_mappings: [] # No default (required)
    http:
      timeout: 5s
      tls:
        enabled: false
        skip_cert_verify: false
        enable_renegotiation: false
        root_cas: ""
        root_cas_file: ""
        client_certs: []
      proxy_url: ""
      disable_http2: false
      tps_limit: 0
      tps_burst: 1
      backoff:
        initial_interval: 1s
        max_interval: 30s
        max_retries: 3
      tcp:
        connect_timeout: 0s
        keep_alive:
          idle: 15s
          interval: 15s
          count: 9
        tcp_user_timeout: 0s
      http:
        max_idle_conns: 100
        max_idle_conns_per_host: 0
        max_conns_per_host: 64
        idle_conn_timeout: 1m30s
        tls_handshake_timeout: 10s
        expect_continue_timeout: 1s
        response_header_timeout: 0s
        disable_keep_alives: false
        disable_compression: false
        max_response_header_bytes: 1048576
        max_response_body_bytes: 10485760
        write_buffer_size: 4096
        read_buffer_size: 4096
        h2:
          strict_max_concurrent_requests: false
          max_decoder_header_table_size: 4096
          max_encoder_header_table_size: 4096
          max_read_frame_size: 16384
          max_receive_buffer_per_connection: 1048576
          max_receive_buffer_per_stream: 1048576
          send_ping_timeout: 0s
          ping_timeout: 15s
          write_byte_timeout: 0s
      access_log_level: ""
      access_log_body_limit: 0

Fields

api_version

Salesforce REST API version to target, prefixed with v. Affects endpoint paths (/services/data/{api_version}/…​) and available fields/objects. Must be supported by your org — check Setup → Company Information. Older versions may lack recent fields.

Type: string

Default: v65.0

# Examples:
api_version: v65.0

# ---

api_version: v62.0

batch_period

Maximum period to wait before flushing an incomplete batch.

Type: string

Default: 5s

bulk_batch_size

Number of records per bulk job. Also controls the output batch size.

Type: int

Default: 1000

bulk_poll_interval

How often to poll Salesforce for bulk job completion status.

Type: string

Default: 5s

client_id

Client ID for the Salesforce Connected App.

Type: string

client_secret

Client secret for the Salesforce Connected App.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

http

HTTP client configuration for Salesforce REST calls (OAuth token endpoint and, where applicable, data queries).

Type: object

http.access_log_body_limit

Maximum bytes of request/response body to include in logs. 0 to skip body logging.

Type: int

Default: 0

http.access_log_level

Log level for HTTP request/response logging. Empty disables logging.

Type: string

Default: ""

Options: `, `TRACE, DEBUG, INFO, WARN, ERROR

http.backoff

Adaptive backoff configuration for 429 (Too Many Requests) responses. Always active.

Type: object

http.backoff.initial_interval

Initial interval between retries on 429 responses.

Type: string

Default: 1s

http.backoff.max_interval

Maximum interval between retries on 429 responses.

Type: string

Default: 30s

http.backoff.max_retries

Maximum number of retries on 429 responses.

Type: int

Default: 3

http.disable_http2

Disable HTTP/2 and force HTTP/1.1.

Type: bool

Default: false

http.http

HTTP transport settings controlling connection pooling, timeouts, and HTTP/2.

Type: object

http.http.disable_compression

Disable automatic decompression of gzip responses.

Type: bool

Default: false

http.http.disable_keep_alives

Disable HTTP keep-alive connections; each request uses a new connection.

Type: bool

Default: false

http.http.expect_continue_timeout

Maximum time to wait for a server’s 100-continue response before sending the body. 0 means the body is sent immediately.

Type: string

Default: 1s

http.http.h2

HTTP/2-specific transport settings. Only applied when HTTP/2 is enabled.

Type: object

http.http.h2.max_decoder_header_table_size

Upper limit in bytes for the HPACK header table used to decode headers from the peer. Must be less than 4 MiB.

Type: int

Default: 4096

http.http.h2.max_encoder_header_table_size

Upper limit in bytes for the HPACK header table used to encode headers sent to the peer. Must be less than 4 MiB.

Type: int

Default: 4096

http.http.h2.max_read_frame_size

Largest HTTP/2 frame this endpoint will read. Valid range: 16 KiB to 16 MiB.

Type: int

Default: 16384

http.http.h2.max_receive_buffer_per_connection

Maximum flow-control window size in bytes for data received on a connection. Must be at least 64 KiB and less than 4 MiB.

Type: int

Default: 1048576

http.http.h2.max_receive_buffer_per_stream

Maximum flow-control window size in bytes for data received on a single stream. Must be less than 4 MiB.

Type: int

Default: 1048576

http.http.h2.ping_timeout

Timeout waiting for a PING response before closing the connection.

Type: string

Default: 15s

http.http.h2.send_ping_timeout

Idle timeout after which a PING frame is sent to verify connection health. 0 disables health checks.

Type: string

Default: 0s

http.http.h2.strict_max_concurrent_requests

When true, new requests block when a connection’s concurrency limit is reached instead of opening a new connection.

Type: bool

Default: false

http.http.h2.write_byte_timeout

Timeout for writing data to a connection. The timer resets whenever bytes are written. 0 disables the timeout.

Type: string

Default: 0s

http.http.idle_conn_timeout

How long an idle connection remains in the pool before being closed. 0 disables the timeout.

Type: string

Default: 1m30s

http.http.max_conns_per_host

Maximum total connections (active + idle) per host. 0 means unlimited.

Type: int

Default: 64

http.http.max_idle_conns

Maximum total number of idle (keep-alive) connections across all hosts. 0 means unlimited.

Type: int

Default: 100

http.http.max_idle_conns_per_host

Maximum idle connections to keep per host. 0 (the default) uses GOMAXPROCS+1.

Type: int

Default: 0

http.http.max_response_body_bytes

Maximum bytes of response body the client will read. The response body is wrapped with a limit reader; reads beyond this cap return EOF. 0 disables the limit.

Type: int

Default: 10485760

http.http.max_response_header_bytes

Maximum bytes of response headers to allow.

Type: int

Default: 1048576

http.http.read_buffer_size

Size in bytes of the per-connection read buffer.

Type: int

Default: 4096

http.http.response_header_timeout

Maximum time to wait for response headers after writing the full request. 0 disables the timeout.

Type: string

Default: 0s

http.http.tls_handshake_timeout

Maximum time to wait for a TLS handshake to complete. 0 disables the timeout.

Type: string

Default: 10s

http.http.write_buffer_size

Size in bytes of the per-connection write buffer.

Type: int

Default: 4096

http.proxy_url

HTTP proxy URL. Empty string disables proxying.

Type: string

Default: ""

http.tcp

TCP socket configuration.

Type: object

http.tcp.connect_timeout

Maximum amount of time a dial will wait for a connect to complete. Zero disables.

Type: string

Default: 0s

http.tcp.keep_alive

TCP keep-alive probe configuration.

Type: object

http.tcp.keep_alive.count

Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.

Type: int

Default: 9

http.tcp.keep_alive.idle

Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.

Type: string

Default: 15s

http.tcp.keep_alive.interval

Duration between keep-alive probes. Zero defaults to 15s.

Type: string

Default: 15s

http.tcp.tcp_user_timeout

Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, keep_alive.idle must be greater than this value per RFC 5482. Zero disables.

Type: string

Default: 0s

http.timeout

HTTP request timeout.

Type: string

Default: 5s

http.tls

Custom TLS settings can be used to override system defaults.

Type: object

http.tls.client_certs[]

A list of client certificates to use. For each certificate either the fields cert and key, or cert_file and key_file should be specified, but not both.

Type: object

Default: []

# Examples:
client_certs:
  - cert: foo
    key: bar


# ---

client_certs:
  - cert_file: ./example.pem
    key_file: ./example.key

http.tls.client_certs[].cert

A plain text certificate to use.

Type: string

Default: ""

http.tls.client_certs[].cert_file

The path of a certificate to use.

Type: string

Default: ""

http.tls.client_certs[].key

A plain text certificate key to use.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

http.tls.client_certs[].key_file

The path of a certificate key to use.

Type: string

Default: ""

http.tls.client_certs[].password

A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete pbeWithMD5AndDES-CBC algorithm is not supported for the PKCS#8 format.

Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
password: foo

# ---

password: ${KEY_PASSWORD}

http.tls.enable_renegotiation

Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you’re seeing the error message local error: tls: no renegotiation.

Requires version 3.45.0 or later.

Type: bool

Default: false

http.tls.enabled

Whether custom TLS settings are enabled.

Type: bool

Default: false

http.tls.root_cas

An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

This field contains sensitive information that usually shouldn’t be added to a configuration directly. For more information, see Secrets.

Type: string

Default: ""

# Examples:
root_cas: |-
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

http.tls.root_cas_file

An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.

Type: string

Default: ""

# Examples:
root_cas_file: ./root_cas.pem

http.tls.skip_cert_verify

Whether to skip server side certificate verification.

Type: bool

Default: false

http.tps_burst

Maximum burst size for rate limiting.

Type: int

Default: 1

http.tps_limit

Rate limit in requests per second. 0 disables rate limiting.

Type: float

Default: 0

max_concurrent_bulk_jobs

Maximum number of bulk jobs polling concurrently in the background. Each in-flight job buffers its CSV payload in memory. Lower this value if memory usage is a concern.

Type: int

Default: 10

max_in_flight

Maximum number of batches to send concurrently. Increasing this value improves real-time write throughput.

Type: int

Default: 1

org_url

Salesforce instance base URL (for example, https://your-domain.salesforce.com).

Type: string

# Examples:
org_url: https://acme.my.salesforce.com

# ---

org_url: https://acme--staging.sandbox.my.salesforce.com

topic_mappings[]

Per-topic Salesforce write configuration. Each entry maps a Kafka topic to an sObject and write settings.

Type: object

topic_mappings[].all_or_none

Real-time only: rolls back the entire batch if any record fails.

Type: bool

Default: false

topic_mappings[].external_id_field

External ID field name. Required for upsert operations.

Type: string

Default: ""

topic_mappings[].mode

Write mode: realtime (sObject Collections API) or bulk (Bulk API 2.0).

Type: string

Default: realtime

topic_mappings[].operation

Write operation: insert, update, upsert, or delete.

Type: string

Default: upsert

topic_mappings[].sobject

Salesforce sObject API name (for example, Account, Contact, MyObject__c).

Type: string

topic_mappings[].topic

Kafka topic name to match against the message’s topic field.

Type: string