add changefeed doc #21273

Open · wants to merge 6 commits into `release-8.1`

79 changes: 79 additions & 0 deletions tidb-cloud/serverless-changefeed-overview.md
@@ -0,0 +1,79 @@
---
title: Changefeed
summary: TiDB Cloud changefeed helps you stream data from TiDB Cloud to other data services.
---

# Changefeed (Beta)

TiDB Cloud changefeed helps you stream data from TiDB Cloud to other data services.

> **Note:**
>
> - Currently, you can manage changefeeds only with [TiDB Cloud CLI](/tidb-cloud/get-started-with-cli.md).
> - Currently, TiDB Cloud only allows up to 100 changefeeds per cluster.
> - Currently, TiDB Cloud only allows up to 100 table filter rules per changefeed.

## View changefeeds

To view the changefeeds of a cluster, use the following TiDB Cloud CLI command:

```bash
ticloud serverless changefeed list --cluster-id <cluster-id>
```

## Create a changefeed

To create a changefeed, refer to the tutorials:

- [Sink to Apache Kafka](/tidb-cloud/serverless-changefeed-sink-to-apache-kafka.md)

## Pause or resume a changefeed

To pause a changefeed, use the following TiDB Cloud CLI command:

```bash
ticloud serverless changefeed pause --cluster-id <cluster-id> --changefeed-id <changefeed-id>
```

To resume a changefeed, use the following TiDB Cloud CLI command:

```bash
ticloud serverless changefeed resume --cluster-id <cluster-id> --changefeed-id <changefeed-id>
```

## Edit a changefeed

> **Note:**
>
> TiDB Cloud currently only allows editing changefeeds in the paused status.

To edit a changefeed that sinks to Kafka, pause the changefeed first, and then edit it using the following TiDB Cloud CLI command:

```bash
ticloud serverless changefeed edit --cluster-id <cluster-id> --changefeed-id <changefeed-id> --name <newname> --kafka <full-specified-kafka> --filter <full-specified-filter>
```
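
For example, the following hypothetical command renames a paused changefeed and narrows its table filter to the `test.t1` table. The cluster ID, changefeed ID, name, and JSON payload are placeholders only. This sketch assumes that you only need to pass the options you want to change; if the CLI requires the full specifications, also include the `--kafka` option as in the create command.

```bash
# Hypothetical IDs and values; replace them with your own.
ticloud serverless changefeed edit \
  --cluster-id 1234567890 \
  --changefeed-id my-changefeed-id \
  --name kafka-sink-renamed \
  --filter '{"filterRule":["test.t1"],"mode":"IGNORE_NOT_SUPPORT_TABLE"}'
```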

## Delete a changefeed

To delete a changefeed, use the following TiDB Cloud CLI command:

```bash
ticloud serverless changefeed delete --cluster-id <cluster-id> --changefeed-id <changefeed-id>
```

## Changefeed billing

The changefeed feature is currently free while it is in beta.

## Changefeed states

The state of a changefeed represents its running status. During the running process, a changefeed might fail with errors, or be manually paused or resumed. These behaviors can lead to changes of the changefeed state.

The states are described as follows:

- `CREATING`: the changefeed is being created.
- `CREATE_FAILED`: the changefeed creation fails. You need to delete the changefeed and create a new one.
- `RUNNING`: the changefeed runs normally and the checkpoint-ts proceeds normally.
- `PAUSED`: the changefeed is paused.
- `WARNING`: the changefeed returns a warning. The changefeed cannot continue due to some recoverable errors. The changefeed in this state keeps trying to resume until the state transfers to `RUNNING`. The changefeed in this state blocks [GC operations](https://docs.pingcap.com/tidb/stable/garbage-collection-overview).
- `RUNNING_FAILED`: the changefeed fails. Due to some errors, the changefeed cannot resume and cannot be recovered automatically. If the issues are resolved before the garbage collection (GC) of the incremental data, you can manually resume the failed changefeed. The default Time-To-Live (TTL) duration for incremental data is 24 hours, which means that the GC mechanism does not delete any data within 24 hours after the changefeed is interrupted.
247 changes: 247 additions & 0 deletions tidb-cloud/serverless-changefeed-sink-to-apache-kafka.md
@@ -0,0 +1,247 @@
---
title: Sink to Apache Kafka
summary: This document explains how to create a changefeed to stream data from TiDB Cloud to Apache Kafka. It includes restrictions, prerequisites, and steps to configure the changefeed for Apache Kafka. The process involves setting up network connections, adding permissions for Kafka ACL authorization, and configuring the changefeed specification.
---

# Sink to Apache Kafka

This document describes how to create a changefeed to stream data from TiDB Cloud to Apache Kafka.

## Restrictions

- For each TiDB Cloud cluster, you can create up to 100 changefeeds.
- Currently, TiDB Cloud does not support uploading self-signed TLS certificates to connect to Kafka brokers.
- Because TiDB Cloud uses TiCDC to establish changefeeds, it has the same [restrictions as TiCDC](https://docs.pingcap.com/tidb/stable/ticdc-overview#unsupported-scenarios).
- If the table to be replicated does not have a primary key or a non-null unique index, the absence of a unique constraint during replication could result in duplicated data being inserted downstream in some retry scenarios.

## Prerequisites

Before creating a changefeed to stream data to Apache Kafka, you need to complete the following prerequisites:

- Set up your network connection
- Add permissions for Kafka ACL authorization

### Network

Ensure that your TiDB cluster can connect to the Apache Kafka service. Currently, the TiDB cluster can connect to Apache Kafka only through public IP addresses.

> **Note:**
>
> If you want to expose your Apache Kafka service through a more secure method, such as Private Link or VPC peering, contact us for help. To request it, click **?** in the lower-right corner of the [TiDB Cloud console](https://tidbcloud.com) and click **Request Support**. Then, describe your request for a private connection to Apache Kafka in the **Description** field and click **Submit**.


To provide public IP access to your Apache Kafka service, assign public IP addresses to all your Kafka brokers.

### Kafka ACL authorization

To allow TiDB Cloud changefeeds to stream data to Apache Kafka and create Kafka topics automatically, ensure that the following permissions are added in Kafka:

- The `Create` and `Write` permissions are added for the topic resource type in Kafka.
- The `DescribeConfigs` permission is added for the cluster resource type in Kafka.

For example, if your Kafka cluster is in Confluent Cloud, see [Resources](https://docs.confluent.io/platform/current/kafka/authorization.html#resources) and [Adding ACLs](https://docs.confluent.io/platform/current/kafka/authorization.html#adding-acls) in the Confluent documentation for more information.

## Create a changefeed sink to Apache Kafka with TiDB Cloud CLI

To create a changefeed to stream data from TiDB Cloud to Apache Kafka, use the following TiDB Cloud CLI command:

```bash
ticloud serverless changefeed create --cluster-id <cluster-id> --name <changefeed-name> --type KAFKA --kafka <kafka-json> --filter <filter-json> --start-tso <start-tso>
```

- `--cluster-id`: the ID of the TiDB Cloud cluster for which you want to create the changefeed.
- `--name`: the name of the changefeed. This parameter is optional. If you do not specify a name, TiDB Cloud automatically generates one for the changefeed.
- `--type`: the type of the changefeed, which is `KAFKA` in this case.
- `--kafka`: a JSON string that contains the configurations for the changefeed to stream data to Apache Kafka. For more information, see [Kafka configurations](#kafka-configurations).
- `--filter`: a JSON string that contains the configurations for the changefeed to filter tables and events. For more information, see [Filter configurations](#filter-configurations).
- `--start-tso`: the TSO from which the changefeed starts to replicate data. If you do not specify a TSO, the current TSO is used by default. To learn more about TSO, see [TSO in TiDB](https://docs.pingcap.com/tidb/stable/tso/).
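
For example, the following hypothetical invocation creates a Kafka changefeed with a minimal configuration. The cluster ID, broker endpoint, topic name, and JSON payloads are placeholders, and it is an assumption that fields not relevant to your setup (such as `avro_config` when the protocol is `CANAL_JSON`) can be omitted. Build the actual JSON from the templates described in the following sections.

```bash
# Hypothetical values; replace the cluster ID, broker endpoint, and topic name with your own.
ticloud serverless changefeed create \
  --cluster-id 1234567890 \
  --name my-kafka-changefeed \
  --type KAFKA \
  --kafka '{"network_info":{"network_type":"PUBLIC"},"broker":{"kafka_version":"VERSION_3XX","broker_endpoints":"broker1:9092","tls_enable":false,"compression":"NONE"},"authentication":{"auth_type":"DISABLE"},"data_format":{"protocol":"CANAL_JSON","enable_tidb_extension":false},"topic_partition_config":{"dispatch_type":"ONE_TOPIC","default_topic":"tidb-changefeed","replication_factor":1,"partition_num":1}}' \
  --filter '{"filterRule":["test.*"],"mode":"IGNORE_NOT_SUPPORT_TABLE"}'
```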

### Filter configurations

To get a template of the `filter` configurations, use the following TiDB Cloud CLI command:

```bash
ticloud serverless changefeed template
```

To get the explanation of the template, use the following TiDB Cloud CLI command:

```bash
ticloud serverless changefeed template --explain
```

The configurations in the `filter` JSON string are used to filter tables and events that you want to replicate. Below is an example of a `filter` configuration:

<details>
<summary>Example filter configuration</summary>

```json
{
  "filterRule": ["test.t1", "test.t2"],
  "mode": "IGNORE_NOT_SUPPORT_TABLE",
  "eventFilterRule": [
    {
      "matcher": ["test.t1", "test.t2"],
      "ignore_event": ["all dml", "all ddl"]
    }
  ]
}
```
</details>

1. **Filter Rule**: you can set `filterRule` to filter the tables that you want to replicate. For more information about the rule syntax, see [Table Filter](https://docs.pingcap.com/tidb/stable/table-filter/).
2. **Event Filter Rule**: you can set `matcher` and `ignore_event` to ignore the events that match the rules. For all the supported event types, see [Event filter rules](https://docs.pingcap.com/tidb/stable/ticdc-filter/#event-filter-rules).
3. **Mode**: set `mode` to `IGNORE_NOT_SUPPORT_TABLE` to ignore the tables that do not support replication, such as tables without primary keys or unique indexes. Set `mode` to `FORCE_SYNC` to force the changefeed to replicate all tables.
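
As another hypothetical sketch of how these rules combine, the following `filter` configuration replicates all tables in the `test` database except those whose names start with `tmp_`, and ignores delete events on `test.orders`. The database, table, and event names are examples only; refer to the linked [Table Filter](https://docs.pingcap.com/tidb/stable/table-filter/) and [Event filter rules](https://docs.pingcap.com/tidb/stable/ticdc-filter/#event-filter-rules) documents for the exact syntax that is supported.

```json
{
  "filterRule": ["test.*", "!test.tmp_*"],
  "mode": "IGNORE_NOT_SUPPORT_TABLE",
  "eventFilterRule": [
    {
      "matcher": ["test.orders"],
      "ignore_event": ["delete"]
    }
  ]
}
```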

### Kafka configurations

To get a template of the `kafka` configurations, use the following TiDB Cloud CLI command:

```bash
ticloud serverless changefeed template
```

To get the explanation of the template, use the following TiDB Cloud CLI command:

```bash
ticloud serverless changefeed template --explain
```

The configurations in the `kafka` JSON string are used to configure how the changefeed streams data to Apache Kafka. Below is an example of a `kafka` configuration:

<details>
<summary>Example kafka configuration</summary>

```json
{
  "network_info": {
    "network_type": "PUBLIC"
  },
  "broker": {
    "kafka_version": "VERSION_2XX",
    "broker_endpoints": "broker1:9092,broker2:9092",
    "tls_enable": false,
    "compression": "NONE"
  },
  "authentication": {
    "auth_type": "DISABLE",
    "user_name": "",
    "password": ""
  },
  "data_format": {
    "protocol": "CANAL_JSON",
    "enable_tidb_extension": false,
    "avro_config": {
      "decimal_handling_mode": "PRECISE",
      "bigint_unsigned_handling_mode": "LONG",
      "schema_registry": {
        "schema_registry_endpoints": "",
        "enable_http_auth": false,
        "user_name": "",
        "password": ""
      }
    }
  },
  "topic_partition_config": {
    "dispatch_type": "ONE_TOPIC",
    "default_topic": "test-topic",
    "topic_prefix": "_prefix",
    "separator": "_",
    "topic_suffix": "_suffix",
    "replication_factor": 1,
    "partition_num": 1,
    "partition_dispatchers": [
      {
        "partition_type": "TABLE",
        "matcher": ["*.*"],
        "index_name": "index1",
        "columns": ["col1", "col2"]
      }
    ]
  },
  "column_selectors": [
    {
      "matcher": ["*.*"],
      "columns": ["col1", "col2"]
    }
  ]
}
```
</details>

The main configuration fields are as follows:

1. **network_info**: Only the `PUBLIC` network type is currently supported, which means that the TiDB cluster connects to the Apache Kafka service through public IP addresses.

2. **broker**: Contains Kafka broker connection information:

- `kafka_version`: the Kafka version. Supported values are `VERSION_2XX` and `VERSION_3XX`.
- `broker_endpoints`: a comma-separated list of broker endpoints.
- `tls_enable`: whether to enable TLS for the connection.
- `compression`: the compression type for messages. Supported values are `NONE`, `GZIP`, `LZ4`, `SNAPPY`, and `ZSTD`.

3. **authentication**: Authentication settings for connecting to Kafka. Supported `auth_type` values are `DISABLE`, `SASL_PLAIN`, `SASL_SCRAM_SHA_256`, and `SASL_SCRAM_SHA_512`. The `user_name` and `password` fields are required if you set `auth_type` to `SASL_PLAIN`, `SASL_SCRAM_SHA_256`, or `SASL_SCRAM_SHA_512`, as shown in the sketch below.
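
For example, a hypothetical `authentication` section for a Kafka cluster that requires SASL/SCRAM might look as follows (the user name and password are placeholders):

```json
{
  "authentication": {
    "auth_type": "SASL_SCRAM_SHA_512",
    "user_name": "changefeed-user",
    "password": "your-password"
  }
}
```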

4. **data_format.protocol**: The data format protocol. Supported values are `CANAL_JSON`, `AVRO`, and `OPEN_PROTOCOL`.

- Avro is a compact, fast, and binary data format with rich data structures, which is widely used in various flow systems. For more information, see [Avro data format](https://docs.pingcap.com/tidb/stable/ticdc-avro-protocol).
- Canal-JSON is a plain JSON text format, which is easy to parse. For more information, see [Canal-JSON data format](https://docs.pingcap.com/tidb/stable/ticdc-canal-json).
- Open Protocol is a row-level data change notification protocol that provides data sources for monitoring, caching, full-text indexing, analysis engines, and primary-secondary replication between different databases. For more information, see [Open Protocol data format](https://docs.pingcap.com/tidb/stable/ticdc-open-protocol).

5. **data_format.enable_tidb_extension**: Controls whether to add TiDB-extension fields to the Kafka message body when you use the `AVRO` or `CANAL_JSON` data format.

For more information about TiDB-extension fields, see [TiDB extension fields in Avro data format](https://docs.pingcap.com/tidb/stable/ticdc-avro-protocol#tidb-extension-fields) and [TiDB extension fields in Canal-JSON data format](https://docs.pingcap.com/tidb/stable/ticdc-canal-json#tidb-extension-field).

6. **data_format.avro_config**: If you set `protocol` to `AVRO`, you need to set the Avro-specific configurations:

- `decimal_handling_mode` and `bigint_unsigned_handling_mode`: specify how TiDB Cloud handles the decimal and unsigned bigint data types in Kafka messages.
- `schema_registry`: the schema registry endpoint. If you set `enable_http_auth` to `true`, the `user_name` and `password` fields are required.

7. **topic_partition_config.dispatch_type**: Controls how the changefeed creates Kafka topics: by table, by database, or with one topic for all changelogs. Supported values are `ONE_TOPIC`, `BY_TABLE`, and `BY_DATABASE`.

- **Distribute changelogs by table to Kafka Topics**

If you want the changefeed to create a dedicated Kafka topic for each table, set `dispatch_type` to `BY_TABLE`. Then, all Kafka messages of a table are sent to a dedicated Kafka topic. You can customize topic names for tables by setting a `topic_prefix`, a `separator` between a database name and table name, and a `topic_suffix`. For example, if you set the separator as `_`, the topic names are in the format of `<Prefix><DatabaseName>_<TableName><Suffix>`.

For changelogs of non-row events, such as Create Schema Event, you can specify a topic name in the `default_topic` field. The changefeed will create a topic accordingly to collect such changelogs.

- **Distribute changelogs by database to Kafka Topics**

If you want the changefeed to create a dedicated Kafka topic for each database, set `dispatch_type` to `BY_DATABASE`. Then, all Kafka messages of a database are sent to a dedicated Kafka topic. You can customize topic names of databases by setting a `topic_prefix` and a `topic_suffix`.

For changelogs of non-row events, such as Resolved Ts Event, you can specify a topic name in the `default_topic` field. The changefeed will create a topic accordingly to collect such changelogs.

- **Send all changelogs to one specified Kafka Topic**

If you want the changefeed to create one Kafka topic for all changelogs, set `dispatch_type` to `ONE_TOPIC`. Then, all Kafka messages in the changefeed will be sent to one Kafka topic. You can define the topic name in the `default_topic` field.

> **Note:**
>
> If you use the `AVRO` data format, only the `BY_TABLE` dispatch type is supported.
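
For example, the following hypothetical `topic_partition_config` uses `BY_TABLE` dispatching with a prefix, separator, and suffix, so row changes of the table `test.orders` would go to a topic named `tidb_test_orders_cdc`. All names and numbers are placeholders, and it is an assumption that fields such as `partition_dispatchers` can be omitted; the full template from `ticloud serverless changefeed template` lists every field.

```json
{
  "topic_partition_config": {
    "dispatch_type": "BY_TABLE",
    "default_topic": "tidb-non-row-events",
    "topic_prefix": "tidb_",
    "separator": "_",
    "topic_suffix": "_cdc",
    "replication_factor": 1,
    "partition_num": 3
  }
}
```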

8. **topic_partition_config.replication_factor**: Controls the number of Kafka servers to which each Kafka message is replicated. The valid value ranges from [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) to the number of Kafka brokers.

9. **topic_partition_config.partition_num**: Controls the number of partitions in a topic. The valid value range is `[1, 10 * the number of Kafka brokers]`.

10. **topic_partition_config.partition_dispatchers**: Decides which partition a Kafka message is sent to. Supported `partition_type` values are `TABLE`, `INDEX_VALUE`, `TS`, and `COLUMN`.

- **Distribute changelogs by primary key or index value to Kafka partition**

If you want the changefeed to send Kafka messages of a table to different partitions, set `partition_type` to `INDEX_VALUE` and set the `index_name`. The primary key or index value of a row changelog will determine which partition the changelog is sent to. This distribution method provides a better partition balance and ensures row-level orderliness.

- **Distribute changelogs by table to Kafka partition**

If you want the changefeed to send Kafka messages of a table to one Kafka partition, set `partition_type` to `TABLE`. The table name of a row changelog will determine which partition the changelog is sent to. This distribution method ensures table orderliness but might cause unbalanced partitions.

- **Distribute changelogs by timestamp to Kafka partition**

If you want the changefeed to send Kafka messages to different Kafka partitions randomly, set `partition_type` to `TS`. The commitTs of a row changelog will determine which partition the changelog is sent to. This distribution method provides a better partition balance and ensures orderliness in each partition. However, multiple changes of a data item might be sent to different partitions and the consumer progress of different consumers might be different, which might cause data inconsistency. Therefore, the consumer needs to sort the data from multiple partitions by commitTs before consuming.

- **Distribute changelogs by column value to Kafka partition**

If you want the changefeed to send Kafka messages of a table to different partitions, set `partition_type` to `COLUMN` and set the `columns`. The specified column values of a row changelog will determine which partition the changelog is sent to. This distribution method ensures orderliness in each partition and guarantees that changelogs with the same column values are sent to the same partition.

For more information about the matching rules, see [Partition dispatchers](https://docs.pingcap.com/tidb/stable/ticdc-sink-to-kafka/#partition-dispatchers).
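
For example, a hypothetical `partition_dispatchers` setting might dispatch tables in the `test` database by index value while sending changelogs of tables in the `log` database to partitions by table name. The matcher patterns and the index name `PRIMARY` are placeholders:

```json
{
  "partition_dispatchers": [
    {
      "partition_type": "INDEX_VALUE",
      "matcher": ["test.*"],
      "index_name": "PRIMARY"
    },
    {
      "partition_type": "TABLE",
      "matcher": ["log.*"]
    }
  ]
}
```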

11. **column_selectors**: Selects columns from events and sends only the data changes related to those columns to the downstream.

- `matcher`: specify which tables the column selector applies to. For tables that do not match any rule, all columns are sent.
- `columns`: specify which columns of the matched tables will be sent to the downstream.

For more information about the matching rules, see [Column selectors](https://docs.pingcap.com/tidb/stable/ticdc-sink-to-kafka/#column-selectors).
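
For example, a hypothetical `column_selectors` setting that sends only the `id` and `name` columns of tables in the `test` database, while leaving all other tables unaffected, might look as follows (the table pattern and column names are placeholders):

```json
{
  "column_selectors": [
    {
      "matcher": ["test.*"],
      "columns": ["id", "name"]
    }
  ]
}
```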