kafkapython使⽤⽅法_如何使⽤Python读写Kafka?
关于Kafka的第三篇⽂章,我们来讲讲如何使⽤Python读写Kafka。这⼀篇⽂章⾥⾯,我们要使⽤的⼀个第三⽅库叫做kafka-python。⼤家可以使⽤pip或者pipenv安装它。下⾯两种安装⽅案,任选其⼀即可。
python3 -m pip install kafka-python
pipenv install kafka-python
复制代码
如下图所⽰:
这篇⽂章,我们将会使⽤最短的代码来实现⼀个读、写Kafka的⽰例。
创建配置⽂件
由于⽣产者和消费者都需要连接Kafka,所以我单独写了⼀个配置⽂件config.py⽤来保存连接Kafka所需要的各个参数,⽽不是直接把这些参数Hard Code写在代码⾥⾯:
# config.py
SERVER = '123.45.32.11:1234'
USERNAME = 'kingname'
PASSWORD = 'kingnameisgod'
TOPIC = 'howtousekafka'
复制代码
本⽂演⽰所⽤的Kafka由我司平台组的同事搭建,需要账号密码才能连接,所以我在配置⽂件中加上了USERNAME和PASSWORD两项。你使⽤的Kafka如果没有账号和密码,那么你只需要SERVER和TO
PIC即可。
创建⽣产者
python怎么读取json文件
代码简单到甚⾄不需要解释。⾸先使⽤KafkaProducer类连接 Kafka,获得⼀个⽣产者对象,然后往⾥⾯写数据。
import json
import time
import datetime
import config
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=config.SERVER,
value_serializer=lambda m: json.dumps(m).encode())
for i in range(100):
data = {'num': i, 'ts': w().strftime('%Y-%m-%d %H:%M:%S')}
producer.send(config.TOPIC, data)
time.sleep(1)
复制代码
参数bootstrap_servers⽤于指定 Kafka 的服务器连接地址。
参数value_serializer⽤来指定序列化的⽅式。这⾥我使⽤ json 来序列化数据,从⽽实现我向 Kafka 传⼊⼀个字典,Kafka ⾃动把它转成JSON 字符串的效果。
如下图所⽰:
注意,上图中,我多写了4个参数:
security_protocol="SASL_PLAINTEXT"
sasl_mechanism="PLAIN"
sasl_plain_username=config.USERNAME
sasl_plain_password=config.PASSWORD
复制代码
这四个参数是因为我这⾥需要通过密码连接 Kafka ⽽加上的,如果你的 Kafka 没有账号密码,就不需要这四个参数。
创建消费者
Kafka 消费者也需要连接 Kafka,⾸先使⽤KafkaConsumer类初始化⼀个消费者对象,然后循环读取数据。代码如下:
import config
from kafka import KafkaConsumer
consumer = KafkaConsumer(config.TOPIC,
bootstrap_servers=config.SERVER,
group_id='test',
auto_offset_reset='earliest')
for msg in consumer:
print(msg.value)
复制代码
KafkaConsumer 的第⼀个参数⽤于指定 Topic。你可以把这个 Topic 理解成 Redis 的 Key。
bootstrap_servers⽤于指定 Kafka 服务器连接地址。
group_id这个参数后⾯的字符串可以任意填写。如果两个程序的Topic与group_id相同,那么他们读取的数据不会重复,两个程序的Topic 相同,但是group_id不同,那么他们各⾃消费全部数据,互不影响。
auto_offset_rest 这个参数有两个值,earliest和latest,如果省略这个参数,那么默认就是latest。这个
参数会单独介绍。这⾥先略过。
连接好 Kafka 以后,直接对消费者对象使⽤ for 循环迭代,就能持续不断获取⾥⾯的数据了。
运⾏演⽰
运⾏两个消费者程序和⼀个⽣产者程序,效果如下图所⽰。
我们可以看到,两个消费者程序读取数据不重复,不遗漏。
当所有数据都消费完成以后,如果你把两个消费者程序关闭,再运⾏其中⼀个,你会发现已经没有数据会被打印出来了。
但如果你修改⼀下 group_id,程序⼜能正常从头开始消费了,如下图所⽰:
很多⼈都会搞混的⼏个地⽅
earliest 与 latest
在我们创建消费者对象的时候,有⼀个参数叫做auto_offset_reset='earliest'。有⼈看到earliest与latest,想当然地认为设置为earliest,就是从 Topic 的头往后读,设置为latest就是忽略之前的数据,从程序运⾏以后,新来的数据开始读。
这种看法是不正确的。
auto_offset_reset这个参数,只有在⼀个group第⼀次运⾏的时候才有作⽤,从第⼆次运⾏开始,这个参数就失效了。
假设现在你的 Topic ⾥⾯有100个数据,你设置了⼀个全新的 group_id 为test2。auto_offset_reset设置为 earliest。那么当你的消费者运⾏的时候,Kafka 会先把你的 offset 设置为0,然后让你从头开始消费的。
假设现在你的 Topic ⾥⾯有100个数据,你设置了⼀个全新的 group_id 为test3。auto_offset_reset设置为 latest。那么当你的消费者运⾏的时候,Kafka 不会给你返回任何数据,消费者看起来就像卡住了⼀样,但是 Kafka 会直接强制把前100条数据的状态设置为已经被你消费的状态。所以当前你的 offset
就直接是99了。直到⽣产者插⼊了⼀条新的数据,此时消费者才能读取到。这条新的数据对应的offset 就变成了100。
假设现在你的 Topic ⾥⾯有100个数据,你设置了⼀个全新的 group_id 为test4。auto_offset_reset设置为 earliest。那么当你的消费者运⾏的时候,Kafka 会先把你的 offset 设置为0,然后让你从头开始消费的。等消费到第50条数据时,你把消费者程序关了,把
auto_offset_reset设置为latest,再重新运⾏。此时消费者依然会接着从第51条数据开始读取。不会跳过剩下的50条数据。
所以,auto_offset_reset的作⽤,是在你的 group 第⼀次运⾏,还没有 offset 的时候,给你设定初始的 offset。⽽⼀旦你这个 group 已经有 offset 了,那么auto_offset_reset这个参数就不会再起作⽤了。
partition 是如何分配的?
对于同⼀个 Topic 的同⼀个 Group:
假设你的 Topic 有10个 Partition,⼀开始你只启动了1个消费者。那么这个消费者会轮换着从这10个Partition 中读取数据。
当你启动第⼆个消费者时,Kafka 会从第⼀个消费者⼿上抢⾛5个Partition,分给第⼆个消费者。于是两个消费者各⾃读5个 Partition。互不影响。
当第三个消费者⼜出现时,Kafka 从第⼀个消费者⼿上再抢⾛1个 Partition,从第⼆个消费者⼿上抢⾛2个 Partition 给第三个消费者。于是,消费者1有4个 Partition,消费者2有3个 Partition,消费者3有3个 Partiton,互不影响。
当你有10个消费者⼀起消费时,每个消费者读取⼀个 Partition,互不影响。
当第11个消费者出现时,它由于分配不到 Partition,所以它什么都读不到。
所以在上⼀篇⽂章中,我说,在同⼀个 Topic,同⼀个 Group 中,你有多少个 Partiton,就能起多少个进程同时消费。
Kafka 是不是完全不重复不遗漏?
在极端情况下,Kafka 会重复,也会遗漏,但是这种极端情况并不常见。如果你的 Kafka 频繁漏数据,或者总是出现重复数据,那么肯定是你环境没有搭建正确,或者代码有问题。
忠告
再次提醒:专业的⼈做专业的事情,不要轻易⾃建Kafka 集。让专门的同事复制搭建和维护,你只管使⽤。这才是最⾼效省事的做法。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。