RabbitMQ教程(四)RabbitMQ并发处理
前⾔:前⾯我们都讲解了⼀些基本的RabbitMQ配置及操作,现在我们来试下使⽤RabbitMQ处理⼀些简单的数据并发问题准备条件:先创建⼀个表students, 字段有id, count
CREATE TABLE Students
(
id INT IDENTITY PRIMARY KEY NOT NULL,
count INT NULL
)
我们准备通过每⼀次累加1,总和存储在count字段上
⼀、普通程序的处理
//创建数据库连接
private static BaseDAL dal = new BaseDAL("conName");
static void Main(string[] args)
{
//开启线程进⾏处理
new Thread(Update).Start();
Console.ReadLine();
}
private static void Update()
{
for (int i = 0; i <100; i++)
{
//获取当前表中的count数值
object total = dal.ExecComdToObject("select count from Students where id=1", null);
int value = int.Parse(total.ToString()) + 1;//将数值累加1
//将累加后的数值更新到表Students的字段count
string sql = string.Format("update Students set count={0}", value);
dal.ExecComd(sql);
Console.WriteLine("写⼊成功=" + i);
}
}
上⾯代码,我们就开启了1个线程进⾏读取写⼊,结果如下:
数据库的字段count值为:
发现结果没有问题,循环100次,结果是100
⼆、普通程序的并发处理
这次,我们开启2个线程进⾏并发读取,再同时写⼊。我们先把数据库的count置回0,再将代码修改⼀下。class Program
{
//创建数据库连接
private static BaseDAL dal = new BaseDAL("conName");
static void Main(string[] args)
{
//开启2线程进⾏处理
new Thread(Update).Start();
new Thread(Update).Start();
Console.ReadLine();
}
private static void Update()
{
for (int i = 1; i <=100; i++)
{
//获取当前表中的count数值
object total = dal.ExecComdToObject("select count from Students where id=1", null);
int value = int.Parse(total.ToString()) + 1;//将数值累加1
//将累加后的数值更新到表Students的字段count
string sql = string.Format("update Students set count={0}", value);
dal.ExecComd(sql);
Console.WriteLine("写⼊成功=" + i);
}
}
}
程序执⾏结果:
⼀看这程序我们就知道出问题了。那么这时候,数据库的count字段值是多少呢?
为什么会出现这个结果。是因为在并发的时候,前⾯线程执⾏的结果,会被后⾯的update进⾏了覆盖,所以值不会是200,也不会是刚好100.
那么这时候,我们的RabbitMQ就开始派上⽤场了(我这⾥就不介绍lock以及queue的使⽤,主要讲解RabbitMQ)
三、RabbitMQ并发处理
(1)RabbitMQ.Server代码处理,2线程并发,输送200次请求
static void Main(string[] args)
{
//开启2线程进⾏处理
new Thread(SendMsg).Start();
new Thread(SendMsg).Start();
Console.ReadLine();
}
private static ConnectionFactory factory = new ConnectionFactory() { HostName = "116.28.8.166", UserName = "admin", Password = "********", VirtualHost = "/" };
private static void SendMsg()
{
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
for (int i = 1; i <= 100; i++)
{
string guid = Guid.NewGuid().ToString();
var body = Encoding.UTF8.GetBytes(guid);
channel.QueueDeclare("AllenLeeQueue", false, false, false, null);
channel.BasicPublish("", "AllenLeeQueue", null, body);
Console.WriteLine("[Set Msg To AllenLeeQueue] " + guid);
}
}
}
}
(2)RabbitMQ.Client代码处理,对发起的200个请求进⾏接收
//创建数据库连接
private static BaseDAL dal = new BaseDAL("conName");
private static ConnectionFactory factory = new ConnectionFactory() { HostName = "116.28.8.166", UserName = "admin", Password = "********", VirtualHost = "/" };
static void Main(string[] args)
{
factory.AutomaticRecoveryEnabled = true;//设置端⼝后⾃动恢复连接属性即可
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [Get Msg from AllenLeeQueue] {0}", message);
//获取当前表中的count数值
object total = dal.ExecComdToObject("select count from Students where id=1", null);
int value = int.Parse(total.ToString()) + 1;//将数值累加1
string sql = string.Format("update Students set count={0}", value);
dal.ExecComd(sql);
};
channel.BasicConsume(queue: "AllenLeeQueue", noAck: true, consumer: consumer);
Console.ReadLine();
}
}
}
(3)如果RabbitMQ如果处理得当,数据库的字段count值就为200,不多不少,我们来看下数据库字段值
实践证明,RabbitMQ能很好的进⾏并发处理,达到了我们预期的效果。writeline教程
但是可能有朋友会觉得,这传的都是guid,如果是我们实际⼯作中,直接传的是并发业务数据,⼜该怎么处理呢?
其实就是在传输的body中,传⼊业务数据,再在RabbitMQ.Client进⾏业务数据转化就可以了。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。