Add docs for the changefeed kafka header option #19588

Open · wants to merge 2 commits into base: `main`
69 changes: 69 additions & 0 deletions src/current/v25.2/changefeed-messages.md
@@ -531,6 +531,75 @@ CREATE CHANGEFEED INTO 'scheme://sink-URI' WITH updated AS SELECT column, column

For details on syntax and examples, refer to the [Change Data Capture Queries]({% link {{ page.version.version }}/cdc-queries.md %}) page.

### Specify a column as a Kafka header

{% include_cached new-in.html version="v25.2" %} Use the `headers_json_column_name` option to specify a [JSONB]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed emits as Kafka headers for each row’s change event. You can send metadata, such as routing or tracing information, at the protocol level in the header, separate from the message payload. This allows Kafka brokers or routers to filter on the header metadata without deserializing the payload.

Headers enable efficient routing, filtering, and distributed tracing by intermediate systems, such as Kafka brokers, stream processors, or observability tools.

{{site.data.alerts.callout_info}}
The `headers_json_column_name` option is supported with changefeeds emitting to [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}).
{{site.data.alerts.end}}
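
The general form of the option is shown in the following minimal sketch; the sink URI and column name are placeholders to replace with your own values:

{% include_cached copy-clipboard.html %}
~~~ sql
CREATE CHANGEFEED FOR TABLE table_name
  INTO 'kafka://{host}:{port}'
  WITH headers_json_column_name = 'jsonb_column_name';
~~~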

For example, define a table that updates compliance events. This schema includes a `kafka_meta` column of type `JSONB`, used to store a trace ID and other metadata for the Kafka header:

{% include_cached copy-clipboard.html %}
~~~ sql
CREATE TABLE compliance_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL,
event_type STRING NOT NULL,
event_timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
details STRING,
kafka_meta JSONB
);
~~~

Insert example rows into the table, populating the `kafka_meta` column with the `JSONB` data. The changefeed will emit this column as Kafka headers alongside the row changes:

{% include_cached copy-clipboard.html %}
~~~ sql
INSERT INTO compliance_events (
user_id, event_type, details, kafka_meta
) VALUES
(gen_random_uuid(), 'policy_ack', 'User accepted data policy v2.1', '{"trace_id": "abc123", "compliance_level": "low"}'),
(gen_random_uuid(), 'access_review', 'Admin approved elevated access for app A', '{"trace_id": "def456", "compliance_level": "high"}'),
(gen_random_uuid(), 'policy_ack', 'User accepted retention policy update', '{"trace_id": "ghi789", "compliance_level": "medium"}'),
(gen_random_uuid(), 'access_review', 'User confirmed access to sensitive dataset', '{"trace_id": "xyz123", "compliance_level": "high"}'),
(gen_random_uuid(), 'policy_ack', 'Policy v3.0 acknowledged by contractor', '{"trace_id": "mno456", "compliance_level": "low"}');
~~~
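
To confirm the metadata that the changefeed will attach as headers, you can optionally inspect the `kafka_meta` column before creating the changefeed (this query is for illustration only):

{% include_cached copy-clipboard.html %}
~~~ sql
SELECT event_id, event_type, kafka_meta
  FROM compliance_events
  ORDER BY event_timestamp;
~~~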

Create a changefeed that emits messages from the `compliance_events` table to Kafka and specify the `kafka_meta` column using the `headers_json_column_name` option:

{% include_cached copy-clipboard.html %}
~~~ sql
CREATE CHANGEFEED FOR TABLE compliance_events INTO 'kafka://localhost:9092' WITH headers_json_column_name = 'kafka_meta';
~~~
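
You can combine `headers_json_column_name` with other changefeed options in the same statement. For example, the following sketch also requests `updated` and `resolved` timestamps; adjust the options to your requirements:

{% include_cached copy-clipboard.html %}
~~~ sql
CREATE CHANGEFEED FOR TABLE compliance_events
  INTO 'kafka://localhost:9092'
  WITH headers_json_column_name = 'kafka_meta', updated, resolved = '10s';
~~~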

The changefeed will emit each row’s `kafka_meta` data as Kafka headers, which Kafka brokers or stream processors can use to access the metadata without inspecting the payload.

The Kafka topic receives the message payload with the row-level change, excluding the specified header column (`kafka_meta`):

> **Review comment:** can you also show the headers that would be received by kafka? rather than showing a query which would simulate it.
>
> perhaps something like
>
> | key | value | headers |
> | A    |  {..}     | x=y, z=q|

> **Author (Contributor) reply:** I don't know if I've covered how you want this — I added in a rendered table to make it easier to read. Lmk if you want a change

~~~json
{"after": {"details": "User accepted data policy v2.1", "event_id": "ee321dc6-388b-4416-a389-adfafab50ee4", "event_timestamp": "2025-05-09T21:20:29.203923Z", "event_type": "policy_ack", "user_id": "06ba6114-529c-4a99-9811-1dd3d12dad07"}}
{"after": {"details": "User accepted retention policy update", "event_id": "59d391f8-c141-4dc9-9622-9079c3462201", "event_timestamp": "2025-05-09T21:20:29.203923Z", "event_type": "policy_ack", "user_id": "98213553-9c1a-43a6-a598-921c3c6c3b20"}}
{"after": {"details": "Admin approved elevated access for app A", "event_id": "41cf0dbe-c0bc-48aa-9b60-ef343bcef9e1", "event_timestamp": "2025-05-09T21:20:29.203923Z", "event_type": "access_review", "user_id": "ed192798-f7ef-4fe8-a496-f22bb5738b04"}}
. . .
~~~

The Kafka headers will contain:

Key (`event_id`) | Value (Kafka payload) | Headers
------------------+-----------------------+--------
`3e2a9b4a-f1e3-4202-b343-1a52e1ffb0d4` | `{"event_type": "policy_ack", "details": "User accepted data policy v2.1"}` | `trace_id=abc123, compliance_level=low`
`7c90a289-2f91-4666-a8d5-962dc894e1c2` | `{"event_type": "access_review", "details": "Admin approved elevated access for app A"}` | `trace_id=def456, compliance_level=high`
`1a6e0d3f-7191-4d99-9a36-7f4b85e5cd23` | `{"event_type": "policy_ack", "details": "User accepted retention policy update"}` | `trace_id=ghi789, compliance_level=medium`
`89af6b6e-f34d-4a1d-a69d-91d29526e9f7` | `{"event_type": "access_review", "details": "User confirmed access to sensitive dataset"}` | `trace_id=xyz123, compliance_level=high`
`587cf30d-3f17-4942-8a01-f110ef8a5ae3` | `{"event_type": "policy_ack", "details": "Policy v3.0 acknowledged by contractor"}` | `trace_id=mno456, compliance_level=low`
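
Because the headers come from an ordinary column, updating that column should change the headers attached to subsequent change events for the affected rows. For example, this illustrative statement merges a new `compliance_level` into the stored metadata, so later messages for the matching rows would carry the updated header value:

{% include_cached copy-clipboard.html %}
~~~ sql
UPDATE compliance_events
  SET kafka_meta = kafka_meta || '{"compliance_level": "high"}'
  WHERE event_type = 'access_review';
~~~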

If you would like to filter the table columns that a changefeed emits, refer to the [CDC Queries]({% link {{ page.version.version }}/cdc-queries.md %}) page. To customize the message envelope, refer to the [Changefeed Message Envelope](#message-envelopes) page.
{% comment %}update message envelope link to the new page once PR #19542 is merged{% endcomment %}

## Message formats

{% include {{ page.version.version }}/cdc/message-format-list.md %}
1 change: 1 addition & 0 deletions src/current/v25.2/create-changefeed.md
@@ -122,6 +122,7 @@ Option | Value | Description
<a name="format"></a>`format` | `json` / `avro` / `csv` / `parquet` | Format of the emitted message. <br><br>`avro`: For mappings of CockroachDB types to Avro types, [refer-to-the-table]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-types) and detail on [Avro-limitations]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-limitations). **Note:** [`confluent_schema_registry`](#confluent-schema-registry) is required with `format=avro`. <br><br>`csv`: You cannot combine `format=csv` with the [`diff`](#diff) or [`resolved`](#resolved) options. Changefeeds use the same CSV format as the [`EXPORT`](export.html) statement. Refer to [Export-data-with-changefeeds]({% link {{ page.version.version }}/export-data-with-changefeeds.md %}) for details using these options to create a changefeed as an alternative to `EXPORT`. **Note:** [`initial_scan = 'only'`](#initial-scan) is required with `format=csv`. <br><br>`parquet`: Cloud storage is the only supported sink. The [`topic_in_value`](#topic-in-value) option is not compatible with `parquet` format.<br><br>Default: `format=json`.
<a name="full-table-name"></a>`full_table_name` | N/A | Use fully qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases.<br><br>**Note:** This option cannot modify existing table names used as topics, subjects, etc., as part of an [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) statement. To modify a topic, subject, etc., to use a fully qualified table name, create a new changefeed with this option. <br><br>Example: `CREATE CHANGEFEED FOR foo... WITH full_table_name` will create the topic name `defaultdb.public.foo` instead of `foo`.
<a name="gc-protect-expires-after"></a>`gc_protect_expires_after` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Automatically expires protected timestamp records that are older than the defined duration. In the case where a changefeed job remains paused, `gc_protect_expires_after` will trigger the underlying protected timestamp record to expire and cancel the changefeed job to prevent accumulation of protected data.<br><br>Refer to [Protect-Changefeed-Data-from-Garbage-Collection]({% link {{ page.version.version }}/protect-changefeed-data.md %}) for more detail on protecting changefeed data.
<a name="headers-json-column-name"></a><span class="version-tag">New in v25.2:</span>`headers_json_column_name` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specify a [JSONB]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed emits as Kafka headers, separate from the message payload, for each row’s change event. `headers_json_column_name` is supported for Kafka sinks. For more details, refer to [Specify a column as a Kafka header]({% link {{ page.version.version }}/changefeed-messages.md %}#specify-a-column-as-a-kafka-header).
<a name="ignore-disable-changefeed-replication"></a>`ignore_disable_changefeed_replication` | [`BOOL`]({% link {{ page.version.version }}/bool.md %}) | When set to `true`, the changefeed **will emit** events even if CDC filtering for TTL jobs is configured using the `disable_changefeed_replication` [session variable]({% link {{ page.version.version }}/set-vars.md %}), `sql.ttl.changefeed_replication.disabled` [cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}), or the `ttl_disable_changefeed_replication` [table storage parameter]({% link {{ page.version.version }}/row-level-ttl.md %}).<br><br>Refer to [Filter changefeeds for tables using TTL](#filter-changefeeds-for-tables-using-row-level-ttl) for usage details.
<a name="initial-scan"></a>`initial_scan` | `yes`/`no`/`only` | Control whether or not an initial scan will occur at the start time of a changefeed. Only one `initial_scan` option (`yes`, `no`, or `only`) can be used. If none of these are set, an initial scan will occur if there is no [`cursor`](#cursor), and will not occur if there is one. This preserves the behavior from previous releases. With `initial_scan = 'only'` set, the changefeed job will end with a successful status (`succeeded`) after the initial scan completes. You cannot specify `yes`, `no`, `only` simultaneously. <br><br>If used in conjunction with `cursor`, an initial scan will be performed at the cursor timestamp. If no `cursor` is specified, the initial scan is performed at `now()`. <br><br>Although the [`initial_scan` / `no_initial_scan`]({% link {{ page.version.version }}/create-changefeed.md %}#initial-scan) syntax from previous versions is still supported, you cannot combine the previous and current syntax.<br><br>Default: `initial_scan = 'yes'`
<a name="kafka-sink-config"></a>`kafka_sink_config` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Set fields to configure the required level of message acknowledgement from the Kafka server, the version of the server, and batching parameters for Kafka sinks. Set the message file compression type. See [Kafka sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka-sink-configuration) for more detail on configuring all the available fields for this option. <br><br>Example: `CREATE CHANGEFEED FOR table INTO 'kafka://localhost:9092' WITH kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "RequiredAcks": "ONE"}'`