java使⽤RabbitMQ⽰例
RabbitMQ简介
RabbitMQ是⼀个受欢迎的消息代理,通常⽤于应⽤程序之间或者程序的不同组件之间通过消息来进⾏集成。具有⾼可⽤⾼并发的优点,适合集服务器。采⽤ Erlang实现, 对主要的编程语⾔都有客户端⽀持。
RabbitMQ环境配置
linux下环境配置
我⽤的是centos 6.5版本。
先从这个地址下载安装包
$ tar -zxvf otp_src_18.
$ cd otp_src_18.3
$ ./configure --prefix=/usr/local/erlang --with-ssl -enable-threads -enable-smmp-support -enable-kernel-poll --enable-hipe --without-javac
$ make
$ sudo make install
配置环境变量
vim /etc/profile
在⽂件末尾添加下⾯⽂字:
ERLANG_HOME=/usr/local/erlang
PATH=$ERLANG_HOME/bin:$PATH
export ERLANG_HOME
export PATH
使环境变量⽣效
source /etc/profile
然后下载RabbitMQ,
先安装依赖
$ sudo yum install xmlto
$ xz -d rabbitmq-server-generic-unix-3.6.
$ tar -xvf rabbitmq-server-generic-unix-3.6.1.tar
# 移动⽬录
$ sudo cp -rf ./rabbitmq_server-3.6.1/ /usr/local/
$ cd /usr/local
java环境变量自动配置
#改名字
$  mv rabbitmq_server-3.6.1 rabbitmq-3.6.1
# 安装web管理插件
$ cd sbin/
$ ./rabbitmq-plugins enable rabbitmq_management
启动和关闭服务
$ ./rabbitmq-server
# 后台启动
$ ./rabbitmq-server -detached
# 关闭
$ ./rabbitmqctl stop
登录web管理端
启动rabbitmq,然后浏览器输⼊
10.0.0.221:15672/
这个ip地址是你实际安装的主机的ip,请根据实际情况修改。
下⾯配置登录账号
$ ./rabbitmqctl add_user pony 123456
Creating user "pony"...
$ ./rabbitmqctl set_user_tags pony administrator
Setting tags for user "pony" to [administrator]...
刷新下,⽤刚才创建的账号登录,如下:
有⼀点要注意,如果主机配有防⽕墙,要把⽤到的端⼝都打开
-A INPUT -p tcp -m state --state NEW -m tcp --dport 15672 -j ACCEPT -A INPUT -p tcp -m state --state NEW -m tcp --dport 25672 -j ACCEPT -A INPUT -p tcp -m state --state NEW -m tcp --dport 5672 -j ACCEPT -A INPUT -p tcp -m state --state NEW -m tcp --dport 4369 -j ACCEPT -A INPUT -p tcp -m state --state NEW -m tcp --dport 5671 -j ACCEPT
windows下环境配置
可以根据需要下载⾃⼰所需要的版本,我这⾥下载的分别是: otp_win64_和rabbitmq-server-3.
默认⽅式安装otp_win64_,完成后,配置环境变量
ERLANG_HOME C:\Program Files\erl7.3
添加到PATH
%ERLANG_HOME%\bin;
默认⽅式安装rabbitmq-server-3.,完成后,配置环境变量
RABBITMQ_SERVER C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.9
添加到PATH
%RABBITMQ_SERVER%\sbin;
进⼊sbin⽬录打开⼀个控制台,安装web管理插件
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.9\sbin>rabbitmq-plugins.bat enable rabbitmq_management
然后重启下服务使上⾯的配置⽣效(注意这⾥要使⽤管理员打开cmd命令⾏,路径⽆影响),
C:\WINDOWS\system32>net stop RabbitMQ && net start RabbitMQ
RabbitMQ 服务正在停⽌.
RabbitMQ 服务已成功停⽌。
RabbitMQ 服务正在启动 .
RabbitMQ 服务已经启动成功。
C:\WINDOWS\system32>
打开浏览器,输⼊localhost:15672/,可以正常访问。由于我是在本机安装,所以这⾥是localhost。
接下来配置⼀个账号并赋予管理员权限(当然要确保是启动状态),
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.9\sbin>rabbitmqctl.bat add_user pony 123456
Creating user "pony" ...
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.9\sbin>rabbitmqctl.bat set_user_tags pony administrator
Setting tags for user "pony" to [administrator] ...
在web端⽤这个账号登录,可以成功!
java调⽤⽰例
我的开发环境是myeclipse+win10,windows环境下已经按照前⾯章节配置好了rabbitmq服务。**然后保持启动状态。**java要使⽤rabbitmq需要依赖⼀个客户端。
如果⽤maven的话就不⽤下载,配置⽂件加上依赖描述即可。
我这⾥新建两个⼯程,分别为RabbitMQDemo-P(表⽰⽣产者)和RabbitMQDemo-C(表⽰消费者),两个⼯程运⾏起来后就代表两个独⽴的进程通过消息队列通信。⽣产者不断的往消息队列发送消息,⽽消费者不断的从队列取消息。⼯程中导⼊前⾯下载的客户端依赖包。
源码来⾃:
先⽣产者的代码:
private final static String QUEUE_NAME ="hello";
public static void main(String[] args)throws Exception{
// TODO Auto-generated method stub
ConnectionFactory factory =new ConnectionFactory();
factory.setHost("localhost");//因为两个进程在同⼀个机器上
Connection connection = null;
Channel channel = null;
connection = wConnection();
channel = ateChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false, null);
String message ="Hello World!";
channel.basicPublish("", QUEUE_NAME, null, Bytes("UTF-8"));
System.out.println(" [Producer] Sent '"+ message +"'");
channel.close();
connection.close();
}
⽐较简单,涉及到的API具体说明可以⾃⾏搜索学习。运⾏⼀次就往名为hello的队列上发布⼀条消息:“Hello World”,然后进程结束。消费者稍微复杂⼀点,
private final static String QUEUE_NAME ="hello";
public static void main(String[] args)throws Exception{
ConnectionFactory factory =new ConnectionFactory();
factory.setHost("localhost");
Connection connection = wConnection();
Channel channel = ateChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer =new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body)
throws IOException {
String message =new String(body,"UTF-8");
System.out.println(" [Consumer] Received '"+ message +"'");
}
};
channel.basicConsume(QUEUE_NAME,true, consumer);
}
Channel.basicConsume⽅法将Consumer与消息队列绑定,它的定义如下:
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException
第⼀个参数是Consumer绑定的队列名,第⼆个参数是⾃动确认标志,如果为true,表⽰Consumer接受到消息后,会⾃动发确认消息(Ack消息)给消息队列,消息队列会将这条消息从消息队列⾥删除,第三个参数就是Consumer对象,⽤于处理接收到的消息。
第三个参数其实是个回调,当消费者收到消息时,会调⽤Consumer对象的handleDelivery⽅法。我们这⾥重写了这个⽅法,收到消息之后打印出来。
先运⾏消费者,如下:
[*] Waiting for messages. To exit press CTRL+C
开始等待消息。
再运⾏⽣产者,如下:
[Producer] Sent 'Hello World!'
再次看下消费者的控制台信息,接收到消息:
[*] Waiting for messages. To exit press CTRL+C
[Consumer] Received 'Hello World!'
欢迎⼤家关注我的

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。