Real Time MySQL Monitoring with Debezium and AutoMQ
As enterprises process ever more data, monitoring database changes and responding to them in real time has become increasingly important. Whether it is order processing and inventory management on an e-commerce platform or transaction monitoring in a financial system, the ability to capture and handle database changes as they happen is crucial. It improves system responsiveness and lets business operations react promptly when data changes.
To achieve this goal, this article introduces how to use Debezium [1] to monitor changes in a MySQL database and send the resulting change events to AutoMQ [2]. AutoMQ is an efficient, cost-effective stream processing system with high elasticity and availability, which makes it well suited to real-time data processing in enterprises. With this setup, enterprises can track order and inventory changes and define alerting rules for automated monitoring. Downstream services can consume these messages to follow database changes in real time, respond to business needs promptly, optimize system performance, and ensure business continuity and stability.
AutoMQ is a stream processing system redesigned for the cloud. It remains 100% compatible with Apache Kafka while significantly improving cost efficiency and elasticity by offloading storage to object storage. Specifically, AutoMQ builds its stream repository, S3Stream, on shared cloud storage offered by cloud providers, such as EBS and S3, and thus provides low-cost, low-latency, highly available, durable, and virtually unlimited-capacity stream storage.
Compared to traditional Shared Nothing architecture, AutoMQ adopts a Shared Storage architecture, significantly reducing storage and maintenance complexity while improving system elasticity and reliability. AutoMQ's design philosophy and technical advantages make it an ideal choice for replacing existing Kafka clusters in enterprises. By adopting AutoMQ, enterprises can significantly reduce storage costs, simplify operations and maintenance, and achieve automatic scaling and traffic balancing, thereby responding more efficiently to changing business demands. Furthermore, AutoMQ’s architecture supports efficient cold-read operations and zero-downtime service, ensuring stable operation under high loads and sudden traffic spikes. The storage structure of AutoMQ is as follows:
Debezium is an open-source project that provides a low-latency streaming platform for Change Data Capture (CDC). Once installed and configured, Debezium monitors database changes and converts them into Kafka messages. It supports various databases as data sources, including MySQL, PostgreSQL, and MongoDB. Only committed changes are visible, so applications do not need to worry about transactions or rollbacks. In addition, because Debezium records the history of data changes in a persistent, replicated log, your application can stop and restart at any time without missing the events that occurred while it was not running, so all events are processed correctly and completely.
Debezium leverages the persistence, durability, and fault tolerance of Kafka and Kafka Connect. Each connector monitors an upstream database server, captures all database changes, and records them into Kafka Topics. This way, multiple clients can independently consume the same data change events with minimal impact on the upstream database. Common use cases for Debezium include cache invalidation, simplifying monolithic applications, shared databases, and data integration. Through Debezium, enterprises can achieve real-time monitoring and processing of database changes, addressing various business scenarios such as real-time data synchronization and event-driven architectures. Its architecture diagram is as follows:
- An available Docker environment.
- An available AutoMQ node to receive the event messages for data changes.
- A MySQL database with the binlog feature enabled.
- An available Kafka Connect service that can connect to AutoMQ nodes.
- The Debezium MySQL plugin registered with Kafka Connect to monitor and transform data changes.
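The binlog prerequisite can be checked before the connector is wired up. The following is a minimal sketch, not part of the original setup: the helper name `binlog_problems` and the idea of passing the output of `SHOW VARIABLES` as a plain dictionary are assumptions made for illustration. The expected values (`log_bin=ON`, `binlog_format=ROW`, `binlog_row_image=FULL`) are the settings the Debezium MySQL documentation generally lists for row-level capture, so verify them against the Debezium version you run.

```python
# Expected values, based on the settings the Debezium MySQL documentation
# lists for row-level change capture (treat them as assumptions to verify).
EXPECTED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def binlog_problems(variables):
    """Return human-readable problems found in a SHOW VARIABLES mapping."""
    problems = []
    for name, wanted in EXPECTED.items():
        actual = str(variables.get(name, "<missing>")).upper()
        if actual != wanted:
            problems.append(f"{name} is {actual}, expected {wanted}")
    return problems


if __name__ == "__main__":
    # Values as they might be copied from:
    #   SHOW VARIABLES WHERE Variable_name IN
    #     ('log_bin', 'binlog_format', 'binlog_row_image');
    sample = {"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL"}
    print(binlog_problems(sample) or "binlog settings look fine")
```

An empty list means the three settings match. The Debezium example image used later in this article is expected to ship with binlog already enabled, so this check mainly matters for your own MySQL instances.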
You can refer to the AutoMQ official documentation for setup: [Quick Start | AutoMQ][5]. After deployment you will have the service access address for AutoMQ, such as 192.168.123.41:9092, which Kafka Connect then uses to connect to AutoMQ.
You can quickly deploy and configure MySQL with Docker using the official image provided by Debezium, which ships with some initial database tables to simplify deployment. The following command creates a container named "mysql":
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:2.7
- MYSQL_ROOT_PASSWORD: Sets the password for the root user.
- MYSQL_USER and MYSQL_PASSWORD: Set the username and password for a regular user.
Connect to the MySQL client as a regular user:
docker exec -it mysql mysql -u mysqluser -pmysqlpw
Verify data through the command line tool and view all current database tables:
mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| inventory          |
| performance_schema |
+--------------------+
mysql> use inventory;
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
Pull the Kafka Connect image and start the container using the following command. Be sure to specify the AutoMQ service address:
docker run -it --rm --name connect -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  -e BOOTSTRAP_SERVERS=192.168.123.41:9092 \
  --link mysql:mysql quay.io/debezium/connect:2.7
Parameter Description:
| Parameter | Description |
|---|---|
| GROUP_ID | The group ID for the Kafka Connect cluster. |
| CONFIG_STORAGE_TOPIC | The AutoMQ Topic used for storing connector configurations. |
| OFFSET_STORAGE_TOPIC | The AutoMQ Topic used for storing connector offsets. |
| STATUS_STORAGE_TOPIC | The AutoMQ Topic used for storing connector states. |
| --link mysql:mysql | Links to the container named `mysql`. |
| -e BOOTSTRAP_SERVERS=192.168.123.41:9092 | Specifies the AutoMQ node address. |
If the connection fails, check that the AutoMQ and MySQL services have started successfully and that the exposed addresses are correct.
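When a connection fails, a quick TCP probe can narrow down which service is unreachable. The sketch below uses only the Python standard library. The helper names `parse_address` and `is_reachable` are hypothetical, and the addresses are the example values from this article, so replace them with your own. A successful TCP connection only shows that the port is open. It does not prove that the broker's advertised listeners are configured correctly.

```python
import socket


def parse_address(address):
    """Split 'host:port' into (host, port)."""
    host, _, port = address.rpartition(":")
    if not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {address!r}")
    return host, int(port)


def is_reachable(address, timeout=3.0):
    """Return True if a TCP connection to 'host:port' can be opened."""
    host, port = parse_address(address)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors.
        return False


if __name__ == "__main__":
    # Example addresses from this article; replace them with your own.
    targets = [
        ("AutoMQ", "192.168.123.41:9092"),
        ("MySQL", "localhost:3306"),
        ("Kafka Connect", "localhost:8083"),
    ]
    for label, address in targets:
        state = "reachable" if is_reachable(address) else "NOT reachable"
        print(f"{label:14s} {address:22s} {state}")
```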
The Debezium MySQL connector runs as a Kafka Connect plugin that monitors data changes in MySQL databases. To register it, first create a JSON file that holds the connector configuration:
# Navigate into a directory and create a file
cd /home
vim mysql-connector.json
The JSON file content is:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "192.168.123.41:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}
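If you prefer not to edit the file by hand, the same payload can be generated from a script. This is a minimal sketch under the assumption that only the AutoMQ address and a few connection values change between environments. The function name `build_connector_config` and its parameters are hypothetical, while the keys and default values are copied from the JSON above.

```python
import json


def build_connector_config(bootstrap_servers,
                           name="inventory-connector",
                           db_host="mysql",
                           db_user="debezium",
                           db_password="dbz"):
    """Return the registration payload shown above as a Python dict."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": db_host,
            "database.port": "3306",
            "database.user": db_user,
            "database.password": db_password,
            "database.server.id": "184054",
            "topic.prefix": "dbserver1",
            "database.include.list": "inventory",
            "schema.history.internal.kafka.bootstrap.servers": bootstrap_servers,
            "schema.history.internal.kafka.topic": "schema-changes.inventory",
        },
    }


if __name__ == "__main__":
    # Replace the address with your own AutoMQ service address.
    payload = build_connector_config("192.168.123.41:9092")
    with open("mysql-connector.json", "w", encoding="utf-8") as fh:
        json.dump(payload, fh, indent=2)
    print("wrote mysql-connector.json")
```

Running the script writes `mysql-connector.json` to the current directory, ready to be submitted to Kafka Connect.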
Use the following command to submit the connector configuration file to Kafka Connect:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @mysql-connector.json http://localhost:8083/connectors/
The successful response content is as follows:
HTTP/1.1 201 Created
Date: Mon, 05 Aug 2024 01:51:43 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 518
Server: Jetty(9.4.53.v20231009)
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","topic.prefix":"dbserver1","database.include.list":"inventory","schema.history.internal.kafka.bootstrap.servers":"192.168.123.41:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","name":"inventory-connector-new"},"tasks":[],"type":"source"}
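The `201 Created` response lists `"tasks":[]` because tasks are started after the connector is registered, so it is worth confirming afterwards that the connector is actually running. The sketch below queries the Kafka Connect REST endpoint `GET /connectors/<name>/status` with the Python standard library. The helper names `fetch_status` and `is_healthy` are hypothetical, and the base URL assumes the port mapping used in this article.

```python
import json
import urllib.request


def fetch_status(name, base_url="http://localhost:8083"):
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{base_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def is_healthy(status):
    """True when the connector and all of its tasks (at least one) are RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    tasks_ok = bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
    return connector_ok and tasks_ok


if __name__ == "__main__":
    try:
        status = fetch_status("inventory-connector")
    except OSError as exc:
        # urllib's URLError is a subclass of OSError.
        print(f"could not reach Kafka Connect: {exc}")
    else:
        print(json.dumps(status, indent=2))
        print("healthy" if is_healthy(status) else "not healthy, inspect the task state")
```

A task in the `FAILED` state usually carries a `trace` field in the status payload, which is a good first place to look.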
Verify that the connector captures changes by inserting, updating, and deleting data in the MySQL console:
-- insert
INSERT INTO customers (first_name, last_name, email) VALUES ('John', 'Doe', '[email protected]');
-- update
UPDATE customers SET email='[email protected]' WHERE first_name='John' AND last_name='Doe';
-- delete
DELETE FROM customers WHERE first_name='John' AND last_name='Doe';
Because the Kafka Connect logs do not make captured changes easy to spot, a clearer way to verify change capture is to check the Topic data in AutoMQ. You can validate Topic data with scripts or with GUI-based monitoring tools. Refer to the following content for detailed operations.
Obtain the AutoMQ binary package from [GitHub Releases][6]. After extracting it, run the following command from the project root directory to consume the change events for the customers table:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.123.41:9092 --topic dbserver1.inventory.customers --from-beginning
Replace the AutoMQ service address with your own.
The result is as follows:
You can view the status of AutoMQ nodes using various GUI-based management tools, such as [Redpanda Console][7] and [Kafdrop][8]. Using Redpanda Console as an example, you can view all Topics data and detailed information of the current AutoMQ nodes. For specific Redpanda Console deployment instructions, refer to the AutoMQ official documentation: [Redpanda Console | AutoMQ][9].
All database tables monitored by the connector are displayed, each corresponding to a Topic, along with the Topics that hold the connector's configurations and offsets.
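The table Topics follow the Debezium MySQL naming pattern `<topic.prefix>.<database>.<table>`, which is why the console consumer earlier in this article reads from `dbserver1.inventory.customers`. The short sketch below lists the names you would expect for the `inventory` tables shown by `SHOW TABLES`. The helper name `topic_for` is hypothetical, and a Topic may not appear until the connector has produced at least one event for that table.

```python
def topic_for(prefix, database, table):
    """Topic name for a captured table: <topic.prefix>.<database>.<table>."""
    return f"{prefix}.{database}.{table}"


# Tables listed by SHOW TABLES in the inventory database earlier in this article.
INVENTORY_TABLES = ["addresses", "customers", "geom", "orders",
                    "products", "products_on_hand"]

if __name__ == "__main__":
    for table in INVENTORY_TABLES:
        print(topic_for("dbserver1", "inventory", table))
```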
You can view detailed information about data changes, such as updates to the `customers` table.
You can verify other data capture scenarios by running your own SQL updates.
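Downstream services that consume these Topics need to tell inserts, updates, and deletes apart. The sketch below shows one way to read a Debezium change event value, assuming the JSON converter is in use: the envelope carries `before`, `after`, `source`, and an `op` code (`c`, `u`, `d`, or `r` for snapshot reads), and sits under `payload` when schemas are enabled. The function name `describe_event` and the sample values are illustrative and are not taken from a real run.

```python
import json

OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_event(raw_value):
    """Summarize one Debezium change event value as a short string."""
    if raw_value is None or raw_value in ("", "null"):
        # By default Debezium follows a delete with a tombstone (null value).
        return "tombstone"
    event = json.loads(raw_value)
    # With schemas enabled in the JSON converter the data sits under "payload";
    # otherwise the envelope is the top-level object.
    payload = event.get("payload", event)
    op = OPERATIONS.get(payload.get("op"), f"unknown op {payload.get('op')!r}")
    table = payload.get("source", {}).get("table", "?")
    # Inserts and updates carry the new row in "after"; deletes only have "before".
    row = payload.get("after") or payload.get("before") or {}
    return f"{op} on {table}: {row}"


if __name__ == "__main__":
    # Illustrative event, trimmed to the fields used above.
    sample = json.dumps({
        "payload": {
            "before": None,
            "after": {"first_name": "John", "last_name": "Doe"},
            "source": {"db": "inventory", "table": "customers"},
            "op": "c",
        }
    })
    print(describe_event(sample))
    print(describe_event(None))
```

The same function could be applied to each message value read from `dbserver1.inventory.customers`, for example to drive the alerting rules mentioned at the start of this article.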
You can quickly clean up the Docker environment by executing the following command:
docker stop mysql connect
Because the --rm parameter was specified at startup, each container is deleted once it stops.
This article showed how to use Debezium to monitor MySQL database changes and send the change events to AutoMQ for processing. By deploying MySQL and Kafka Connect and configuring the Debezium MySQL connector, enterprises can monitor and process database changes in real time, meeting business requirements such as tracking order changes and managing inventory. The efficiency and elasticity of AutoMQ, coupled with Debezium's low latency and reliability, make the combination an ideal choice for real-time data processing in enterprises. For more ways to extend data change capture, refer to Debezium [10].
[1] Debezium: https://debezium.io/
[2] AutoMQ: https://www.automq.com/
[3] Kafka Connect: https://docs.confluent.io/platform/current/connect/index.html
[4] Debezium Structure: https://docs.redhat.com/zh_hans/documentation/red_hat_integration/2023.q2/html/debezium_user_guide/description-of-debezium-architecture
[5] Quick Start AutoMQ: https://docs.automq.com/en/automq/getting-started
[6] Github Release: https://github.com/AutoMQ/automq/releases
[7] Redpanda Console: https://redpanda.com/redpanda-console-kafka-ui
[8] Kafdrop: https://github.com/obsidiandynamics/kafdrop
[9] Redpanda Console | AutoMQ: https://docs.automq.com/zh/automq/integrations/kafka-ui/redpanda-console
[10] Debezium: https://debezium.io/