How to build and run the Kafka source code with the IDEA development tool
Introduction:
Reading the source code of excellent open-source projects, studying the design and coding techniques they use, and thinking about the intent behind those designs helps us improve our own technical and coding skills. In day-to-day development we can borrow these design ideas and apply them to our own business code.
Kafka is written in Scala, which is very similar to Java; you can read most of the code without formally learning Scala, and you can approach Kafka much like a web project from your own work. Kafka makes heavy use of JDK concurrency features such as collections and the java.util.concurrent classes, NIO networking, and file storage; it also tunes the scheduled thread pool to speed up adding and removing tasks. Reading the Kafka source code is a good way to learn about all of these topics. The first step is to get the source building, and since IDEA makes this quick and convenient, this article focuses on how to set it up with the IDEA development tool.
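As one concrete example of the kind of detail worth looking for: the JDK's ScheduledThreadPoolExecutor keeps cancelled tasks in its delay queue by default, and setRemoveOnCancelPolicy(true) removes them immediately on cancellation, which is one of the knobs related to the task add/remove performance mentioned above. The sketch below only demonstrates this JDK facility; it is not Kafka's actual scheduler code.

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduledPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // By default, a cancelled task stays in the work queue until its delay expires;
        // with this policy it is removed from the queue as soon as cancel() is called.
        executor.setRemoveOnCancelPolicy(true);

        ScheduledFuture<?> task =
                executor.scheduleAtFixedRate(() -> System.out.println("tick"), 1, 1, TimeUnit.HOURS);
        task.cancel(false);

        // With the policy enabled the queue is already empty right after the cancel.
        System.out.println("queue size after cancel: " + executor.getQueue().size());

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}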
Environment setup:
Part 1: Downloading the source code
1. Source download:
After downloading, install the Scala plugin in IDEA under Settings -> Plugins, then download the Scala JAR as prompted. One thing to note: Kafka uses Gradle as its build tool.
2. Configure the project:
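As a rough sketch of the Gradle steps (assuming a Kafka 0.11-era checkout and a locally installed Gradle; the exact steps can vary between versions), the IDEA project files can be generated from the command line before importing the project:

cd kafka            # the source root
gradle              # bootstrap the Gradle wrapper (required once for older Kafka versions)
./gradlew idea      # generate IDEA project files
./gradlew jar       # optional: build the jars to verify the toolchain works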
Part 2: Running the source code
1. Start Kafka
Here we use Kafka 0.11 (this version is already fairly complete feature-wise, including the producer transaction and idempotence APIs). Kafka.scala is Kafka's startup class, so running this class starts Kafka. Clicking Run directly will fail with an error; set up the run configuration as follows, pointing it at the config file, and it will start.
Program arguments (the broker config file path): config/server.properties
VM options (the log4j config path): -Dlog4j.configuration=file:D:/myProject/kafka/config/log4j.properties
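For reference, a minimal single-broker server.properties sketch for running locally (the values below are illustrative; log.dirs in particular should point to a real directory on your machine):

broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=D:/myProject/kafka/kafka-logs
num.partitions=1
zookeeper.connect=localhost:2181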
2. Set up ZooKeeper
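A minimal standalone zoo.cfg sketch is enough for local testing (dataDir is a placeholder path; clientPort must match the zookeeper.connect setting used by the broker):

tickTime=2000
dataDir=D:/myProject/zookeeper/data
clientPort=2181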
3. Run Kafka and ZooKeeper
Now start ZooKeeper and then Kafka again; if you see logs like the following, the startup succeeded.
ZooKeeper log:
2020-04-05 16:25:41,659 [myid:] - INFO [main:QuorumPeerConfig@133] - Reading configuration from: D:\myProject\zookeeper\conf\zoo.cfg
2020-04-05 16:25:41,663 [myid:] - INFO [main:QuorumPeerConfig@385] - clientPortAddress is 0.0.0.0/0.0.0.0:2181
2020-04-05 16:25:41,663 [myid:] - INFO [main:QuorumPeerConfig@389] - secureClientPort is not set
2020-04-05 16:25:41,667 [myid:1] - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2020-04-05 16:25:41,669 [myid:1] - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 0
2020-04-05 16:25:41,669 [myid:1] - INFO [main:DatadirCleanupManager@101] - Purge task is not scheduled.
2020-04-05 16:25:41,669 [myid:1] - WARN [main:QuorumPeerMain@124] - Either no config or no quorum defined in config, running in standalone mode
2020-04-05 16:25:41,670 [myid:1] - INFO [main:ManagedUtil@46] - Log4j found with jmx enabled.
2020-04-05 16:25:41,718 [myid:1] - INFO [main:QuorumPeerConfig@133] - Reading configuration from: D:\myProject\zookeeper\conf\zoo.cfg
2020-04-05 16:25:41,718 [myid:1] - INFO [main:QuorumPeerConfig@385] - clientPortAddress is 0.0.0.0/0.0.0.0:2181
2020-04-05 16:25:41,718 [myid:1] - INFO [main:QuorumPeerConfig@389] - secureClientPort is not set
2020-04-05 16:25:41,719 [myid:1] - INFO [main:ZooKeeperServerMain@116] - Starting server
2020-04-05 16:25:41,738 [myid:1] - INFO [main:Environment@109] - Server environment:zookeeper.version=3.5.6-SNAPSHOT-3882a0171f91280bf1adbbd4ffa
2020-04-05 16:25:41,738 [myid:1] - INFO [main:Environment@109] - Server environment:host.name=windows10.microdone
Kafka log:
[2020-04-05 16:50:49,065] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
authorizer.class.name =
able = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
4. Create a producer and send a message
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks", "-1");
        // properties.put("enable.idempotence", true);
        properties.put("batch.size", "1");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-test-1", "test-2", "value");
        Future<RecordMetadata> value = producer.send(record);
        // block until the broker acknowledges the record
        RecordMetadata recordMetadata = value.get();
        producer.flush();
        System.out.println("ProducerDemo send result , topic=" + recordMetadata.topic() + " ,partition="
                + recordMetadata.partition() + ", offset=" + recordMetadata.offset());
        producer.close();
    }
}
You can see that Kafka has successfully received the message:
17:06:57.151 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic-test-
17:06:57.151 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name
ProducerDemo send result , topic=topic-test-1 ,partition=0, offset=2
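If you want to double-check from the consumer side, a minimal consumer sketch can poll the same topic. It assumes the same localhost:9092 broker and the topic-test-1 topic used above; the group id is just a placeholder:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "test-group");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("topic-test-1"));
        // poll a few times so the consumer has time to join the group and fetch the record
        for (int i = 0; i < 10; i++) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("received key=" + record.key() + ", value=" + record.value()
                        + ", offset=" + record.offset());
            }
        }
        consumer.close();
    }
}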
This article has only briefly covered how to build, run, and debug the source in IDEA, without going into much detail. Follow-up posts will focus on Kafka's overall architecture and its source code.