KAFKA配置⽂件详解
kafka配置⽂件详解
kafka的配置分为 broker、consumer、producter三个不同的配置conf⽬录下
broker 对应的⽂件是server.properties
1 ------------------------------------------- 系统相关 -------------------------------------------
2##每⼀个broker在集中的唯⼀标⽰,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
3 broker.id =1
4
5##kafka数据的存放地址,多个地址的话⽤逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
6 log.dirs = /tmp/kafka-logs
7
8##提供给客户端响应的端⼝
9 port =6667
10
11##消息体的最⼤⼤⼩,单位是字节
12 message.max.bytes =1000000
13
14## broker 处理消息的最⼤线程数,⼀般情况下不需要去修改
15 numwork.threads =3
16
17## broker处理磁盘IO 的线程数,数值应该⼤于你的硬盘数
18 num.io.threads =8
19
20## ⼀些后台任务处理的线程数,例如过期消息⽂件的删除等,⼀般情况下不需要去做修改
21 background.threads =4
22
23## 等待IO线程处理的请求队列最⼤数,若是等待IO的请求超过这个数值,那么会停⽌接受外部消息,算是⼀种⾃我保护机制
24 quests =500
25
26##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接⼝上,并将其中之⼀发送到ZK,⼀般不设置 27 host.name
28
29## 打⼴告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使⽤还未深究
30 advertised.host.name
31
32## ⼴告地址端⼝,必须不同于port中的设置
33 advertised.port
34
35## socket的发送缓冲区,socket的调优参数SO_SNDBUFF
36 socket.send.buffer.bytes =100*1024
37
38## socket的接受缓冲区,socket的调优参数SO_RCVBUFF
ive.buffer.bytes =100*1024
40
41## socket请求的最⼤数值,防⽌serverOOM,message.max.bytes必然要⼩于quest.max.bytes,会被topic创建时的指定参数覆盖 quest.max.bytes =100*1024*1024
43
44 ------------------------------------------- LOG 相关 -------------------------------------------
45## topic的分区是以⼀堆segment⽂件存储的,这个控制每个segment的⼤⼩,会被topic创建时的指定参数覆盖
46 log.segment.bytes =1024*1024*1024
47
48## 这个参数会在⽇志segment没有达到log.segment.bytes设置的⼤⼩,也会强制新建⼀个segment 会被 topic创建时的指定参数覆盖
ll.hours =24*7
50
51## ⽇志清理策略选择有:delete和compact 主要针对过期数据的处理,或是⽇志⽂件达到限制的额度,会被 topic创建时的指定参数覆盖 52 log.cleanup.policy = delete
53
54## 数据存储的最⼤时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
55## ion.bytes和ion.minutes任意⼀个达到要求,都会执⾏删除,会被topic创建时的指定参数覆盖
ion.minutes=7days
57
58指定⽇志每隔多久检查看是否可以被删除,默认1分钟
59 log.cleanup.interval.mins=1
60
61
62## topic每个分区的最⼤⽂件⼤⼩,⼀个topic的⼤⼩限制 = 分区数*ion.bytes 。-1没有⼤⼩限制
63## ion.bytes和ion.minutes任意⼀个达到要求,都会执⾏删除,会被topic创建时的指定参数覆盖
ion.bytes=-1
65
66## ⽂件⼤⼩检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
ion.check.interval.ms=5minutes
68
69## 是否开启⽇志压缩
70 able=false
71
72## ⽇志压缩运⾏的线程数
73 log.cleaner.threads =1
74
75## ⽇志压缩时候处理的最⼤⼤⼩
76 log.cleaner.io.max.bytes.per.second=None
77
78## ⽇志压缩去重时候的缓存空间,在空间允许的情况下,越⼤越好
79 log.cleaner.dedupe.buffer.size=500*1024*1024
80
81## ⽇志清理时候⽤到的IO块⼤⼩⼀般不需要修改
82 log.cleaner.io.buffer.size=512*1024
83
84## ⽇志清理中hash表的扩⼤因⼦⼀般不需要修改
85 log.cleaner.io.buffer.load.factor =0.9
86
87## 检查是否处罚⽇志清理的间隔
88 log.cleaner.backoff.ms =15000
89
90## ⽇志清理的频率控制,越⼤意味着更⾼效的清理,同时会存在⼀些空间上的浪费,会被topic创建时的指定参数覆盖
91 log.cleaner.min.cleanable.ratio=0.5
92
93## 对于压缩的⽇志保留的最长时间,也是客户端消费消息的最长时间,同ion.minutes的区别在于⼀个控制未压缩数据,⼀个控制压缩后的数据。会被topic创建时的指定参数覆盖 94 log.ion.ms =1day
95
96## 对于segment⽇志的索引⽂件⼤⼩限制,会被topic创建时的指定参数覆盖
97 log.index.size.max.bytes =10*1024*1024
98
99## 当执⾏⼀个fetch操作后,需要⼀定的空间来扫描最近的offset⼤⼩,设置越⼤,代表扫描速度越快,但是也更好内存,⼀般情况下不需要搭理这个参数
100 log.index.interval.bytes =4096
101
102## log⽂件"sync"到磁盘之前累积的消息条数
103## 因为磁盘IO操作是⼀个慢操作,但⼜是⼀个"数据可靠性"的必要⼿段
104## 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.
105## 如果此值过⼤,将会导致每次"fsync"的时间较长(IO阻塞)
106## 如果此值过⼩,将会导致"fsync"的次数较多,这也意味着整体的client请求有⼀定的延迟.
107## 物理server故障,将会导致没有fsync的消息丢失.
108 log.ssages=None
109
110## 检查是否需要固化到硬盘的时间间隔
111 log.flush.scheduler.interval.ms =3000
112
113## 仅仅通过interval来控制消息的磁盘写⼊时机,是不⾜的.
114## 此参数⽤于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上⼀次磁盘同步的时间间隔
115## 达到阀值,也将触发.
116 log.flush.interval.ms = None
117
118## ⽂件在索引中清除后保留的时间⼀般不需要去修改
119 log.delete.delay.ms =60000
120
121## 控制上次固化硬盘的时间点,以便于数据恢复⼀般不需要去修改
122 log.flush.offset.checkpoint.interval.ms =60000
123
124 ------------------------------------------- TOPIC 相关 -------------------------------------------
125## 是否允许⾃动创建topic ,若是false,就需要通过命令创建topic
able =true
127
128## ⼀个topic ,默认分区的replication个数,不得⼤于集中broker的个数
plication.factor =1
130
131## 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
132 num.partitions =1
133
134实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有⼀个分区,分区被复制到三个broker上。
135
136 ----------------------------------复制(Leader、replicas) 相关 ----------------------------------
137## partition leader与replicas之间通讯时,socket的超时时间
138 controller.socket.timeout.ms =30000
139
140## partition leader与replicas数据同步时,消息的队列尺⼨
ssage.queue.size=10
142
143## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列⼊ISR(in-sync replicas),并认为它是死的,不会再加⼊管理中
144 replica.lag.time.max.ms =10000
145
146## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
147## 通常,在follower与leader通讯时,因为⽹络延迟或者链接断开,总会导致replicas中消息同步滞后
148## 如果消息之后太多,leader将认为此follower⽹络延迟较⼤或者消息吞吐能⼒有限,将会把此replicas迁移
149## 到其他follower中.
150## 在broker数量较少,或者⽹络不⾜的环境中,建议提⾼此值.
151 replica.ssages =4000
152
153##follower与leader之间的socket超时时间
154 replica.socket.timeout.ms=30*1000
155
156## leader复制时候的socket缓存⼤⼩
157 ive.buffer.bytes=64*1024
158
159## replicas每次获取数据的最⼤⼤⼩
160 replica.fetch.max.bytes =1024*1024
161
162## replicas同leader之间通信的最⼤等待时间,失败了会重试
163 replica.fetch.wait.max.ms =500
164
165## fetch的最⼩数据尺⼨,如果leader中尚未同步的数据不⾜此值,将会阻塞,直到满⾜条件
166 replica.fetch.min.bytes =1
167
168## leader 进⾏复制的线程数,增⼤这个数值会增加follower的IO
plica.fetchers=1
170
171## 每个replica检查是否将最⾼⽔位进⾏固化的频率
172 replica.high.watermark.checkpoint.interval.ms =5000
173
174## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
kafka命令
175 able =false
176
177## 控制器关闭的尝试次数
178 controlled.ies =3
179
180## 每次关闭尝试的时间间隔
181 backoff.ms =5000
182
183## 是否⾃动平衡broker之间的分配策略
184 able =false
185
186## leader的不平衡⽐例,若是超过这个数值,会对分区进⾏重新的平衡
187 leader.imbalance.per.broker.percentage =10
188
189## 检查leader是否不平衡的时间间隔
190 leader.imbalance.check.interval.seconds =300
191
192## 客户端保留offset信息的最⼤空间⼤⼩
adata.max.bytes
194
195 ----------------------------------ZooKeeper 相关----------------------------------
196##zookeeper集的地址,可以是多个,多个之间⽤逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
t = localhost:2181
198
199## ZooKeeper的最⼤超时时间,就是⼼跳的间隔,若是没有反映,那么认为已经死了,不易过⼤
200 zookeeper.session.timeout.ms=6000
201
202## ZooKeeper的连接超时时间
tion.timeout.ms =6000
204
205## ZooKeeper集中leader和follower之间的同步实际那
206 zookeeper.sync.time.ms =2000
207配置的修改
208其中⼀部分配置是可以被每个topic⾃⾝的配置所代替,例如
209新增配置
210 bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --ssage.bytes=ssages=1 211
212修改配置
213 bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --ssage.bytes=128000
214
215删除配置:
216 bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --ssage.bytes
CONSUMER 对应的⽂件是consumer.properties
最为核⼼的配置是group.id、t
## Consumer归属的组ID,broker是根据group.id来判断是队列模式还是发布订阅模式,⾮常重要
group.id
## 消费者的ID,若是没有设置的话,会⾃增
consumer.id
## ⼀个⽤于跟踪调查的ID ,最好同group.id相同
client.id = group id value
## 对于zookeeper集的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使⽤同样的zk配置
## zookeeper的⼼跳超时时间,查过这个时间就认为是dead消费者
zookeeper.session.timeout.ms =6000
## zookeeper的等待连接时间
## zookeeper的follower同leader的同步时间
zookeeper.sync.time.ms =2000
## 当zookeeper中没有初始的offset时候的处理⽅式。smallest :重置为最⼩值 largest:重置为最⼤值 anythingelse:抛出异常
set = largest
## socket的超时时间,实际的超时时间是:max.fetch.wait + socket.timeout.ms.
socket.timeout.ms=30*1000
## socket的接受缓存空间⼤⼩
##从每个分区获取的消息⼤⼩限制
## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset
## ⾃动提交的时间间隔
automit.interval.ms =60*1000
## ⽤来处理消费消息的块,每个块可以等同于ssage.max.bytes中数值
ssage.chunks =10
## 当有新的consumer加⼊到group时,将会reblance,此后将会有partitions的消费端迁移到新
## 的consumer上,如果⼀个consumer获得了某个partition的消费权限,那么它将会向zk注册
## Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,
## 此值⽤于控制,注册节点的重试次数.
ies =4
## 每次再平衡的时间间隔
rebalance.backoff.ms =2000
## 每次重新选举leader的时间
refresh.leader.backoff.ms
## server发送到消费端的最⼩数据,若是不满⾜这个数值则会等待,知道满⾜数值要求
fetch.min.bytes =1
## 若是不满⾜最⼩⼤⼩(fetch.min.bytes)的话,等待消费端请求的最长等待时间
fetch.wait.max.ms =100
## 指定时间内没有消息到达就抛出异常,⼀般不需要改
consumer.timeout.ms = -1
PRODUCER 对应的⽂件是producter.properties
⽐较核⼼的配置:metadata.broker.list、quired.acks、pe、serializer.class ## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外⾯设置⼀个vip  metadata.broker.list
##消息的确认模式
##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
##1:发送消息,并会等待leader 收到确认后,⼀定的可靠性
## -1:发送消息,等待leader收到确认,并进⾏复制操作后,才返回,最⾼的可靠性
## 消息发送的最长等待时间
request.timeout.ms =10000
## socket的缓存⼤⼩
send.buffer.bytes=100*1024
## key的序列化⽅式,若是没有设置,同serializer.class
key.serializer.class
## 分区的策略,默认是取模
partitioner.class=kafka.producer.DefaultPartitioner
## 消息的压缩模式,默认是none,可以有gzip和snappy
## 可以针对默写特定的topic进⾏压缩
## 消息发送失败后的重试次数
message.ies =3
## 每次失败后的间隔时间
retry.backoff.ms =100
## ⽣产者定时更新topic元信息的时间间隔,若是设置为0,那么会在每个消息发送后都去更新数据
## ⽤户随意指定,但是不能重复,主要⽤于跟踪记录消息
client.id=""
------------------------------------------- 消息模式相关 -------------------------------------------
## ⽣产者的类型 async:异步执⾏消息的发送 sync:同步执⾏消息的发送
## 异步模式下,那么就会在设置的时间缓存消息,并⼀次性发送
queue.buffering.max.ms =5000
## 异步的模式下最长等待的消息数
queue.ssages =10000
## 异步模式下,进⼊队列的等待时间若是设置为0,那么要么进⼊队列,要么直接抛弃
## 异步模式下,每次发送的最⼤消息数,前提是触发了queue.ssages或是queue.buffering.max.ms的限制
ssages=200
## 消息体的系列化处理类,转化为字节流进⾏传输
serializer.class= kafka.serializer.DefaultEncoder

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。