RocketMQ——⼊门代码案例
该案例基于springboot,和普通的spring项⽬的主要差别在于⼤⼤减少了配置话,不了解的同学可以先参考的⼀下我的,学习之后你会知道什么叫爽歪歪,什么叫美滋滋,再也不⽤话太多时间在项⽬的配置上了。
1.新建⼀个springboot项⽬
2.引⼊对应的rocketmq的jar包,这⾥以⽐较经典的
3.2.6的版本为例
<dependency>
<groupId>ketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
3.新建Producer.Class
package ketmq;
import ption.MQClientException;
import ketmq.client.producer.DefaultMQProducer;
import ketmq.client.producer.SendResult;
import ssage.Message;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//声明并初始化⼀个producer
//需要⼀个producer group名字作为构造⽅法的参数,这⾥为producer1
DefaultMQProducer producer = new DefaultMQProducer("producer1");
//设置NameServer地址,此处应改为实际NameServer地址,多个地址之间⽤;分隔
/
/NameServer的地址必须有,但是也可以通过环境变量的⽅式设置,不⼀定⾮得写死在代码⾥
producer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876");
// producer.setVipChannelEnabled(false);//3.2。6的版本没有该设置,在更新或者最新的版本中务必将其设置为false,否则会有问题
//调⽤start()⽅法启动⼀个producer实例
producer.start();
//发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest",// topic
"TagA",// tag
java设置环境变量的方法代码("Hello RocketMQ " + i).getBytes("utf-8")// body
);
//调⽤producer的send()⽅法发送消息
//这⾥调⽤的是同步的⽅式,所以会有返回结果
SendResult sendResult = producer.send(msg);
System.out.SendStatus()); //发送结果状态
//打印返回结果,可以看到消息发送的状态以及⼀些相关信息
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
//发送完消息之后,调⽤shutdown()⽅法关闭producer
producer.shutdown();
4.新建Consumer.Class
package ketmq;
import java.util.List;
import sumer.DefaultMQPushConsumer;
import sumer.listener.ConsumeConcurrentlyContext;
import sumer.listener.ConsumeConcurrentlyStatus;
import sumer.listener.MessageListenerConcurrently;
import ption.MQClientException;
import sumer.ConsumeFromWhere;
import ssage.MessageExt;
public class Consumer {
public static void main(String[] args) throws MQClientException {
//声明并初始化⼀个consumer
//需要⼀个consumer group名字作为构造⽅法的参数,这⾥为consumer1
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
//同样也要设置NameServer地址
consumer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876");
//这⾥设置的是⼀个consumer的消费策略
//CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
/
/CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费⼀遍
//CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使⽤,默认是半个⼩时以前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置consumer所订阅的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
//设置⼀个Listener,主要进⾏消息的逻辑处理
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
/
/ System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
for (MessageExt messageExt : msgs) {
// System.out.println("messageExt: " + messageExt);//输出消息内容
String messageBody = new Body());
// System.out.StoreHost());//获取对应rocketmq服务器ip地址
System.out.println("消费响应:msgId : " + MsgId() + ", msgBody : " + messageBody);//输出消息内容 }
//返回消费状态
//CONSUME_SUCCESS 消费成功
//RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//调⽤start()⽅法启动consumer
consumer.start();
System.out.println("Consumer Started.");
}
5.先运⾏Consumer再运⾏Producer即可
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论