Kafka 入门与实践

前言 Kafka是一个高吞吐量,分布式,轻量级,可分区和具有赋值备份,开源的的发布-订阅消息系统。

基本概念

主题 一组消息抽象归纳为一个主题,一个主题就是对消息的一个分类。生产者将消息发送到特定主题,消费者订阅或主题的某些分区进行消费

消息 Kafka通信的基本单位,由固定长度的消息头和可变长度的消息体构成

分区和副本 每个主题被分为一个或多个分区。每个分区由消息组成,是一个有序队列。物理层面,每个分区对应一个文件夹,每个分区又有多个副本分布在集群不同代理上,副本抽象为一个日志对象,分区是Kafka保证消息被顺序消费以及对消息进行负载均衡的基础 不同分区的消息有序性不能得到保证。kafka对于已被消费的消息处理策略有两种:基于消息已存储的时间长度;基于分区大小

Leader副本和Follower副本 为了保证一个分区的多个副本之间数据的一致性,kafka会选择该分区一个副本作为leader副本,其他为follower副本。只有leader副本才负责处理客户端读/写请求。在没有leader副本的情况下所有副本都会继续读写请求,但是这样会很难保证副本之间数据的一致性。

偏移量 发布到分区的消息会被直接追加到日志文件尾部,每条消息在日志文件中位置都会对应一个按序递增的偏移量。

日志段 日志被划分为多个日志段,一个日志段对应磁盘上一个具体日志文件和两个索引文件(消息偏移量索引文件和消时间戳索引文件)

代理 kafka集群由一个或多个kafka实例构成,每一个实例称为代理,一台服务器配置一个或多个代理。每个代理在集群中使用一个非负整数作为唯一标识id,

生产者 向kafka代理发送消息的客户端

消费者和消费组 kafka消费者以拉的方式处理消息,每一个消费者属于一个特定消费组,每个消费者也有一个全局唯一的id。同一个主题的消息只能被同一个消费组下某一个消费者消费,不同消费组的消费者可同时消费该消息。 实现消息广播只需指定各消费者均属于不同消费组,消息单播则只需让各消费者属于同一个消息组

ISR kafka在zk中动态维护一个ISR用来保存同步的副本列表,该列表中保存与Leader副本保持消息同步的所有副本对应代理节点id。在fllower失效的情况下,该副本节点会从ISR列表中移除

特性

持久化 kafka依赖于文件系统存储和缓存消息,因为kafka的写入/读取时线性执行的(600mb/s,而随机写只有100kb/s),所以在设计上kafka采用的是时间复杂度为O(1)的磁盘结构,而不是基于JVM的Java对象持久化,避免了频繁的垃圾回收。 避免了使用B树等结构在大数据量情况下性能的现象下降。与其在内存中维护尽可能多的内存,并在空间用完时慌乱地将其全部刷新到文件系统,不如将其颠倒过来。 所有数据立即写入文件系统上的持久日志

高吞吐量 除了写入顺序写入磁盘,kafka在数据写入和数据同步采用零拷贝技术,采用sendFile()函数调用(直接在两个文件描述符之间传递数据,不通过用户内存空间,避免了内核缓冲区和用户缓冲区之间数据拷贝)。kafka还支持数据压缩和批量发送。 提高影响系统效率低下的两个原因,太多小I/O操作和过多的字节复制。集中组织消息在一起,允许网络组合消息一起分摊网络往返开销 代理维护的消息对于生产者和消费者来说都是使用相同格式写入磁盘(以此优化操作:持久性日志块的网络传输)

文件传输到套接字通用数据路径

而sendfile直接将pagecache数据发送到网络,也就是说只需拷贝最终副本到NIC缓冲区

以及客户端内存积累数据,更大批量的异步发送数据(配置累计固定数量消息,以及等待延迟绑定)

扩展性 生产者将数据发送到作为分区的leader代理,不需要任何中间路由层。利用zk对集群进行管理,以达到负载均衡和数据复制的目的。

多客户端支持 支持java,scale,c,pythong,go,node等语言客户端的接入支持

安全机制 SSL,通信数据加密,认证服务

数据备份 主题指定副本数,数据持久化备份

轻量级 代理不记录消息是否被消费,消费偏移量管理交由消费或组协调器维护

数据压缩 大部分冗余是由于同一类型的消息之间的重复(例如 JSON 中的字段名或 web 日志中的用户代理或公共字符串值)。 Kafka 用一种有效的批处理格式,把多条消息放到一起组成MessageSet,然后把MessageSet放到一条消息里面

从代理leader通过拉的方式获取数据 消费者向作为leader的代理,”获取”分区数据请求。 消费者每个请求在日志中指定其偏移量,并从该位置开始接收回一个日志块。 如果需要,可以将其回退以重新使用数据

在以“推”为方式想消息系统中,意味着当消费速度低于生产速度时,消费者会发生分布式拒绝服务;而且推的方式会考虑立即发送还是积累更多数据再发送, 而后者消息还是会被缓冲和持久化,而“拉”的消息处理方式则没有上述问题

部署

安装jdk环境 安装 部署 : zookeper 配置zoo.cfg下 {dataDir}{dataLogDir}{clientPort}等配置 启动: sh zkServer.sh start 安装 部署 : kafka:配置config/server.properties 下 {broker.id}{log.dirs}{zookeeper.connect}等配置 启动:sh kafka-server-start.sh -daemon ../config/server.properties 安装 部署 kafka manager 1 github下载源码
2 到源码目录进行编译 ./sbt clean dist 3 编译成功后会给一个zip压缩包的目录,到目录下获取生成的压缩文件kafka-manager-*.zip文件 4 将压缩文件解压到指定位置 usr/local/software/kafka-manager目录下 5 修改安装目录下conf目录,打开application.conf文件 修改kafka-manger.zkhosts=”kafka-manager-zookeeper:2181”配置项为实际zookeper的地址 6 启动 nohup ./kafka-manager -Dconfig.file=../conf/application.conf & 7 访问 serverIp:9000

8 关闭kafka-manager:通过杀死进程的方式,同时删除RUNNING_PID文件

核心组件

延迟组件

delayedOperation delayedOperation是一个基于事件启动有失效时间的timerTadk。实现了runnable接口,每个task维护一个timetaskentry对象,timetaskentry对象添加到TimerTaskList(一个双向环形链表)

kafka流程

kafka依赖zookeper来管理节点,所以需提前启动zookeper

查看kafka节点 get /brokers/ids/0

kafkaserver.startup()启动并初始化组件,kafkascheduler,logManager,sockeserver,replicamanger,kafkacontroller,groupcoordinator,dynamicconfigmanager,kafkahealthchecker kafkaserver实例化时会在log.dir指定目录下创建一个meta.properties文件,该文件记录当前kafka版本对应的一个版本version字段,一个代理broker.id字段。也就是说我们在不改变代理对应log.dir配置而修改改立brokerId时,不仅server.property需要修改,该处也需要修改

单server下启动单节点 sh kafka-server-start.sh -daemon ../config/server.properties

创建主题 sh kafka-topics.sh –create –bootstrap-server localhost:9092 –replacation-factor 1 –partitions 1 –topic test

列出server下所有主题 sh kafka-topics.sh –list –bootstrap-server localhost:9092

查看某个topic的分区,副本情况 sh kafka-topics.sh –describe –bootstrap-server localhost:9092 –topic test

向某主题发送消息 sh kafka-console-producer.sh –broker-list localhost:9092 –topic test

从某主题获取消息 sh kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning

使用kafka连接器从topic导入导出数据 通过配置 source和sink配置文件来启动多个输入,输出连接器 sh connect-standalone.sh - daemon ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties

kafka配置详解

server.properties

zookeeper.connect 配置分布式管理协调器zookeper的调用地址,多个zookeper集群使用,隔开(hostname1:port1,hostname2:port2,hostname3:port3),也可以添加/chroot/path路径作为配置

advertised.listeners 如果我们在外网访问的模式下,kafka客户端连接访问其他broker时,需要获取外网id,该监听器配置将其broker绑定的外网id注册到zookeper。 如果没有配置则使用listener的配置

auto.create.topics.enable 在topic不存在的情况下,是否自动创建topic

auto.leader.rebalance.enable 是否开启kafka的leader自动均衡机制

background.threads 该broker可使用线程处理数量

broker.id 该服务器下唯一节点标志id

compression.type 主题压缩类型(’gzip’, ‘snappy’, ‘lz4’, ‘zstd’)

delete.topic.enable 通过管理员工具删除主题 是否有效

leader.imbalance.check.interval.seconds 分区平衡检查频率设置

leader.imbalance.per.broker.percentage 每个broker所允许的leader最大不平衡比率

listeners 每个broker内部绑定ip和端口地址

log.dirs 该broker日志数据保存地址

log.flush.interval.messages 该broker分区消息刷盘触发值

log.flush.interval.ms 该broker分区消息刷盘最大停留时间,如果未配置,则指定已经消费过消息刷盘

log.flush.offset.checkpoint.interval.ms 距上次刷盘的时间控制,以便于数据恢复

log.flush.scheduler.interval.ms 检查是否需要刷盘的时间间隔

log.flush.start.offset.checkpoint.interval.ms 更新日志起始偏移量持久记录的频率

log.retention.bytes 日志保留最大数据量

log.retention.hours\log.retention.minutes\log.retention.ms 日志保留最长时间,后者依次覆盖前者


data


data


data

kafka源码理解

producer

生产者初始化配置,默认ConfigDef,并且new一个新线程,创建一个实现了runnable接口send对象,待发送