This post is a hands-on walkthrough of Debezium Connect,
implementing MySQL CDC to MySQL.
Goal: the example database on the MySQL source instance -----> CDC ----> the example database on the MySQL sink instance.
Versions
- Kafka 4.0.0
- Debezium 3.2.1.Final
1 Prerequisites
- Kafka HA cluster
- Kafka Connect distributed cluster with the debezium-connector-mysql AND debezium-connector-jdbc plugins (or simply use the debezium/connect image)
- a source MySQL instance and a sink MySQL instance
2 Manually create the three connect-distributed internal topics (important)
# Manually create the three connect-distributed topics
(
./bin/kafka-topics.sh --create --topic connect-offsets --bootstrap-server 192.168.68.152:9092 --config cleanup.policy=compact --partitions 10 --replication-factor 3
./bin/kafka-topics.sh --create --topic connect-configs --bootstrap-server 192.168.68.152:9092 --config cleanup.policy=compact --partitions 1 --replication-factor 3
./bin/kafka-topics.sh --create --topic connect-status --bootstrap-server 192.168.68.152:9092 --config cleanup.policy=compact --partitions 10 --replication-factor 3
)
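To confirm the topics were created with the expected settings, they can be described (same bootstrap address as above):

```shell
# Check partitions, replication factor, and cleanup.policy=compact
./bin/kafka-topics.sh --describe --topic connect-offsets --bootstrap-server 192.168.68.152:9092
./bin/kafka-topics.sh --describe --topic connect-configs --bootstrap-server 192.168.68.152:9092
./bin/kafka-topics.sh --describe --topic connect-status  --bootstrap-server 192.168.68.152:9092
```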
3 MySQL source instance
- Enable the binlog
- Create a replication user and grant privileges
Property | Expected Value | Verification SQL Statement |
---|---|---|
log_bin | ON | SHOW VARIABLES LIKE 'log_bin'; |
binlog_format | ROW | SHOW VARIABLES LIKE 'binlog_format'; |
binlog_row_image | FULL | SHOW VARIABLES LIKE 'binlog_row_image'; |
-- CREATE USER 'debezium'@'%' IDENTIFIED WITH mysql_native_password BY 'dbz123456';
CREATE USER IF NOT EXISTS 'debezium'@'%' IDENTIFIED BY 'dbz123456';
GRANT SELECT, INSERT, UPDATE, DELETE, DROP, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
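Enabling the binlog with the expected settings above is done in the server configuration. A minimal my.cnf sketch (the server-id and retention values are assumptions; adjust for your instance):

```ini
[mysqld]
server-id                  = 223344      # must be unique across the replication topology
log_bin                    = mysql-bin   # enables the binary log
binlog_format              = ROW
binlog_row_image           = FULL
binlog_expire_logs_seconds = 864000      # keep binlogs long enough for snapshot/recovery windows
```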
4 MySQL sink instance
- Create a user and grant privileges
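A sketch of the sink-side user (the user name, password, and the example schema scope are assumptions; include DDL rights only if the sink connector is expected to create or alter tables):

```sql
-- Hypothetical sink user; replace name/password with your own
CREATE USER IF NOT EXISTS 'dbz_sink'@'%' IDENTIFIED BY 'sink123456';
GRANT CREATE, ALTER, DROP, SELECT, INSERT, UPDATE, DELETE ON example.* TO 'dbz_sink'@'%';
FLUSH PRIVILEGES;
```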
5 Create the debezium.heartbeat table
CREATE DATABASE IF NOT EXISTS debezium DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE IF NOT EXISTS debezium.heartbeat (
id INT AUTO_INCREMENT,
last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
INSERT INTO debezium.heartbeat(id) VALUES (1);
## Reference: https://gist.github.com/CHERTS/2735a4db93ffa18ee997bfb627a208f2
6 Deploy mysql-source-example
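A sketch of registering the source connector through the Connect REST API (the source host, topic.prefix, server id, and the REST port 8083 are assumptions; heartbeat.action.query ties into the debezium.heartbeat table created above):

```shell
curl -s -X PUT http://192.168.68.152:8083/connectors/mysql-source-example/config \
  -H 'Content-Type: application/json' -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "<source-mysql-host>",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz123456",
    "database.server.id": "184054",
    "topic.prefix": "mysqlsrc",
    "database.include.list": "example,debezium",
    "schema.history.internal.kafka.bootstrap.servers": "192.168.68.152:9092",
    "schema.history.internal.kafka.topic": "schema-history.mysqlsrc",
    "heartbeat.interval.ms": "10000",
    "heartbeat.action.query": "UPDATE debezium.heartbeat SET last_update = NOW() WHERE id = 1"
  }'
```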
7 Deploy jdbc-sink-example
Connector class | Product | Auto table creation? |
---|---|---|
io.debezium.connector.jdbc.JdbcSinkConnector | Debezium connector for JDBC | No |
io.confluent.connect.jdbc.JdbcSinkConnector | JDBC Source and Sink Connector for Confluent Platform | Yes |
Note that the configuration parameters of these two connectors differ slightly.
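A sketch of registering the Debezium JDBC sink (the sink host, credentials, and the mysqlsrc topic prefix are assumptions; the RegexRouter SMT strips the prefix so topic mysqlsrc.example.t maps to table t in the target example database):

```shell
curl -s -X PUT http://192.168.68.152:8083/connectors/jdbc-sink-example/config \
  -H 'Content-Type: application/json' -d '{
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics.regex": "mysqlsrc\\.example\\..*",
    "connection.url": "jdbc:mysql://<sink-mysql-host>:3306/example",
    "connection.username": "dbz_sink",
    "connection.password": "sink123456",
    "insert.mode": "upsert",
    "primary.key.mode": "record_key",
    "delete.enabled": "true",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "mysqlsrc\\.example\\.(.*)",
    "transforms.route.replacement": "$1"
  }'
```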
8 Schema sync?
What about periodic schema synchronization? It works, but with limitations:
1. If the sync process goes down, the sink will start reporting errors.
2. A DROP TABLE on the source leaves DML events that can no longer be consumed.
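One way to approximate the periodic schema sync above is a cron job that copies DDL only (hosts and credentials are placeholders; this approach inherits the limitations just listed):

```shell
# Dump table definitions (no data) from the source and replay them on the sink
mysqldump -h <source-mysql-host> -u debezium -p'dbz123456' \
  --no-data --databases example \
  | mysql -h <sink-mysql-host> -u dbz_sink -p'sink123456'
```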
9 Confluent Hub
# https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
# https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/overview.html
wget https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.4/confluentinc-kafka-connect-jdbc-10.8.4.zip
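After downloading, a plugin-install sketch (the /opt/kafka/plugins/ directory is an assumption; it must match plugin.path in connect-distributed.properties, and the workers need a restart to pick the plugin up):

```shell
unzip confluentinc-kafka-connect-jdbc-10.8.4.zip -d /opt/kafka/plugins/
# restart the Connect workers so the new plugin is discovered
```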