Category
devops

Kafka Summary

Serializer

Interface:
org.apache.kafka.common.serialization.Serializer
Implementations:
org.apache.kafka.common.serialization.ByteArraySerializer
org.apache.kafka.common.serialization.ByteBufferSerializer
org.apache.kafka.common.serialization.BytesSerializer
org.apache.kafka.common.serialization.DoubleSerializer
org.apache.kafka.common.serialization.FloatSerializer
org.apache.kafka.common.serialization.IntegerSerializer
org.apache.kafka.common.serialization.ListSerializer
org.apache.kafka.common.serialization.LongSerializer
org.apache.kafka.common.serialization.ShortSerializer
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.UUIDSerializer
org.apache.kafka.common.serialization.VoidSerializer
org.springframework.kafka.support.serializer.JsonSerializer

io.confluent.kafka.serializers.KafkaAvroSerializer
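
A minimal sketch of wiring serializers into a producer. The broker address mirrors the commands further down; the topic name "test" is a placeholder:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SerializerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.6.211:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "key", "value"));
        }
    }
}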

Deserializer

Interface:
org.apache.kafka.common.serialization.Deserializer
Implementations:
org.apache.kafka.common.serialization.ByteArrayDeserializer
org.apache.kafka.common.serialization.ByteBufferDeserializer
org.apache.kafka.common.serialization.BytesDeserializer
org.apache.kafka.common.serialization.DoubleDeserializer
org.apache.kafka.common.serialization.FloatDeserializer
org.apache.kafka.common.serialization.IntegerDeserializer
org.apache.kafka.common.serialization.ListDeserializer
org.apache.kafka.common.serialization.LongDeserializer
org.apache.kafka.common.serialization.ShortDeserializer
org.apache.kafka.common.serialization.StringDeserializer
org.apache.kafka.common.serialization.UUIDDeserializer
org.apache.kafka.common.serialization.VoidDeserializer
org.springframework.kafka.support.serializer.JsonDeserializer

io.confluent.kafka.serializers.KafkaAvroDeserializer
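
The consumer-side counterpart, a sketch with placeholder broker address and group id:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DeserializerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.6.211:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");                  // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> r : records) {
                System.out.printf("%s=%s%n", r.key(), r.value());
            }
        }
    }
}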

connect converter

Interface:
org.apache.kafka.connect.storage.Converter
Implementations:
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.json.JsonConverter
org.apache.kafka.connect.converters.ByteArrayConverter // performs no conversion, passes raw bytes through
com.blueapron.connect.protobuf.ProtobufConverter
io.confluent.connect.avro.AvroConverter
io.confluent.connect.json.JsonSchemaConverter
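
A Connect worker configures one converter per side (key/value); a sketch using JsonConverter, with embedded schemas disabled for compact payloads:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false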

ConsumerPartitionAssignor

Interface:
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
Implementations:
org.apache.kafka.clients.consumer.CooperativeStickyAssignor
org.apache.kafka.clients.consumer.RangeAssignor
org.apache.kafka.clients.consumer.RoundRobinAssignor
org.apache.kafka.clients.consumer.StickyAssignor

Corresponding parameter: partition.assignment.strategy
Default: class org.apache.kafka.clients.consumer.RangeAssignor,class org.apache.kafka.clients.consumer.CooperativeStickyAssignor
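
The default lists RangeAssignor first but also includes CooperativeStickyAssignor, which allows a rolling upgrade to cooperative rebalancing. To opt into the cooperative protocol directly (a sketch; props is a consumer Properties object as in the deserializer example above):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());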

Partitioner

Interface:
org.apache.kafka.clients.producer.Partitioner
Implementations:
org.apache.kafka.clients.producer.internals.DefaultPartitioner (@Deprecated since 3.3, KIP-794)
org.apache.kafka.clients.producer.RoundRobinPartitioner
org.apache.kafka.clients.producer.UniformStickyPartitioner (@Deprecated since 3.3, KIP-794)
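
The interface is small enough to sketch a custom implementation; the class name and the null-key policy here are illustrative, not a standard partitioner:

import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Illustrative policy: pin null-key records to partition 0, hash everything else.
public class FixedNullKeyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) return 0;
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    @Override public void close() {}

    @Override public void configure(Map<String, ?> configs) {}
}

It is registered on the producer via partitioner.class (ProducerConfig.PARTITIONER_CLASS_CONFIG).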

kafka-connect

SourceConnector

Interface:
org.apache.kafka.connect.source.SourceConnector
Implementations:
org.apache.kafka.connect.file.FileStreamSourceConnector

io.debezium.connector.mysql.MySqlConnector
io.debezium.connector.postgresql.PostgresConnector
io.debezium.connector.sqlserver.SqlServerConnector
io.debezium.connector.oracle.OracleConnector
io.debezium.connector.cassandra.Cassandra3Connector
io.debezium.connector.cassandra.Cassandra4Connector
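
A standalone-mode config for the file source, modeled on the connect-file-source.properties sample shipped with Kafka; file path and topic name are placeholders:

name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/input.txt
topic=connect-test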

SinkConnector

Interface:
org.apache.kafka.connect.sink.SinkConnector
Implementations:
org.apache.kafka.connect.file.FileStreamSinkConnector
io.debezium.connector.jdbc.JdbcSinkConnector
io.confluent.connect.jdbc.JdbcSinkConnector (https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc)
io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

com.datastax.oss.kafka.sink.CassandraSinkConnector
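
The matching file-sink config (note sinks take a topics list where sources take a single topic); file path and topic are placeholders:

name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/output.txt
topics=connect-test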

config

Base class:
org.apache.kafka.common.config.AbstractConfig
Implementations:
org.apache.kafka.clients.admin.AdminClientConfig
org.apache.kafka.clients.consumer.ConsumerConfig
org.apache.kafka.connect.storage.ConverterConfig
org.apache.kafka.clients.producer.ProducerConfig
org.apache.kafka.streams.StreamsConfig
org.apache.kafka.connect.mirror.MirrorClientConfig
org.apache.kafka.connect.runtime.WorkerConfig
org.apache.kafka.connect.runtime.ConnectorConfig

See: https://kafka.apache.org/documentation/#configuration
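
These config classes also expose the parameter names as String constants, which avoids typos compared with raw literals; a sketch for the producer (placeholder broker address):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.6.211:9092"); // placeholder broker
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());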

OffsetBackingStore

Interface:
org.apache.kafka.connect.storage.OffsetBackingStore
Implementations:
org.apache.kafka.connect.storage.KafkaOffsetBackingStore
org.apache.kafka.connect.storage.MemoryOffsetBackingStore
org.apache.kafka.connect.storage.FileOffsetBackingStore
io.debezium.storage.redis.offset.RedisOffsetBackingStore
io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore
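
Which store a Connect worker uses follows from its mode: standalone workers persist offsets with FileOffsetBackingStore, distributed workers with KafkaOffsetBackingStore. The corresponding worker settings (file path and topic name are placeholders):

# standalone (FileOffsetBackingStore)
offset.storage.file.filename=/tmp/connect.offsets

# distributed (KafkaOffsetBackingStore)
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3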

Glossary

LSO (Last Stable Offset)
ACKS (Acknowledgments)
LW (Low Watermark)
HW (High Watermark)
LEO (Log End Offset)
AR (Assigned Replicas)
ISR (In-Sync Replicas)
OSR (Out-of-Sync Replicas)

Failed to create topic, kafka responded with the following error: INVALID_REPLICATION_FACTOR: Replication factor is below 1 or larger than the number of available brokers.

Cause: the requested replication factor exceeds the number of live brokers (or is below 1). Either add brokers or request a smaller factor.

./kafka-topics.sh --bootstrap-server 192.168.6.211:9092 --alter --topic test --partitions 3
./kafka-topics.sh --bootstrap-server 192.168.6.211:9092 --describe --topic test

Note: kafka-topics.sh --alter can only increase the partition count; changing an existing topic's replication factor requires a reassignment via kafka-reassign-partitions.sh.

Size-related configs (bytes)

max.partition.fetch.bytes (consumer)
max.request.size (producer)
max.message.bytes (topic)
fetch.max.bytes (consumer)
replica.fetch.max.bytes (broker)
replica.fetch.response.max.bytes (broker)
message.max.bytes (broker)
offset.metadata.max.bytes (broker)

How Kafka avoids losing messages

Producer side (see the sketch after this list)
    1. Send synchronously (block on the returned Future)
    2. Use a callback to catch send failures and resend
    3. Rely on the built-in retries parameter (set it to its maximum)
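
A sketch combining the three points; broker address and topic name are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NoLossProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.6.211:9092"); // placeholder broker
        props.put(ProducerConfig.ACKS_CONFIG, "all");                    // acks=-1: wait for all ISR
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);     // retry transient failures
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 1. synchronous send: block until the broker acknowledges
            producer.send(new ProducerRecord<>("test", "k1", "v1")).get();
            // 2. async send with a callback that can trigger a resend on failure
            producer.send(new ProducerRecord<>("test", "k2", "v2"), (metadata, e) -> {
                if (e != null) {
                    // application-specific: log and resend, or persist for replay
                }
            });
        }
    }
}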

Broker side

    1. Broker high availability + replication (multiple replicas) + acknowledgments (acks=-1) + a minimum in-sync replica count (raise min.insync.replicas to 2)

Kafka exactly-once delivery

# Exactly-Once Semantics

enable.idempotence=true
retries=2147483647
max.in.flight.requests.per.connection=1
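
Set via the ProducerConfig constants (a sketch; with idempotence enabled the client actually tolerates up to 5 in-flight requests while still preserving order, so 1 is the conservative choice):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // conservative; <=5 is safe with idempotence

Idempotence alone covers exactly-once per partition within one producer session; end-to-end exactly-once across sessions and topics additionally requires a transactional.id and the transaction API (initTransactions/beginTransaction/commitTransaction).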

How Kafka guarantees message order

# TODO
# In Kafka, a topic's partitions are the unit of parallelism; ordering is guaranteed only within a single partition
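
Since order holds only within one partition, records that must stay ordered should share a key: default partitioning hashes the key, so same key means same partition. A fragment continuing the producer sketch above; the "orders" topic and "user-42" key are illustrative placeholders:

// same key -> same partition -> per-key ordering preserved
producer.send(new ProducerRecord<>("orders", "user-42", "created"));
producer.send(new ProducerRecord<>("orders", "user-42", "paid"));
producer.send(new ProducerRecord<>("orders", "user-42", "shipped"));

If retries are enabled without idempotence, also set max.in.flight.requests.per.connection=1, otherwise a retried batch can overtake a later one and reorder messages.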

Producer-only parameters

===========================producer only===========================
acks=all
batch.size=16384
buffer.memory=33554432
compression.type=none
delivery.timeout.ms=120000
enable.idempotence=true
linger.ms=0
max.block.ms=60000
max.in.flight.requests.per.connection=5
max.request.size=1048576
metadata.max.idle.ms=300000
partitioner.adaptive.partitioning.enable=true
partitioner.availability.timeout.ms=0
partitioner.class=null
partitioner.ignore.keys=false
retries=2147483647
transaction.timeout.ms=60000
transactional.id=null

Consumer-only parameters

===========================consumer only===========================
allow.auto.create.topics=true
auto.commit.interval.ms=5000
auto.offset.reset=latest
check.crcs=true
client.rack=
default.api.timeout.ms=60000
enable.auto.commit=true
exclude.internal.topics=true
fetch.max.bytes=52428800
fetch.max.wait.ms=500
fetch.min.bytes=1
group.id=null
group.instance.id=null
heartbeat.interval.ms=3000
internal.leave.group.on.close=true
internal.throw.on.fetch.stable.offset.unsupported=false
isolation.level=read_uncommitted
max.partition.fetch.bytes=1048576
max.poll.interval.ms=300000
max.poll.records=500
partition.assignment.strategy=[class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
session.timeout.ms=45000
