Field notes: let's dig into how Kafka sustains high-concurrency writes at trillion-record scale
A couple of days ago I shared a piece on choosing between Kafka and RocketMQ. Today's post continues that series: Kafka's solution for massive data volumes, covering the test plan and cluster/application monitoring in detail. It builds directly on the earlier posts (there are video tutorials as well), so feel free to catch up on those first. Without further ado, here are the field notes.
Test plan
1. Add the model
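The notes don't include the model class itself, so here is a minimal sketch of the User POJO they imply. The field list follows the kafka.user table created in step 5 and the way the producer below constructs the object; the exact field names and constructor order are assumptions.

import java.io.Serializable;

// Hypothetical reconstruction of the User model used throughout these notes.
public class User implements Serializable {
    private String id;       // record id (stored as an int column in MySQL)
    private boolean gender;  // tinyint(1) in MySQL
    private String name;
    private int age;

    public User(String id, boolean gender, String name, int age) {
        this.id = id;
        this.gender = gender;
        this.name = name;
        this.age = age;
    }

    public String getId() { return id; }
    public boolean isGender() { return gender; }
    public String getName() { return name; }
    public int getAge() { return age; }
}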
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UserDataSource {

    public static void main(String args[]) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.90.131:9092");
        props.put("acks", "all");
        props.put("delivery.timeout.ms", 30000);
        props.put("request.timeout.ms", 20000);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Producer<String, byte[]> producer = new KafkaProducer<>(props);
        while (true) {
            User car = next();
            byte[] carBinary = toBinary(car);   // serialize the User into a byte[] (helper sketched below)
            ProducerRecord<String, byte[]> record = new ProducerRecord<>(
                    "user",        // topic
                    car.getId(),   // key
                    carBinary);    // value
            producer.send(record);
            Thread.sleep(200);     // roughly 5 records per second
            System.out.println("sent one record");
        }
        // producer.close();
    }

    private static User next() {
        Random random = new Random();
        User u = new User(
                random.nextInt(10) + "",  // id
                true,                      // gender
                "",                        // name
                1);                        // age
        return u;
    }
}
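The toBinary helper used above is not shown in the original notes. Here is a minimal sketch using plain Java serialization; the helper name and the serialization mechanism are assumptions, and any serializer that produces a byte[] works with ByteArraySerializer.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

// Hypothetical serializer: turns a User (which must implement Serializable) into a byte[].
private static byte[] toBinary(User user) {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(user);
        oos.flush();
        return bos.toByteArray();
    } catch (IOException e) {
        throw new RuntimeException("failed to serialize User", e);
    }
}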
2. Generate data
Running the UserDataSource class above is all this step needs: it publishes one User record to the user topic roughly every 200 ms until you stop it.
3. Create the topic
bin/kafka-topics.sh --create \
--bootstrap-server 192.168.90.131:9092 \
--replication-factor 1 \
--partitions 3 \
--topic user
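If you prefer to verify the result from code rather than the CLI, a quick check with the Java AdminClient looks roughly like this; it is only a sketch and assumes nothing beyond the broker address already used above.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeUserTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.90.131:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singletonList("user"))
                    .all().get().get("user");
            // expect 3 partitions, matching --partitions 3 above
            System.out.println("topic=" + desc.name() + ", partitions=" + desc.partitions().size());
        }
    }
}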
4. Add CarConsume
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.TopicPartition;

public class CarConsume {
    public static void main(String args[]) {
        // topic to consume (the electrocar example carried over from the earlier lesson)
        String topic = "electrocar";
        List<TopicPartition> partitions = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            // build a TopicPartition object for each partition
            TopicPartition p = new TopicPartition(topic, i);
            partitions.add(p);
        }
        // target MySQL table
        String targetTable = "electrocar";
        // instantiate the exactly-once consumer
        ExactOnceConsumer<Electrocar> exactConsumer =
                new ExactOnceConsumer(topic, partitions, targetTable);
        // start from the offset recorded in MySQL
        exactConsumer.seek();
        // start consuming
        exactConsumer.subscribe();
    }
}
5. Add the kafka.user table
drop table user;
CREATE TABLE `user` (
`topic` varchar(20) DEFAULT NULL,
`pid` int(11) DEFAULT NULL,
`offset` mediumtext,
`id` int(11) DEFAULT NULL,
`gender` tinyint(1) DEFAULT NULL,
`name` varchar(20) DEFAULT NULL,
`age` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
6. Add UserConsume
# Simply copy CarConsume and change the topic, target table and record type to user
public class UserConsume {
    public static void main(String args[]) {
        // topic to consume
        String topic = "user";
        List<TopicPartition> partitions = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            // build a TopicPartition object for each partition
            TopicPartition p = new TopicPartition(topic, i);
            partitions.add(p);
        }
        // target MySQL table
        String targetTable = "user";
        // instantiate the exactly-once consumer (record type changed from Electrocar to User)
        ExactOnceConsumer<User> exactConsumer =
                new ExactOnceConsumer(topic, partitions, targetTable);
        // start from the offset recorded in MySQL
        exactConsumer.seek();
        // start consuming
        exactConsumer.subscribe();
    }
}
7. Complete seek()
The offset used by seek() is still hard-coded; it should read the latest stored offset from MySQL instead.
SQL: select max(offset+0) from kafka.electrocar where pid=1;
public long offsetByPartition(TopicPartition p) {
    // conn is the JDBC connection the consumer already holds
    String sql = String.format("select max(offset+0) from %s where pid=%d",
            this.targetTable, p.partition());
    Statement stat = null;
    try {
        stat = conn.createStatement();
        ResultSet rs = stat.executeQuery(sql);
        if (rs.next()) {
            return rs.getInt(1);
        }
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        if (stat != null) {
            try {
                stat.close();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
        }
    }
    return 0;
}
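The notes never show the rest of ExactOnceConsumer, so here is a minimal sketch of how seek() and subscribe() could fit around offsetByPartition. The class and method names come from the steps above; the consumer settings, the group id, the JDBC credentials and the fromBinary helper are assumptions, and the real class in the course is generic over the record type while this sketch hard-wires User to stay short.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactOnceConsumer {
    private final List<TopicPartition> partitions;
    private final String targetTable;
    private final KafkaConsumer<String, byte[]> kafkaConsumer;
    private final Connection conn;

    public ExactOnceConsumer(String topic, List<TopicPartition> partitions, String targetTable) {
        this.partitions = partitions;
        this.targetTable = targetTable;
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.90.131:9092");
        props.put("group.id", "exact-once-" + topic);
        props.put("enable.auto.commit", "false"); // progress lives in MySQL, not in Kafka
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        this.kafkaConsumer = new KafkaConsumer<>(props);
        this.kafkaConsumer.assign(partitions); // manual assignment, no group rebalancing
        try {
            // placeholder JDBC URL and credentials
            this.conn = DriverManager.getConnection("jdbc:mysql://192.168.90.131:3306/kafka", "root", "123456");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Position each partition just past the highest offset already stored in MySQL.
    public void seek() {
        for (TopicPartition p : partitions) {
            long last = offsetByPartition(p);
            kafkaConsumer.seek(p, last == 0 ? 0 : last + 1);
        }
    }

    // Poll loop: each record is inserted together with its topic, partition and offset.
    public void subscribe() {
        String sql = String.format(
                "insert into %s(topic, pid, offset, id, gender, name, age) values(?,?,?,?,?,?,?)", targetTable);
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            while (true) {
                ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, byte[]> r : records) {
                    User u = fromBinary(r.value());
                    ps.setString(1, r.topic());
                    ps.setInt(2, r.partition());
                    ps.setString(3, String.valueOf(r.offset()));
                    ps.setString(4, u.getId());
                    ps.setBoolean(5, u.isGender());
                    ps.setString(6, u.getName());
                    ps.setInt(7, u.getAge());
                    ps.executeUpdate();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Same query as in step 7: the max stored offset for one partition, or 0 if nothing is stored yet.
    public long offsetByPartition(TopicPartition p) {
        String sql = String.format("select max(offset+0) from %s where pid=%d", targetTable, p.partition());
        try (Statement stat = conn.createStatement(); ResultSet rs = stat.executeQuery(sql)) {
            if (rs.next()) return rs.getLong(1);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    // Hypothetical inverse of the producer's toBinary helper.
    private User fromBinary(byte[] bytes) throws Exception {
        try (java.io.ObjectInputStream ois =
                     new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(bytes))) {
            return (User) ois.readObject();
        }
    }
}

The key idea: because every row carries (pid, offset), restarting the consumer and re-seeking from max(offset) should never insert the same record twice, which is exactly what step 8 verifies.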
8. Test the offset boundary
# clean out the old data
delete from kafka.electrocar;
Run CarConsume, stop it, then run it again so that seek() has to resume from the offset stored in MySQL.
# check whether any (pid, offset) pair was written more than once; if seek() resumes correctly, this query returns nothing
select pid,offset,count(*) ct
from kafka.electrocar
group by pid,offset
having ct>1;
Cluster / application monitoring
1. Install KafkaOffsetMonitor
Why this tool: it needs few permissions, is minimally invasive, and gets the necessary monitoring up quickly.
Search for KafkaOffsetMonitor on GitHub and download the assembly jar.
Note: KafkaOffsetMonitor references some JS/CSS files hosted on the public Internet, so the web UI may break on a host without outside access.
java -Xms512M -Xmx512M -Xss1024K \
  -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
  com.quantifind.kafka.offsetapp.OffsetGetterWeb \
  --port 8088 \
  --zk 192.168.90.131:2181 \
  --refresh 5.minutes \
  --retain 1.days
KafkaOffsetMonitor can monitor not only cluster state but also our consumption progress, as long as the consumer writes its progress to the ZooKeeper path
/consumers/${group_id}/offsets/${topic}/${partition_id}
2. Get the latest consumption progress
Where can we read the consumption progress from? Pulling it back out of MySQL is awkward to use.
Demo03:
consumer.commitAsync();
Since the consumer is able to commit its progress, that progress must already be sitting in its memory. Internally, the commit reads the offsets from
this.subscriptions.allConsumed()
The subscriptions field is private and cannot be used directly, so we grab it via reflection:
Field f = consumer.getClass().getDeclaredField("subscriptions");
f.setAccessible(true);
SubscriptionState subState = (SubscriptionState) f.get(consumer);
# call allConsumed() on the reflected SubscriptionState
Map<TopicPartition, OffsetAndMetadata> latestOffsets = subState.allConsumed();
Then iterate over the result:
for (TopicPartition p : latestOffsets.keySet()) {
    if (latestOffsets.containsKey(p)) {
        long offset = latestOffsets.get(p).offset();
        System.out.println(String.format("pid:%d, offset:%d",
                p.partition(),
                offset));
    }
}
Encapsulating this in the consumer:
// add a field
private SubscriptionState subState;

private void setSubState() {
    try {
        Field f = this.kafkaConsumer.getClass().getDeclaredField("subscriptions");
        f.setAccessible(true);
        this.subState = (SubscriptionState) f.get(this.kafkaConsumer);
    } catch (NoSuchFieldException e) {
        e.printStackTrace();
    } catch (IllegalAccessException e) {
        e.printStackTrace();
    }
}

// call it once in init()
setSubState();
System.out.println("SubscriptionState acquired");
3. Reduce the load on ZooKeeper
(1) Should we push every update to ZooKeeper in real time? No: ZooKeeper treats reads and writes as transactions, so updating it on every offset change is expensive.
Instead, add a thread that flushes the progress every 3 minutes or so:
public class ZkUptThread extends Thread {
    // offsets updated in real time, kept in memory
    public Map<TopicPartition, Long> imOffsets = new ConcurrentHashMap<>();
    // offsets as last written to ZooKeeper
    public Map<TopicPartition, Long> zkOffsets = new HashMap<>();
}
4. Update the in-memory offsets
// in ZkUptThread
public void uptIMOffset(SubscriptionState subs) {
    // snapshot everything consumed so far
    Map<TopicPartition, OffsetAndMetadata> latestOffsets = subs.allConsumed();
    for (TopicPartition p : latestOffsets.keySet()) {
        if (latestOffsets.containsKey(p)) {
            long offset = latestOffsets.get(p).offset();
            this.imOffsets.put(p, offset);
        }
    }
}
// call uptIMOffset from ExactOnceConsumer.subscribe, as sketched below
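A sketch of where that call could sit inside the subscribe() poll loop; zkUptThread and subState are assumed to be fields of ExactOnceConsumer (the thread from step 3 and the reflected SubscriptionState from step 2).

public void subscribe() {
    while (true) {
        ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, byte[]> r : records) {
            // ... write r to MySQL together with its topic/partition/offset, as before ...
        }
        // after each batch, hand the latest consumed positions to the background updater
        zkUptThread.uptIMOffset(subState);
    }
}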
5. The run() method logic
When an offset has not changed since the last flush, there is no need to touch ZooKeeper at all.
@Override
public void run() {
    // loop forever, flushing only the offsets that changed
    while (true) {
        try {
            for (Map.Entry<TopicPartition, Long> entry : imOffsets.entrySet()) {
                long imOffset = entry.getValue();   // offset currently held in memory
                // if ZooKeeper already has this offset, skip the write
                if (zkOffsets.containsKey(entry.getKey())
                        && zkOffsets.get(entry.getKey()) == imOffset) {
                    continue;
                } else {
                    // otherwise write the new offset to ZooKeeper and remember what we wrote
                    uptZk(entry.getKey(), imOffset);
                    zkOffsets.put(entry.getKey(), imOffset);
                }
            }
            Thread.sleep(1000 * 10);
            System.out.println("ZkUptThread loop once ...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
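The notes don't show where this thread gets started; presumably once, right after the consumer is initialized. A sketch, where the zkUptThread field is an assumption:

// in ExactOnceConsumer's init, after setSubState():
this.zkUptThread = new ZkUptThread();
this.zkUptThread.setDaemon(true);  // don't keep the JVM alive just for offset flushing
this.zkUptThread.start();          // run() now flushes changed offsets every 10 s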
6. Update ZooKeeper
Dependencies:
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.13</version>
        </dependency>
    </dependencies>
</dependencyManagement>
// add a zkClient field
private CuratorFramework zkClient;

// instantiate Curator in the constructor
public ZkUptThread() {
    // retry 10 times, waiting 5 s between attempts
    RetryPolicy retry = new RetryNTimes(10, 5000);
    // create the Curator client and start it
    zkClient = CuratorFrameworkFactory.newClient("192.168.90.131:2181", retry);
    zkClient.start();
}

private void uptZk(TopicPartition partition, long offset) {
    // groupid and topic are fields of ZkUptThread, set when the thread is created;
    // this is the path layout KafkaOffsetMonitor reads
    String path = String.format("/consumers/%s/offsets/%s/%d", groupid, topic, partition.partition());
    try {
        byte[] offsetBytes = String.format("%d", offset).getBytes();
        if (zkClient.checkExists().forPath(path) != null) {
            // node already exists: update it
            zkClient.setData().forPath(path, offsetBytes);
            System.out.println("update " + path);
        } else {
            // node missing: create it, including any missing parent nodes
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(path, offsetBytes);
            System.out.println("add " + path);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}