Kafka commands and startup
By default Kafka is only reachable from the internal network. To access it from an external network, edit config/server.properties and replace the values of listeners and advertised.listeners with the host name. When a Java producer or consumer connects from outside, do not fill in a literal IP address; use the host name of the machine where Kafka is installed, and then map that host name to its real IP address in the hosts file.
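For reference, a minimal sketch of the relevant server.properties entries, assuming the broker's host name is node125 (the same host name used in the Java examples below):
listeners=PLAINTEXT://node125:9092
advertised.listeners=PLAINTEXT://node125:9092
And the matching entry in the client machine's hosts file, where the IP address is a placeholder for the broker's real address:
192.168.1.125 node125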
First start ZooKeeper (Kafka ships with a bundled one):
bin/zookeeper-server-start.sh config/zookeeper.properties
Then start the Kafka server:
bin/kafka-server-start.sh config/server.properties
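To keep the broker running in the background instead of occupying the terminal, the same script also accepts a -daemon flag:
bin/kafka-server-start.sh -daemon config/server.properties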
List the existing topics:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
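If the test topic used below does not exist yet, it can be created first (on older Kafka versions the create command takes --zookeeper localhost:2181 instead of --bootstrap-server):
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test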
Open a console producer on the server; every line entered is sent to Kafka:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# At the prompt, type a line of data and press Enter to send it to Kafka
Open a console consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Whenever messages are sent to the test topic, they are consumed here.
Java producer and consumer code:
The dependency the project needs:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.theorydance</groupId>
<artifactId>kafkademo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafkademo</name>
<url></url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
</project>
Producer code, ProducerDemo.java:
package com.theorydance.kafkademo;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "node125:9092");
        properties.put("acks", "all");       // wait for all in-sync replicas to acknowledge
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 100; i++) {
                String msg = "This is Message " + i;
                producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {          // the constructor may have thrown
                producer.close();
            }
        }
    }
}
Consumer code, ConsumerDemo.java:
package com.theorydance.kafkademo;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class ConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "node125:9092");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        while (true) {
            Map<String, List<PartitionInfo>> maps = kafkaConsumer.listTopics();
            System.out.println("listening to topics=" + maps.keySet());
            Set<String> sets = new HashSet<>();
            for (String topic : maps.keySet()) {
                if (topic.startsWith("Hello")) { // rule deciding which topics to listen to
                    sets.add(topic);
                }
            }
            kafkaConsumer.subscribe(sets);
            long startTime = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s, topic = %s", record.offset(), record.value(), record.topic());
                    System.out.println("=====================>");
                }
                long endTime = System.currentTimeMillis();
                if (endTime - startTime > 30000) { // refresh the topic list every 30 s
                    System.out.println("------------------------------------------------------------------");
                    break;
                }
            }
        }
    }
}
Note: In a real requirement I needed to collect logs from different servers (the same and different modules of microservices, or logs of other programs), and I used Flume for collection. I wanted to classify the collected logs (to tell which program produced them), and I searched online for a way to have Flume prepend an application identifier to the logs during collection, but I did not find one; if any reader of this post knows how, please share. So I took a different approach instead: as in the consumer example above, logs from different programs are sent to different topics, and the consumer subscribes to topics that match a certain rule and consumes them, which makes it possible to tell the logs of different applications apart.
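As a side note, the consumer API can also subscribe by regular expression directly, which avoids polling listTopics() and rebuilding the subscription by hand; new topics that match the pattern are picked up automatically when the consumer's metadata refreshes. A minimal sketch under the same assumptions as above (broker node125, topics prefixed with Hello; the class name PatternConsumerDemo is mine):

package com.theorydance.kafkademo;

import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PatternConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "node125:9092");
        properties.put("group.id", "group-1");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to every topic whose name starts with "Hello";
        // newly created matching topics are picked up on metadata refresh.
        consumer.subscribe(Pattern.compile("Hello.*"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s, topic = %s%n",
                        record.offset(), record.value(), record.topic());
            }
        }
    }
}

With this in place, each application only needs to send its logs to its own Hello-prefixed topic, and the consumer picks them up without any code change.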