Kafka's Offset Commit Mechanism
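Kafka consumers rely on one crucial mechanism: the offset. It lets a consumer that has crashed, or whose partitions were reassigned by a rebalance, know where to resume once consumption restarts. An offset works like a bookmark in a book: the bookmark (offset) tells you at once where to pick up reading (consuming).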
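The bookmark analogy can be made concrete with a toy model. This is only a sketch: the list `log`, the dict `committed`, and the function `consume` are illustrative names, not part of any Kafka API, and a real consumer stores its committed offset on the broker rather than in memory.

```python
# Toy model of the "bookmark": the log is a list, and the committed offset
# is the index of the next message to read. All names are illustrative.
log = ["m0", "m1", "m2", "m3", "m4"]
committed = {"offset": 0}  # stands in for the offset Kafka stores per group and partition


def consume(max_messages):
    """Read up to max_messages starting at the committed offset, then commit."""
    start = committed["offset"]
    batch = log[start:start + max_messages]
    committed["offset"] = start + len(batch)  # move the bookmark forward
    return batch


first = consume(2)     # the consumer reads two messages, then "crashes"
resumed = consume(10)  # a restarted consumer picks up at the bookmark
print(first, resumed)  # ['m0', 'm1'] ['m2', 'm3', 'm4']
```

Because the second call starts from the stored offset, nothing is read twice and nothing is skipped.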
Kafka offers two ways to commit offsets: (1) automatic commit (the default) and (2) manual commit (which gives flexible control over the offset).
(1) Automatic offset commit:
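Automatic offset commit is controlled by the parameters enable_auto_commit and auto_commit_interval_ms. When enable_auto_commit=True, the consumer commits offsets every auto_commit_interval_ms milliseconds to Kafka's internal topic (__consumer_offsets). The partition that receives the commit is computed as partition = hash(group_id) % 50, where 50 is the default number of partitions of __consumer_offsets.
For example, with group_id=test_group_1: partition = hash("test_group_1") % 50 = 28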
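The formula can be sketched in Python under one assumption that the text does not spell out: that hash is Java's String.hashCode() and that the broker takes its absolute value before the modulo. The helper names java_string_hashcode and offsets_partition are hypothetical, and the result for a given group depends on which hash function is actually in use.

```python
def java_string_hashcode(s: str) -> int:
    """Replicate Java's String.hashCode() with 32-bit signed overflow.

    Assumes the string contains only BMP characters, so that ord() matches
    Java's UTF-16 code units.
    """
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as a signed int
    return h - 0x100000000 if h >= 0x80000000 else h


def offsets_partition(group_id: str, num_partitions: int = 50) -> int:
    """Partition of __consumer_offsets for a group, assuming abs(hashCode) % N."""
    h = java_string_hashcode(group_id)
    # Assumption: Integer.MIN_VALUE maps to 0, other values to their absolute value
    positive = 0 if h == -0x80000000 else abs(h)
    return positive % num_partitions


print(offsets_partition("test_group_1"))
```

All consumers that share a group_id therefore commit to the same partition of __consumer_offsets.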
Auto-commit example:
import pickle
import uuid
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['192.168.33.11:9092'],
    group_id="test_group_1",
    client_id="{}".format(str(uuid.uuid4())),
    max_poll_records=500,
    enable_auto_commit=True,  # Default is True: offsets are committed automatically
    auto_commit_interval_ms=100,  # Auto-commit interval in ms; the default is 5000 ms
    key_deserializer=lambda k: pickle.loads(k),
    value_deserializer=lambda v: pickle.loads(v)
)
# Subscribe to the round_topic topic
consumer.subscribe(topics=('round_topic',))
try:
    while True:
        consumer_records_dict = consumer.poll(timeout_ms=1000)
        # consumer.assignment() returns the assigned partitions;
        # consumer.position() gives the next offset to fetch for each one
        for partition in consumer.assignment():
            print('topic: {} partition: {}, next offset to consume: {}'.format(
                str(partition.topic),
                str(partition.partition),
                consumer.position(partition)
            ))
        # Processing logic
        for k, record_list in consumer_records_dict.items():
            print(k)
            for record in record_list:
                print("topic = {},partition = {},offset = {},key = {},value = {}".format(
                    record.topic, record.partition, record.offset, record.key, record.value)
                )
finally:
    # close() also triggers an auto-commit, because autocommit defaults to True
    consumer.close()
In the code above, the final call to consumer.close() also triggers an auto-commit, because its autocommit argument defaults to True. The source is:
def close(self, autocommit=True):
    """Close the consumer, waiting indefinitely for any needed cleanup.

    Keyword Arguments:
        autocommit (bool): If auto-commit is configured for this consumer,
            this optional flag causes the consumer to attempt to commit any
            pending consumed offsets prior to close. Default: True
    """
    if self._closed:
        return
    log.debug("Closing the KafkaConsumer.")
    self._closed = True
    self._coordinator.close(autocommit=autocommit)
    self._metrics.close()
    self._client.close()
    try:
        self.config['key_deserializer'].close()
    except AttributeError:
        pass
    try:
        self.config['value_deserializer'].close()
    except AttributeError:
        pass
    log.debug("The KafkaConsumer has closed.")
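With auto-commit, if auto_commit_interval_ms is set too large and the consumer exits abnormally before the next auto-commit, the offsets of messages it has already processed are never committed, and those messages are consumed again after a restart. A smaller auto_commit_interval_ms is therefore recommended, since it narrows this window.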
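The effect of the interval on duplicate consumption can be estimated with a simplified model. The sketch below assumes one message is processed every ms_per_message milliseconds and that a commit covering everything processed so far lands at each multiple of the interval. The function redelivered_after_crash and its parameters are hypothetical; real auto-commits are driven by poll() calls, so actual numbers will differ.

```python
def redelivered_after_crash(crash_ms, commit_interval_ms, ms_per_message=1):
    """Count messages processed after the last periodic commit and before the crash.

    Simplified model: steady processing rate, and a commit at every multiple
    of commit_interval_ms that covers all messages processed up to that time.
    """
    processed = crash_ms // ms_per_message
    last_commit_ms = (crash_ms // commit_interval_ms) * commit_interval_ms
    committed = last_commit_ms // ms_per_message
    return processed - committed


# Crash just before the first 5000 ms commit: everything is redelivered
print(redelivered_after_crash(crash_ms=4999, commit_interval_ms=5000))  # 4999
# Same crash time with a 100 ms interval: only the tail is redelivered
print(redelivered_after_crash(crash_ms=4999, commit_interval_ms=100))   # 99
```

In this model the number of re-consumed messages is bounded by the interval, which is why a shorter interval shrinks the duplicate window, at the price of more commit requests to the broker.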
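(2) Manual offset commit:
Because auto-commit is inflexible and imprecise (it can only commit at a fixed interval), Kafka also provides manual offset commit. Manual commit gives more flexible and precise control over offsets, which helps avoid both duplicate consumption and message loss.
There are three main ways to commit manually: 1. synchronous commit 2. asynchronous commit 3. asynchronous + synchronous combined
1. Synchronous manual commit
In synchronous mode a failed commit is retried until it succeeds or hits an error that cannot be retried. The consumer thread also blocks: it cannot poll for more messages until the broker has responded to the commit request, that is, until the commit succeeds or raises an exception. This limits message throughput.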
"""
Synchronous commit: 100,000 messages in 4.58 s
"""
import pickle
import uuid
import time
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['192.168.33.11:9092'],
    group_id="test_group_1",
    client_id="{}".format(str(uuid.uuid4())),
    enable_auto_commit=False,  # Commit offsets manually
    key_deserializer=lambda k: pickle.loads(k),
    value_deserializer=lambda v: pickle.loads(v)
)
# Subscribe to the round_topic topic
consumer.subscribe(topics=('round_topic',))
try:
    start_time = time.time()
    while True:
        consumer_records_dict = consumer.poll(timeout_ms=100)  # Time to wait in poll, in ms
        print("Fetching the next batch")
        record_num = 0
        for key, record_list in consumer_records_dict.items():
            for record in record_list:
                record_num += 1
        print("---->Messages in the current batch: {}<----".format(record_num))
        record_num = 0
        for k, record_list in consumer_records_dict.items():
            for record in record_list:
                print("topic = {},partition = {},offset = {},key = {},value = {}".format(
                    record.topic, record.partition, record.offset, record.key, record.value)
                )
        try:
            # Commit once per polled batch
            consumer.commit()  # Commits the latest offsets of this batch; blocks until done, then the next poll runs
            end_time = time.time()
            time_counts = end_time - start_time
            print(time_counts)
        except Exception as e:
            print('commit failed', str(e))
finally:
    consumer.close()  # With manual commit, close() has no effect on offset commits
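As the code shows, each polled batch is followed by one manual commit, and the next poll only runs once the commit for the current batch has completed. In testing, 100,000 messages took 4.58 s.
2. Asynchronous manual commit with a callback
With asynchronous manual commit the consumer thread does not block, and a failed commit is not retried. A callback can be supplied to log errors when the broker responds.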
"""
Asynchronous manual commit (async + callback): 100,000 messages in 3.09 s
"""
import pickle
import uuid
import time
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=['192.168.33.11:9092'],
    group_id="test_group_1",
    client_id="{}".format(str(uuid.uuid4())),
    enable_auto_commit=False,  # Commit offsets manually
    key_deserializer=lambda k: pickle.loads(k),
    value_deserializer=lambda v: pickle.loads(v)
)
# Subscribe to the round_topic topic
consumer.subscribe(topics=('round_topic',))


def _on_send_response(*args, **kwargs):
    """
    Callback invoked when an offset commit completes.
    :param args: args[0] --> {TopicPartition: OffsetAndMetadata}  args[1] --> Exception on failure
    :param kwargs:
    :return:
    """
    if isinstance(args[1], Exception):
        print('Offset commit failed. {}'.format(args[1]))
    else:
        print('Offset commit succeeded')


try:
    start_time = time.time()
    while True:
        consumer_records_dict = consumer.poll(timeout_ms=10)
        record_num = 0
        for key, record_list in consumer_records_dict.items():
            for record in record_list:
                record_num += 1
        print("Messages in the current batch: {}".format(record_num))
        for record_list in consumer_records_dict.values():
            for record in record_list:
                print("topic = {},partition = {},offset = {},key = {},value = {}".format(
                    record.topic, record.partition, record.offset, record.key, record.value))
        # Skip the commit for empty batches to avoid committing too often
        if record_num != 0:
            try:
                consumer.commit_async(callback=_on_send_response)
            except Exception as e:
                print('commit failed', str(e))
        record_num = 0
finally:
    consumer.close()
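The third approach listed earlier, asynchronous + synchronous combined, is commonly written as an asynchronous commit after each batch plus one final synchronous commit before the consumer closes. The following is a minimal sketch under that assumption, not the author's code. The function consume_with_combined_commit, its max_polls parameter, and the FakeConsumer class are hypothetical; FakeConsumer only mimics the poll, commit_async, commit, and close methods of KafkaConsumer so the sketch runs without a broker.

```python
def consume_with_combined_commit(consumer, handle, max_polls):
    """Async commit after each non-empty batch, one sync commit before closing."""
    def on_commit(offsets, response):
        # The callback receives the offsets and either an Exception or a response
        if isinstance(response, Exception):
            print('async commit failed: {}'.format(response))

    try:
        for _ in range(max_polls):  # a real consumer would loop with `while True`
            records = consumer.poll(timeout_ms=100)
            for record_list in records.values():
                for record in record_list:
                    handle(record)
            if records:
                consumer.commit_async(callback=on_commit)  # non-blocking, not retried
    finally:
        try:
            consumer.commit()  # blocking: last chance to persist the offsets
        finally:
            consumer.close(autocommit=False)


class FakeConsumer:
    """Hypothetical stand-in for KafkaConsumer; records calls instead of talking to a broker."""

    def __init__(self, batches):
        self.batches = list(batches)
        self.calls = []

    def poll(self, timeout_ms=0):
        return self.batches.pop(0) if self.batches else {}

    def commit_async(self, offsets=None, callback=None):
        self.calls.append('commit_async')

    def commit(self, offsets=None):
        self.calls.append('commit')

    def close(self, autocommit=True):
        self.calls.append('close')


fake = FakeConsumer([{'tp0': ['a', 'b']}, {}, {'tp0': ['c']}])
seen = []
consume_with_combined_commit(fake, seen.append, max_polls=3)
print(seen, fake.calls)
```

The design idea is that an occasional failed asynchronous commit is harmless while the loop keeps running, because a later commit supersedes it, whereas the last commit before shutdown has no successor and is therefore made synchronously.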
