Category
devops

Real-time Data Integration with Debezium CDC

This post walks through using Debezium Connect to implement MySQL-to-MySQL CDC.

The goal: the example database on the MySQL source instance -----> CDC ----> the example database on the MySQL sink instance.

Versions

  • Kafka 4.0.0
  • Debezium 3.2.1.Final

1 Prerequisites

  • A highly available Kafka cluster
  • A Kafka Connect distributed cluster with the debezium-connector-mysql AND debezium-connector-jdbc plugins (or simply use the debezium/connect image)
  • A source MySQL instance and a sink MySQL instance

2 Manually create the three connect-distributed topics (important)

# Manually create the three connect-distributed topics
(
./bin/kafka-topics.sh --create --topic connect-offsets --bootstrap-server 192.168.68.152:9092 --config cleanup.policy=compact --partitions 10 --replication-factor 3
./bin/kafka-topics.sh --create --topic connect-configs --bootstrap-server 192.168.68.152:9092 --config cleanup.policy=compact --partitions 1  --replication-factor 3
./bin/kafka-topics.sh --create --topic connect-status  --bootstrap-server 192.168.68.152:9092 --config cleanup.policy=compact --partitions 10 --replication-factor 3
)

3 MySQL source instance

  • Enable binlog
  • Create a replication user and grant privileges
| Property         | Expected Value | Verification SQL Statement               |
| ---------------- | -------------- | ---------------------------------------- |
| log_bin          | ON             | SHOW VARIABLES LIKE 'log_bin';           |
| binlog_format    | ROW            | SHOW VARIABLES LIKE 'binlog_format';     |
| binlog_row_image | FULL           | SHOW VARIABLES LIKE 'binlog_row_image';  |
-- CREATE USER 'debezium'@'%' IDENTIFIED WITH mysql_native_password BY 'dbz123456';
CREATE USER IF NOT EXISTS 'debezium'@'%' IDENTIFIED BY 'dbz123456';
GRANT SELECT,INSERT,UPDATE,DELETE,DROP,RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT  ON *.* TO 'debezium';
FLUSH PRIVILEGES;
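To verify the user and its privileges, a standard MySQL statement can be used:

```sql
SHOW GRANTS FOR 'debezium'@'%';
```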

4 MySQL sink instance

  • Create a user and grant privileges
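A sketch of the sink-side user; the user name, password, and grant scope below are illustrative assumptions, not values from the original:

```sql
-- Hypothetical sink user; adjust the name, password, and scope to your setup
CREATE USER IF NOT EXISTS 'sinkuser'@'%' IDENTIFIED BY 'sink123456';
-- CREATE/ALTER/INDEX are needed if the JDBC sink auto-creates or evolves tables
GRANT CREATE, ALTER, INDEX, SELECT, INSERT, UPDATE, DELETE ON example.* TO 'sinkuser';
FLUSH PRIVILEGES;
```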

5 Create the debezium.heartbeat table

create database if not exists debezium default character set utf8mb4 collate utf8mb4_unicode_ci;

CREATE TABLE IF NOT EXISTS debezium.heartbeat (
  id INT AUTO_INCREMENT,
  last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (id)
);

INSERT INTO debezium.heartbeat(id) VALUES (1);
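The heartbeat table above is typically driven from the source connector's heartbeat settings. A sketch of the relevant properties; the 30-second interval is an assumption, not from the original:

```json
"heartbeat.interval.ms": "30000",
"heartbeat.action.query": "UPDATE debezium.heartbeat SET last_update = NOW() WHERE id = 1;"
```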



## Reference: https://gist.github.com/CHERTS/2735a4db93ffa18ee997bfb627a208f2

6 Publish mysql-source-example

Debezium connector for MySQL
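As a sketch of registering the Debezium MySQL source connector, a minimal payload might look like the following; the database host, credentials, `database.server.id`, and topic prefix are illustrative assumptions, not values from the original:

```json
{
  "name": "mysql-source-example",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql-source.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz123456",
    "database.server.id": "184054",
    "topic.prefix": "example",
    "database.include.list": "example",
    "schema.history.internal.kafka.bootstrap.servers": "192.168.68.152:9092",
    "schema.history.internal.kafka.topic": "schema-changes.example"
  }
}
```

Such a payload is typically POSTed to the Connect REST API (`/connectors`) on the distributed cluster.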

7 Publish jdbc-sink-example

Note: these two parameters differ slightly.
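A sketch of the corresponding Debezium JDBC sink registration; the connection URL, credentials, and topic regex are illustrative assumptions, not values from the original. Note that credential keys differ between implementations: the Debezium JDBC sink uses `connection.username`, while the Confluent JDBC sink uses `connection.user`:

```json
{
  "name": "jdbc-sink-example",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics.regex": "example\\..*",
    "connection.url": "jdbc:mysql://mysql-sink.example.com:3306/example",
    "connection.username": "sinkuser",
    "connection.password": "sink123456",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "schema.evolution": "basic"
  }
}
```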

8 Schema Sync?

Scheduled schema synchronization?

This approach has some limitations:
1. If the pipeline goes down, the sink will start reporting errors.
2. A DROP TABLE leaves DML events that can no longer be consumed.

9 Confluent Hub

# https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
# https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/overview.html
wget https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.4/confluentinc-kafka-connect-jdbc-10.8.4.zip

10 How to set the partition count and replication factor for topics Debezium auto-creates?

{
    ...

    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.default.compression.type": "lz4"

    ...
}
## You can include any Kafka topic-level configuration property in the configuration for the default group.
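These per-connector `topic.creation.*` properties only take effect when automatic topic creation is enabled on the Connect worker. A sketch of the relevant worker setting (its default is already true):

```properties
# connect-distributed.properties
topic.creation.enable=true
```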


Customization of Kafka Connect automatic topic creation