NodeJS操作Kafka
本⽂将会介绍在windows环境下启动Kafka,并通过nodejs作为客户端,⽣产并消费消息。
第⼀步,Kafka需要java运⾏时,先安装配置java环境。通过在命令⾏中输⼊java -version确认java是否成功安装(可能需要查看windows的环境变量中是否有java)。
第⼆步,下载最新版本的压缩包(.tgz格式),并解压。
分别在两个命令⾏⾥⾯启动zookeeper、kafka(解压缩路径下)
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
说明⼀下zookeeper和kafka的关系:zookeeper是集的调度者,kafka才是消息队列。 zookeeper的默认端⼝:2181,kafka的默认端⼝:9092
相关配置可以在config⽂件下的server.properties和zookeeper.properties中到
消费者客户端需要的group.id可以在config->consumer.properties中到。
第三步,使⽤命令⾏测试⽣产、消费
//创建⼀个topic:test
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
//查看topic
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
//创建⽣产者
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
//创建消费者
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
第四步,发送消息
kafka命令在⽣产者窗⼝,随意输⼊⼀条消息,可以在消费者窗⼝看到该消息。
最后,使⽤nodejs访问kafka
npm init -y
npm install kafkajs
新建demo.js,输⼊以下代码
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
const producer = kafka.producer()
const consumer = sumer({ groupId: 'test-consumer-group' })
const run = async () => {
// Producing
t()
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
// Consuming
t()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: String(),
})
},
})
}
run().)
最后执⾏命令node demo.js
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论