Category
devops

Debezium summary

Ways to deploy Debezium

There are four options for deploying Debezium:
Option 1: on top of Kafka Connect
Option 2: Debezium Server
Option 3: Embedded Debezium (see the sketch after this list)
Option 4: Debezium on Kubernetes
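For option 3, a minimal sketch of the embedded engine, assuming the MySQL connector, JSON serialization, and file-based offset/schema-history storage. Hostname, credentials, topic.prefix and file paths are placeholders, and exact property names vary between Debezium versions (older releases use database.server.name and database.history.* instead of topic.prefix and schema.history.internal.*).

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class EmbeddedDebeziumSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("name", "mysql-embedded");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        // The embedded engine stores source offsets itself; no Kafka Connect is involved.
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
        props.setProperty("offset.flush.interval.ms", "10000");
        // Placeholder connection settings.
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "debezium");
        props.setProperty("database.password", "dbz");
        props.setProperty("database.server.id", "184054");
        props.setProperty("topic.prefix", "dbserver1");
        // Schema history storage; see the SchemaHistory section further below.
        props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
        props.setProperty("schema.history.internal.file.filename", "/tmp/schema-history.dat");

        // Keys and values are serialized as JSON strings; each change event is handed to the callback.
        try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> System.out.println(record.value()))
                .build()) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);
            // Run for a while for demonstration; leaving the try block closes and stops the engine.
            TimeUnit.MINUTES.sleep(1);
            executor.shutdown();
        }
    }
}
```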

Focus

Debezium focuses on CDC, i.e. capturing changes on the source side, for example streaming MySQL changes into Kafka. How the data then gets sinked from Kafka to other destinations, e.g. an OLAP database, Elasticsearch, or even back into the same kind of DB, is handled downstream (typically by sink connectors).

There are roughly three patterns:

A: source DB -> Kafka -> sink (schemaless store, e.g. Elasticsearch)
B: source DB -> Kafka -> sink (DB with a schema)
C: source DB -> Kafka -> sink (stream computing)

Pattern B further splits into two cases:

B1: sink and source are the same kind of DB, e.g. MySQL -> MySQL
B2: sink and source are different DBs, e.g. MySQL -> PostgreSQL, MySQL -> Apache Doris

Database CDC is an ordered stream of messages, so the tasks.max=1 setting in connect-mysql-source.properties can only be 1: even in a distributed Kafka Connect cluster the source connector still runs as a single task, because ordering must be preserved on the CDC side.

By default, a Debezium connector sends each change event record to a Kafka topic whose name is built from the topic prefix (logical server name), the database name and the table name, for example dbserver1.inventory.orders. One topic per table is the finest granularity.

Once the events are in Kafka, whether they can be consumed in order depends on whether the source tables have hard dependencies on each other (e.g. explicit foreign keys). If a table that references another table is consumed before the table it depends on, the sink will fail, so such tables cannot simply be consumed in parallel.

Workarounds considered so far:

1. Avoid hard foreign-key dependencies between captured tables; or
2. Route the mutually dependent tables into the same Kafka topic (see the ByLogicalTableRouter example in the SMT section below); or
3. In the sink, try/catch and keep retrying/waiting until the dependency is satisfied.

SMT

https://debezium.io/documentation/reference/stable/transformations/index.html

Interface:

  • org.apache.kafka.connect.transforms.Transformation

Implementations:

  • io.debezium.transforms.outbox.EventRouter
  • io.debezium.transforms.SchemaChangeEventFilter
  • io.debezium.transforms.ExtractChangedRecordState
  • io.debezium.transforms.HeaderToValue
  • io.debezium.transforms.TimezoneConverter
  • io.debezium.transforms.ScriptingTransformation
  • io.debezium.transforms.ByLogicalTableRouter
  • io.debezium.transforms.partitions.PartitionRouting
  • io.debezium.transforms.partitions.ComputePartition
  • io.debezium.transforms.tracing.ActivateTracingSpan
  • io.debezium.transforms.AbstractExtractNewRecordState
  • io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb

Among these, the topic-routing transform (ByLogicalTableRouter) is especially useful when you want to aggregate multiple tables into the same topic, e.g. sharded tables or tables with foreign-key dependencies.
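For example, a hedged sketch of routing all shards of an orders table into one topic with this SMT. The alias "Reroute", the regex and the replacement are illustrative values; the option names (topic.regex, topic.replacement, key.enforce.uniqueness) follow the topic-routing SMT documentation, and the properties are shown here as the java.util.Properties a connector configuration would carry.

```java
import java.util.Properties;

public class TopicRoutingConfig {
    // Builds the SMT part of a Debezium connector configuration that routes all
    // sharded orders_* tables into one logical topic.
    static Properties reroutingProps() {
        Properties props = new Properties();
        props.setProperty("transforms", "Reroute");
        props.setProperty("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
        // e.g. dbserver1.inventory.orders_1, orders_2, ... all map to dbserver1.inventory.orders_all
        props.setProperty("transforms.Reroute.topic.regex", "(.*)orders_\\d+");
        props.setProperty("transforms.Reroute.topic.replacement", "$1orders_all");
        // Keep keys unique across the merged physical tables (adds a discriminating key field).
        props.setProperty("transforms.Reroute.key.enforce.uniqueness", "true");
        return props;
    }
}
```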

io.debezium.engine.spi.OffsetCommitPolicy

  • io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy
  • io.debezium.engine.spi.OffsetCommitPolicy.PeriodicCommitOffsetPolicy
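In the embedded engine the commit policy can be plugged into the builder. A small sketch, assuming the DebeziumEngine builder exposes a .using(OffsetCommitPolicy) overload and the static factories OffsetCommitPolicy.always() / periodic(...); "props" is the same connector configuration as in the embedded example above.

```java
import java.util.Properties;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.spi.OffsetCommitPolicy;

public class CommitPolicyExample {
    // Builds an engine that commits offsets after every processed batch instead of
    // waiting for the periodic offset.flush.interval.ms timer.
    static DebeziumEngine<ChangeEvent<String, String>> engineWithAlwaysCommit(Properties props) {
        return DebeziumEngine.create(Json.class)
                .using(props)
                .using(OffsetCommitPolicy.always())   // or OffsetCommitPolicy.periodic(props)
                .notifying(record -> System.out.println(record.value()))
                .build();
    }
}
```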

io.debezium.engine.format.SerializationFormat

  • io.debezium.engine.format.Avro
  • io.debezium.engine.format.Json
  • io.debezium.engine.format.JsonByteArray
  • io.debezium.engine.format.Protobuf
  • io.debezium.engine.format.CloudEvents
  • io.debezium.embedded.Connect

Format-descriptor interfaces (used to combine key/value/header formats when building the engine):

  • io.debezium.engine.format.ChangeEventFormat
  • io.debezium.engine.format.KeyValueChangeEventFormat
  • io.debezium.engine.format.KeyValueHeaderChangeEventFormat
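The format class chosen when building the embedded engine determines the Java type of the emitted change events. A small sketch, assuming the DebeziumEngine.create overloads for a single format class and for KeyValueChangeEventFormat.of(...); connector properties are omitted and "props" is the same configuration as in the embedded example above.

```java
import java.util.Properties;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.format.KeyValueChangeEventFormat;

public class FormatSelectionExample {
    // Same format for key and value: both serialized as JSON strings.
    static DebeziumEngine<ChangeEvent<String, String>> jsonEngine(Properties props) {
        return DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> System.out.println(record.value()))
                .build();
    }

    // Different formats for key and value: JSON string keys, JSON byte-array values.
    static DebeziumEngine<ChangeEvent<String, byte[]>> mixedEngine(Properties props) {
        return DebeziumEngine.create(KeyValueChangeEventFormat.of(Json.class, JsonByteArray.class))
                .using(props)
                .notifying(record -> System.out.println(record.value().length))
                .build();
    }
}
```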

org.apache.kafka.connect.storage.Converter

  • io.debezium.converters.CloudEventsConverter
  • io.debezium.converters.ByteArrayConverter
  • io.debezium.converters.BinaryDataConverter

io.debezium.relational.history.SchemaHistory

Interface:

  • io.debezium.relational.history.SchemaHistory

Implementations:

  • io.debezium.storage.file.history.FileSchemaHistory
  • io.debezium.storage.azure.blob.history.AzureBlobSchemaHistory
  • io.debezium.storage.kafka.history.KafkaSchemaHistory
  • io.debezium.storage.redis.history.RedisSchemaHistory
  • io.debezium.storage.rocketmq.history.RocketMqSchemaHistory
  • io.debezium.storage.jdbc.history.JdbcSchemaHistory
  • io.debezium.storage.s3.history.S3SchemaHistory
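A small sketch of selecting the schema history store through connector properties. The schema.history.internal.* keys are the current names (older releases use database.history.*); the file path, Kafka address and topic name are placeholders.

```java
import java.util.Properties;

public class SchemaHistoryConfig {
    // File-based schema history, e.g. for the embedded engine or quick local tests.
    static void useFileHistory(Properties props) {
        props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
        props.setProperty("schema.history.internal.file.filename", "/tmp/schema-history.dat");
    }

    // Kafka-backed schema history, the usual choice when running on Kafka Connect.
    static void useKafkaHistory(Properties props) {
        props.setProperty("schema.history.internal", "io.debezium.storage.kafka.history.KafkaSchemaHistory");
        props.setProperty("schema.history.internal.kafka.bootstrap.servers", "localhost:9092");
        props.setProperty("schema.history.internal.kafka.topic", "schema-changes.inventory");
    }
}
```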

debezium.format.value (Debezium Server configuration)
The name of the output format for the value, one of json / jsonbytearray / avro / protobuf / cloudevents.

The op field has four possible values: c, u, d and r, corresponding to create, update, delete and read (r is emitted for snapshot reads).
