Ways to deploy Debezium
Option 1: on top of Kafka Connect
Option 2: Debezium Server
Option 3: Embedded Debezium (the Debezium engine embedded in your own application)
Option 4: Debezium on Kubernetes
Focus
Debezium focuses on CDC, that is, the source side of collection: for example, capturing MySQL changes into Kafka. Moving the data onward from Kafka to other systems (an OLAP database, Elasticsearch, or even back into the same kind of DB) is left to downstream sink tooling.
There are roughly three pipeline patterns:
A: source DB -> Kafka -> sink (schemaless store, e.g. Elasticsearch)
B: source DB -> Kafka -> sink (schema-ful DB)
C: source DB -> Kafka -> sink (stream computing)
Pattern B splits into two further cases:
B1: sink and source are the same kind of DB, e.g. MySQL -> MySQL
B2: sink and source are different DBs, e.g. MySQL -> PostgreSQL, or MySQL -> Apache Doris
Database CDC produces an ordered stream of messages, so the tasks.max=1 setting in connect-mysql-source.properties must stay at 1. Even in a distributed Kafka Connect cluster there is only one task here, because ordering on the CDC side has to be preserved.
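As a sketch, a minimal MySQL source connector configuration might look like the following; hostname, credentials, and topic names are placeholder assumptions:

```properties
name=mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
# CDC emits an ordered stream, so only one task is allowed
tasks.max=1
database.hostname=localhost
database.port=3306
database.user=debezium
database.password=dbz
database.server.id=184054
topic.prefix=dbserver1
database.include.list=inventory
schema.history.internal.kafka.bootstrap.servers=localhost:9092
schema.history.internal.kafka.topic=schema-changes.inventory
```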
By default, a Debezium connector sends each change event record to a Kafka topic whose name is built from the topic prefix plus the database name and table name. One topic per table is the finest granularity.
Once the events are in Kafka, whether they can be consumed in order depends on whether the source tables have hard dependencies on each other (e.g. explicit foreign keys). If a table with a foreign-key dependency is consumed and applied before the table it depends on, the sink errors out, so such tables cannot simply be consumed in parallel.
Workarounds considered so far (any one of):
1. Avoid strong foreign-key dependencies between captured tables.
2. Route the foreign-key-dependent tables into the same Kafka topic, so their relative order is preserved.
3. On the sink side, catch the failure and keep retrying until the dependency has been applied.
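Workaround 3 can be sketched as a generic retry loop. The SinkWrite interface and the timings below are illustrative assumptions, not Debezium API:

```java
// Sketch of workaround 3: retry a sink write until its foreign-key
// dependency has been applied. All names here are illustrative.
public class RetryingSink {

    @FunctionalInterface
    public interface SinkWrite {
        void apply() throws Exception; // e.g. throws on a FK constraint violation
    }

    /** Retries the write with a fixed delay; returns the number of attempts used. */
    public static int writeWithRetry(SinkWrite write, int maxAttempts, long delayMillis)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                write.apply();
                return attempt; // success
            } catch (Exception e) {
                last = e; // likely the parent row has not been applied yet
                Thread.sleep(delayMillis);
            }
        }
        throw last; // give up after maxAttempts
    }
}
```

In practice the fixed delay would be replaced with backoff, and the caught exception narrowed to the sink database's constraint-violation type.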
SMT
https://debezium.io/documentation/reference/stable/transformations/index.html
Interface:
- org.apache.kafka.connect.transforms.Transformation
Implementations:
- io.debezium.transforms.outbox.EventRouter
- io.debezium.transforms.SchemaChangeEventFilter
- io.debezium.transforms.ExtractChangedRecordState
- io.debezium.transforms.HeaderToValue
- io.debezium.transforms.TimezoneConverter
- io.debezium.transforms.ScriptingTransformation
- io.debezium.transforms.ByLogicalTableRouter
- io.debezium.transforms.partitions.PartitionRouting
- io.debezium.transforms.partitions.ComputePartition
- io.debezium.transforms.tracing.ActivateTracingSpan
- io.debezium.transforms.AbstractExtractNewRecordState
- io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb
Among these, topic routing (ByLogicalTableRouter) is especially practical when you want to merge several tables into a single topic, e.g. sharded tables or tables with foreign-key dependencies.
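As a sketch, routing all shards of a logical table into one topic with ByLogicalTableRouter could look like this; the regex and names assume a hypothetical sharded customers table:

```properties
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
# collapse dbserver1.inventory.customers_shard_* into one logical topic
transforms.Reroute.topic.regex=(.*)customers_shard_.*
transforms.Reroute.topic.replacement=$1customers_all_shards
```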
io.debezium.engine.spi.OffsetCommitPolicy
- io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy
- io.debezium.engine.spi.OffsetCommitPolicy.PeriodicCommitOffsetPolicy
io.debezium.engine.format.SerializationFormat
- io.debezium.engine.format.Avro
- io.debezium.engine.format.Json
- io.debezium.engine.format.JsonByteArray
- io.debezium.engine.format.Protobuf
- io.debezium.engine.format.CloudEvents
- io.debezium.embedded.Connect
io.debezium.engine.format.ChangeEventFormat
- io.debezium.engine.format.KeyValueChangeEventFormat
- io.debezium.engine.format.KeyValueHeaderChangeEventFormat
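The engine SPI classes above come together in the embedded (DebeziumEngine) API. A minimal sketch, assuming debezium-embedded and a connector are on the classpath and all property values are placeholders:

```java
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EmbeddedEngineSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("name", "engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
        // periodic offset commits (PeriodicCommitOffsetPolicy)
        props.setProperty("offset.flush.interval.ms", "60000");
        // ... plus database.hostname, database.user, topic.prefix, etc.

        DebeziumEngine<ChangeEvent<String, String>> engine =
                DebeziumEngine.create(Json.class)        // pick a SerializationFormat
                        .using(props)
                        .notifying(event ->              // invoked once per change event
                                System.out.println(event.value()))
                        .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine); // runs until engine.close() is called
    }
}
```

The format class passed to create() selects one of the SerializationFormat implementations listed above (Json, Avro, Protobuf, CloudEvents, ...).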
org.apache.kafka.connect.storage.Converter
- io.debezium.converters.CloudEventsConverter
- io.debezium.converters.ByteArrayConverter
- io.debezium.converters.BinaryDataConverter
io.debezium.relational.history.SchemaHistory
Interface:
- io.debezium.relational.history.SchemaHistory
Implementations:
- io.debezium.storage.file.history.FileSchemaHistory
- io.debezium.storage.azure.blob.history.AzureBlobSchemaHistory
- io.debezium.storage.kafka.history.KafkaSchemaHistory
- io.debezium.storage.redis.history.RedisSchemaHistory
- io.debezium.storage.rocketmq.history.RocketMqSchemaHistory
- io.debezium.storage.jdbc.history.JdbcSchemaHistory
- io.debezium.storage.s3.history.S3SchemaHistory
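A sketch of selecting one of these implementations in the connector configuration; the file path is a placeholder:

```properties
# store the captured DDL history in a local file instead of Kafka
schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
schema.history.internal.file.filename=/tmp/schema-history.dat
```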
debezium.format.value (a Debezium Server property)
The name of the output format for the value, one of json/jsonbytearray/avro/protobuf/cloudevents.
The op field in a change event has four values, c, u, d, and r, corresponding to create, update, delete, and read (r is emitted for snapshot records).
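For illustration, the payload of an update event carries op together with the before/after row state; the field values below are invented:

```json
{
  "payload": {
    "op": "u",
    "before": { "id": 1, "name": "old name" },
    "after":  { "id": 1, "name": "new name" },
    "source": { "connector": "mysql", "db": "inventory", "table": "customers" },
    "ts_ms": 1700000000000
  }
}
```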