C#使⽤RabbitMQ
1. RabbitMQ
MQ全称为Message Queue, 消息队列(MQ)是⼀种应⽤程序对应⽤程序的通信⽅法。应⽤程序通过写和检索出⼊列队的针对应⽤程序的数据(消息)来通信,⽽⽆需专⽤连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进⾏通信,⽽不是通过直接调⽤彼此来通信,直接调⽤通常是⽤于诸如远程过程调⽤的技术。排队指的是应⽤程序通过队列来通信。队列的使⽤除去了接收和发送应⽤程序同时执⾏的要求。其中较为成熟的MQ产品有MSMQ,ActiveMQ,RabbitMQ,IBM WEBSPHERE MQ 等。
RabbitMQ是使⽤Erlang开发的,开源的,⼀个在⾼级消息队列协议(AMQP)基础上完整的,可复⽤的企业消息系统,遵循Mozilla Public License开源协议。
RabbitMQ⽀持⼤多数开发语⾔如Java,Ruby,Python,.Net,C/C++,Erlang等,各种语⾔的客户端可以从官⽹上下载。
2. RabbitMQ的⼀些概念
- 连接(Connection),与RabbitMQ Server建⽴的⼀个连接,由ConnectionFactory创建,每个connection只与⼀个物理的Server进⾏连接,此连接是基于Socket进⾏连接的,这个可以相似的理解为像
⼀个DB Connection。AMQP⼀般使⽤TCP链接来保证消息传输的可靠性。
- 通道 (Channel),在C#客户端⾥应该是叫Model,其他客户端基本都叫Channel。建⽴在Connection基础上的⼀个通道,相对于Connection来说,它是轻量级的。它就像是Hibernate⾥⾯的Session⼀样。Channel 主要进⾏相关定义,发送消息,获取消息,事务处理等。Channel可以在多线程中使⽤,但是必须保证任何时候只有⼀个线程执⾏命令。⼀个Connection可以有多个Channel。客户端程序有时候会是⼀个多线程程序,每⼀个线程都想要和RabbitMQ进⾏连接,但是⼜不想共享⼀个连接,这种需求还是⽐较普遍的。因为⼀个Connection就是⼀个TCP链接,RabbitMQ在设计的时候不希望与每⼀个客户端保持多个TCP连接,但这确实是有些客户端的需求,所以在设计中引⼊了Channel的概念,每⼀个Channel之间没有任何联系,是完全分离的。多个Channel来共享⼀个Connection。
- 交换器(Exchange),发送消息的实体。
- 队列(Queue),接收消息的实体。
- 绑定器(Bind),将交换器和队列连接起来,并且封装消息的路由信息。
3.资源⽂件
4.发送消息
1///<summary>
2///</summary>
3///<param name="deptId">部门id</param>
4///<param name="userCode">⽤户名</param>
5///<returns></returns>
6 [Generated]
7protected override String SendMQ(long deptId, string userCode, string clientIP)
8 {
9var factory = new ConnectionFactory();
10 factory.HostName = "182.94.73.104"; //MQ服务端IP
11 factory.Port = 15673; //MQ 服务端端⼝
12 factory.UserName = "login_mq_user";
13 factory.Password = "chinaiss_xyz@345Q1uTfcx.vb";
14//factory.HostName = "localhost";
15//factory.UserName = "yy";
16//factory.Password = "hello!";
17
18
19string ticket = System.Guid.NewGuid().ToString(); //对应URL中的token参数
activemq重启命令20
21//MQINFOPoco MQobj = new MQINFOPoco();
22//MQobj.clientIP = clientIP;
23//MQobj.section = deptId;
24//MQobj.userId = userCode;
25//MQobj.loginTime = GetTimeStamp();
26//MQobj.timeLive = 480;
27//ken = ticket;
28var loginTime = GetTimeStamp();
29
30using (var connection = factory.CreateConnection())
31 {
32using (var channel = connection.CreateModel())
33 {
34 channel.QueueDeclare("login_queue", false, false, false, null); //MQ队列名称login_queue 35//string message = ObjectToJson(MQobj);
36 StringBuilder message = new StringBuilder(@"{""clientIP"":");
37 message.AppendFormat("\"{0}\"", clientIP);
38 message.AppendFormat(",\"section\":\"{0}\"", deptId);
39 message.AppendFormat(",\"userId\":\"{0}\"", userCode);
40//message.Append(",\"userId\":\"hxj\"");
41 message.AppendFormat(",\"loginTime\":{0}", loginTime);
42 message.Append(",\"timeLive\":480");
43 message.AppendFormat(",\"token\":\"{0}\"", ticket);
44 message.Append("}");
45
46var properties = channel.CreateBasicProperties();
47 properties.SetPersistent(true);
48var body = Encoding.UTF8.GetBytes(message.ToString());
49 channel.BasicPublish("", "login_queue", properties, body);
50 }
51 }
52
53return ticket;
54 }
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论