该死的Kafka,远程连接Kafka超时以及解决办法
关于消息的发布与订阅,之前⼀直使⽤的是activeMQ基于JMS的消息队列进⾏操作的,最近听说有⼀个更⾼效的消息的发布与订阅技术,就是Kafka。
关于kafka的介绍,在这⾥就不做过多讲解了,因为我⾃⼰也不是很了解,⼤概就知道它与activeMQ⼀样,都是具有⽣产者和消费者的发布与订阅消息的机制。
具体请参见百度百科。
今天我想说的就是,初遇kafka所踩的坑,⾮常⼤的坑!!
今天第⼀次学习Kafka,参考的是⽹上的资料。
具体使⽤,我这⾥不过多介绍,具体讲我遇到的问题。因为是⾃学,我采⽤的是在centOS6.5的虚拟机上安装的Kafka,由于新版的Kafka⾃带有zookeeper,所以就直接使⽤了。
当我按照教程启动玩Kafka后,并且在虚拟机服务器⾥⾯是可以正常操作,可是使⽤JavaAPI远程进⾏操作的时候,便⼀直报连接异常!
Java代码:
package site.wangxin520.kafkatest;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.211.129:9092");
//The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
//“所有”设置将导致记录的完整提交阻塞,最慢的,但最持久的设置。
props.put("acks", "all");
//如果请求失败,⽣产者也会⾃动重试,即使设置成0 the producer can automatically retry.
props.put("retries", 0);
//The producer maintains buffers of unsent records for each partition.
props.put("batch.size", 16384);
//默认⽴即发送,这⾥这是延时毫秒数
props.put("linger.ms", 1);
//⽣产者缓冲⼤⼩,当缓冲区耗尽后,额外的发送调⽤将被阻塞。时间超过max.block.ms将抛出TimeoutException
props.put("", 33554432);
//The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes.
props.put("key.serializer", "org.apache.kafkamon.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafkamon.serialization.StringSerializer");
//创建kafka的⽣产者类
Producer<String, String> producer = new KafkaProducer<String, String>(props);
activemq默认账号密码//⽣产者的主要⽅法
producer.send(new ProducerRecord<String, String>("show", "测试Kafka"));
producer.close();
}
}
代码没问题,但是每次运⾏就会抛⼀个time out 异常,总是连接失败。
java.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.kafkamonwork.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
at org.apache.kafkamonwork.KafkaChannel.finishConnect(KafkaChannel.java:73)
at org.apache.kafkamonwork.Selector.pollSelectionKeys(Selector.java:323)
at org.apache.kafkamonwork.Selector.poll(Selector.java:291)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:148)
at java.lang.Thread.run(Thread.java:745)
解决办法
这⾥需要注意的是,因为是远程连接服务器,所以要看服务器的防⽕墙是否针对端⼝9092(默认端⼝)打开的,刚开始弄了很长时间,我⼀直没弄好的原因是因为中午我重启了服务器,导致防⽕墙⼜打开了。
如果防⽕墙是正常的,就需要改变Kafka的配置:在/config/service.properties中,添加上⼀句host.name=192.168.211.129
这主要是因为,kafka默认是监听localhost的端⼝,如果不配置新端⼝名的话,就解析监听不到消息。
现在重新启动⼀下,看看是不是已经解决了。
在kafka安装⽬录,启动⾃带的zookeeper服务:
bin/zookeeper-server-start.sh config/zookeeper.properties
在同⼀个地⽅,启动kafka服务
bin/kafka-server-start.sh config/server.properties
使⽤消费者客户端,监听show的topic,验证是否已经启动了Kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic show --from-beginning
没有报错,并且现在服务器端已经在监听状态
启动Java客户端,控制台没有报错
并且在服务器端显⽰了刚刚在Java客户端发送的消息。
解决成功!
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论