bin/kafka-topics.sh --describe --zookeeper hadoop1:2181 --topic test
--topic test
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 110 Replicas: 110,111,112 Isr: 110,111,112
Topic: test Partition: 1 Leader: 111 Replicas: 111,112,110 Isr: 111,112,110
Topic: test Partition: 2 Leader: 112 Replicas: 112,110,111 Isr: 112,110,111
说明:
{
partiton:分区:ID
leader:当前负责读写的lead broker idk
relicas:当前partition的所有replication broker list
isr:relicas的子集,只包含出于活动状态的broker
}
去zk上看kafka集
[zk: localhost:2181(CONNECTED) 5] ls /
[admin, zookeeper, consumers, config, controller, zk-fifo, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 6] ls /brokers ----> 查看注册在zk内的kafka
[topics, ids]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/ids
[112, 110, 111]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/ids/112
[]
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics
[test]
[zk: localhost:2181(CONNECTED) 10] ls /brokers/topics/test
[partitions]
[zk: localhost:2181(CONNECTED) 11] ls /brokers/topics/test/partitions
[2, 1, 0]
[zk: localhost:2181(CONNECTED) 12]
8、关闭kafka
pkill -9 -f server.properti
二、kafka java调用:
1、 java端生产数据, kafka集消费数据:
Java代码
1. 1创建maven工程,l中增加如下:
2. <dependency>
3. <groupId>org.apache.kafka</groupId>
4. <artifactId>kafka_2.10</artifactId>
5. <version>0.8.2.0</version>
6. </dependency>
7.
8.
9. 2 java代码:向主题test内写入数据
10.
11. import java.util.Properties;
12. import urrent.TimeUnit;
13.
14. import kafka.javaapi.producer.Producer;
15. import kafka.producer.KeyedMessage;
16. import kafka.producer.ProducerConfig;
17. import kafka.serializer.StringEncoder;
18.
19.
20.
21.
22. public class kafkaProducer extends Thread{
23.
24. private String topic;
25.
26. public kafkaProducer(String topic){
27. super();
28. pic = topic;
29. }
30.
31.
32. @Override
33. public void run() {
34. Producer producer = createProducer();
35. int i=0;
36. while(true){
37. producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));
38. try {
39. TimeUnit.SECONDS.sleep(1);
40. } catch (InterruptedException e) {
41. e.printStackTrace();
42. }
43. }
44. }
45.
46. private Producer createProducer() {
47. Properties properties = new Properties();
kafka命令48. properties.put("t", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk
49. properties.put("serializer.class", "kafka.serializer.StringEncoder");
50. properties.put("metadata.broker.list", "192.168.1.110:9092,192.168.1.111:9093,192.168.1.112:9094");// 声明
kafka broker
51. return new Producer<Integer, String>(new ProducerConfig(properties));
52. }
53.
54.
55. public static void main(String[] args) {
56. new kafkaProducer("test").start();// 使用kafka集中创建好的主题 test
57.
58. }
59.
60. }
61.
62.
63.
64.
65. 3 kafka集中消费主题test的数据:
66. [root@h2master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginnin
67.
68. 4启动java代码,然后在看集消费的数据如下:
69.
70. message: 0
71. message: 1
72. message: 2
73. message: 3
74. message: 4
75. message: 5
76. message: 6
77. message: 7
78. message: 8
79. message: 9
80. message: 10
81. message: 11
82. message: 12
83. message: 13
84. message: 14
85. message: 15
86. message: 16
87. message: 17
88. message: 18
89. message: 19
90. message: 20
91. message: 21
2、kafka 使用Java写消费者,这样先运行kafkaProducer ,在运行kafkaConsumer,即可得到生产者的数据:
Java代码
1. import java.util.HashMap;
2. import java.util.List;
3. import java.util.Map;
4. import java.util.Properties;
5.
6. sumer.Consumer;
7. sumer.ConsumerConfig;
8. sumer.ConsumerIterator;
9. sumer.KafkaStream;
10. import sumer.ConsumerConnector;
11.
12.
13.
14.
15. /**
16. * 接收数据
17. * 接收到: message: 10
18. 接收到: message: 11
19. 接收到: message: 12
20. 接收到: message: 13
21. 接收到: message: 14
22. * @author zm
23. *
24. */
25. public class kafkaConsumer extends Thread{
26.
27. private String topic;
28.
29. public kafkaConsumer(String topic){
30. super();
31. pic = topic;
32. }
33.
34.
35. @Override
36. public void run() {
37. ConsumerConnector consumer = createConsumer();
38. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
39. topicCountMap.put(topic, 1); // 一次从主题中获取一个数据
40. Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
41. KafkaStream<byte[], byte[]> stream = (topic).get(0);// 获取每次接收到的这个数据
42. ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
43. while(iterator.hasNext()){
44. String message = new ().message());
45. System.out.println("接收到: " + message);
46. }
47. }
48.
49. private ConsumerConnector createConsumer() {
50. Properties properties = new Properties();
51. properties.put("t", "192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk
52. properties.put("group.id", "group1");// 必须要使用别的组名称,如果生产者和消费者都在同一组,则不能访问同一组内的
topic数据
53. ateJavaConsumerConnector(new ConsumerConfig(properties));
54. }
55.
56.
57. public static void main(String[] args) {
58. new kafkaConsumer("test").start();// 使用kafka集中创建好的主题 test
59.
60. }
61.
62. }
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论