golang实现kafka的消息推送
Kafka的安装与启动
kafka中涉及的名词
1. 消息记录:由⼀个key,⼀个value和⼀个时间戳构成,消息最终存储在主题下的分区中,记录在⽣产中称为⽣产者记录,在消费者中
称为消费记录。Kafka集保持了所有发布的消息,直到它们过期,⽆论消息是否被消费了,在⼀个可配置的时间段内,Kafka集保留了所有发布的消息。⽐如消息的保存策略被设置为2天,那么在⼀个消息被发布的两天时间内,它都是可以被消费的。Kafka的性能是和数据量⽆关的常量级的,所以保留太多数据并不是问题
2. ⽣成者:⽣产者⽤于发布消息
3. 消费者:消费者⽤于订阅消息
4. 消费者组:相同的groupID的消费者将视为同⼀个消费者组,每个消费者都需要设置⼀个组id,每条消息只能被consumer group中的⼀
个Consumer消费,但是可以被多个consumer group消费
5. 主题(topic):消息的⼀种逻辑分组,⽤于对消息分门别类,每⼀类消息称之为⼀个主题,相同主题的消息放在⼀个队列中
6. 分区(partition):消息的⼀种物理分组,⼀个主题被拆成多个分区,每⼀个分区就是⼀个顺序的,不可变的消息队列,并且可以持续
添加,分区中的每个消息都被分配了⼀个唯⼀的id,称之为偏移量(offset),在每个分区中偏移量都是唯⼀的。每个分区对应⼀个逻辑log,有多个segment组成
7. 偏移量:分区中每个消息都有⼀个唯⼀的Id,称之为偏移量,代表已经消费的位置
8. 代理(broker):⼀台kafka服务器称之为⼀个broker
9. 副本(replica):副本只是⼀个分区(partition)的备份。副本不读取或写⼊数据。它们⽤于防⽌数据丢失
10. 领导者:leader是负责给定分区的所有读取和写⼊的节点
11. 追随者:跟随领导者指令的节点被称为Follower。
12. zookeeper:Kafka代理是⽆状态的,所以它们使⽤Zookeeper来维护它们的集状态。Zookeeper⽤于管理和协调Kafka代理
kafka功能
发布订阅:⽣产者⽣产消息(数据流),将消息发送给kafka指定的主题队列中,也可以发送到topic中的指定分区中,消费者从kafka 的指定队列中获取消息,然后来处理消息
⼀. Mac版安装
brew install kafka
安装kafka需要依赖zookeeper的,所以安装kafka的时候也会包含zooker
kafka的安装⽬录:/usr/local/Cellar/kafka
go 字符串转数组kafka的配置⽂件⽬录:/usr/local/etc/kafka
kafka服务的配置⽂件:/usr/local/etc/kafka/server.properties
zookeeper配置⽂件:/usr/local/etc/kafka/zookeeper.properties
server.properties中重要配置
1. broker.id=0
2. listeners=PLAINTEXT://:9092
3. advertised.listeners=PLAINTEXT://127.0.0.1:9092
4. log.dirs=/usr/local/var/lib/kafka-logs
zookeeper.properties重要配置
1. dataDir=/usr/local/var/lib/zookeeper
2. clientPort=2181
3. maxClientCnxns=0
⼆. 启动zookeeper
新创建终端启动zookeeper
1. cd /usr/local/Cellar/kafka/
2.1.0
2. ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
3. 打印台显⽰:INFO Reading configuration from: /usr/local/etc/kafka/zookeeper.properties
(keeper.server.quorum.QuorumPeerConfig)
4. ...即是启动成功
三.启动kafka
新创建终端启动kafka(启动kafka之前必须先启动zookeeper)
1. cd /usr/local/Cellar/kafka/
2.1.0
2. ./bin/kafka-server-start /usr/local/etc/kafka/server.properties
3. 打印台显⽰:INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
4. ...即启动成功
5. 启动了kafka之后,zookeeper端会报⼀些Error:KeeperErrorCode = NoNode for /config/topics/test之类的错误,这个是没有
问题的,这是因为kafka向zookeeper发送了关于该路径的⼀些请求信息,但是不存在,所以这是没有问题的
四.创建topic
新创建终端
1. cd /usr/local/Cellar/kafka/
2.1.0
2. 创建⼀个名为“test”的主题:./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic
test
3. 查看所有的topic:./bin/kafka-topics --list --zookeeper localhost:2181
4. 查看某个topic的信息,⽐如test:./bin/kafka-topics --describe --zookeeper localhost:2181 --topic test
五.发送消息
新创建⼀个终端,作为⽣产者,⽤于发送消息,每⼀⾏就是⼀条信息,将消息发送到kafka服务器
1. cd /usr/local/Cellar/kafka/
2.1.0
2. ./bin/kafka-console-producer --broker-list localhost:9092 --topic test
3. send one message
4. send two message
六.消费消息(接受消息)
新创建⼀个终端作为消费者,接受消息
1. cd /usr/local/Cellar/kafka/
2.1.0
2. ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
3. send one message
4. send two message(这些便是从⽣产者获得的消息)
注意:发送消息与接受消息必须启动kafka与zookeeper
GoLang实现kafka的信息发布与订阅
⽣产者
import (
"fmt"
"github/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
// 等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll
// 随机的分区类型:返回⼀个分区器,该分区器每次选择⼀个随机分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失败后的响应
config.Producer.Return.Successes = true
// 使⽤给定代理地址和配置创建⼀个同步⽣产者
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
//构建发送的消息,
msg := &sarama.ProducerMessage {
//Topic: "test",//包含了消息的主题
Partition: int32(10),//
Key: sarama.StringEncoder("key"),//
}
var value string
var msgType string
for {
_, err := fmt.Scanf("%s", &value)
if err != nil {
break
}
fmt.Scanf("%s",&msgType)
fmt.Println("msgType = ",msgType,",value = ",value)
msg.Topic = msgType
//将字符串转换为字节数组
msg.Value = sarama.ByteEncoder(value)
//fmt.Println(value)
//SendMessage:该⽅法是⽣产者⽣产给定的消息
//⽣产成功的时候返回该消息的分区和所在的偏移量
//⽣产失败的时候返回error
partition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Send message Fail")
}
fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
}
}
消费者
import (
"fmt"
"github/Shopify/sarama"
"sync"
)
var (
wg sync.WaitGroup
)
func main() {
/
/ 根据给定的代理地址和配置创建⼀个消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
}
//Partitions(topic):该⽅法返回了该topic的所有分区id
partitionList, err := consumer.Partitions("test")
if err != nil {
panic(err)
}
for partition := range partitionList {
/
/ConsumePartition⽅法根据主题,分区和给定的偏移量创建创建了相应的分区消费者
//如果该分区消费者已经消费了该信息将会返回error
//sarama.OffsetNewest:表明了为最新消息
pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer pc.AsyncClose()
wg.Add(1)
go func(sarama.PartitionConsumer) {
defer wg.Done()
/
/Messages()该⽅法返回⼀个消费消息类型的只读通道,由代理产⽣
for msg := range pc.Messages() {
fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(pc)
}
wg.Wait()
consumer.Close()
}
kafka使⽤场景
kafka的应⽤很⼴泛,在这⾥简单介绍⼏种
服务解耦
⽐如我们发了⼀个帖⼦,除了写⼊数据库之外还有很多联动操作,⽐如给关注这个⽤户的⼈发送通知,推送到⾸页的时间线列表,如果⽤代码实现的话,发帖服务就要调⽤通知服务,时间线服务,这样的耦合很⼤,并且如果增加⼀个功能依赖发帖,除了要增加新功能外还要修改发帖代码。
解决⽅法:引⼊kafka,将发完贴的消息放⼊kafka消息队列中,对这个主题感兴趣的功能就⾃⼰去消费这个消息,那么发帖功能就能够完全独⽴。同时即使发帖进程挂了,其他功能还能够使⽤,这样可以将bug隔离在最⼩范围内
流量削峰
流量削峰在消息队列中也是常⽤场景,⼀般在秒杀或团购活动中使⽤⽐较⼴泛。当流量太⼤的时候达到服务器瓶颈的时候可以将事件放在kafka中,下游服务器当接收到消息的时候⾃⼰去消费,有效防⽌服务器被挤垮
消息通讯
消息队列⼀般都内置了⾼效的通信机制,因此也可以⽤在纯的消息通讯中,⽐如客户端A跟客户端B都使⽤同⼀队列进⾏消息通讯,客户端A,客户端B,客户端N都订阅了同⼀个主题进⾏消息发布和接受不了
实现类似聊天室效果
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论