SpringAMQP源码分析02-CachingConnectionFactory
### 准备
## ⽬标
了解 CachingConnectionFactory 在默认缓存模式下的⼯作原理
## 前置知识
《Spring AMQP 源码分析 01 - Impatient》
## 测试代码
同《Spring AMQP 源码分析 01 - Impatient》
### 分析
## 流程分析
从《Spring AMQP 源码分析 01》可知,在 RabbitTemplate 的execute(ChannelCallback action,
ConnectionFactory connectionFactory)⽅法中需要创建与销毁连接和信道。execute ⽅法调⽤ doExecute ⽅法完成相关逻辑,代码如下:
核⼼逻辑很简单,第1430⾏通过 CachingConnectionFactory 的 createConnection ⽅法创建
org.springframework.tion.Connection,第1435⾏通过 Connection 的 createChannel ⽅法创建
com.rabbitmq.client.Channel,第1455⾏将创建的 channel 回传给回调函数,执⾏业务操作。最后在 finally 块中释放信道和连接(不在截图中)。
## 创建连接
在看代码前先了解⼀下 CachingConnectionFactory 的功能。默认情况下(缓存模式
是 CacheMode.CHANNEL),CachingConnectionFactory 的 createConnection ⽅法总是返回同⼀个连接。通过连接获取的信道也是会被缓存的,但是缓存的细节与⽂档描述不⼀致,以实际代码为准。
CachingConnectionFactory 有⼀个属性ChannelCachingConnectionProxy connection,在缓存模式为 CacheMode.CHANNEL 时,⽤于缓存唯⼀的连接。ChannelCachingConnectionProxy  包含两个属性org.springframework.tion.Connection target
和AtomicBoolean closeNotified,target 代表真实的连接。ChannelCachingConnectionProxy
是 org.springframework.tion.Connection 的代理类,根据代理模式的定义,它也实现了 Connection 接⼝。
CachingConnectionFactory 的 createConnection ⽅法会返回ChannelCachingConnectionProxy connection(Line 573)。返回的代理连接需要保证connection.target 不为 null(Line 564)。第⼀次调⽤ createConnection ⽅法时 connection.target 值为 null,因此会调
⽤createBareConnection ⽅法创建出org.springframework.tion.SimpleConnection赋值给 connection.target(Line 565)。SimpleConnection 拥有com.rabbitmq.client.Connection delegate属性,持有真正的 RabbitMQ 连接
(com.rabbitmq.client.impl.AMQConnection)。createBareConnection ⽅法先通过com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory创建出 AMQConnection,再创建⼀个 SimpleConnection 实例,将 AMQConnection 赋值给 delegate。
代码第568⾏为当前 connection 绑定了⼀个 Semaphore,放在Map<Connection, Semaphore> checkoutPermits中。Semaphore 是⾮公平同步信号量,允许有channelCacheSize(默认为25)个访问许可。这和后⾯的信道缓存逻辑相关。
代码第571⾏向所有 ConnectionListener 发布 onCreate 事件。CachingConnectionFactory 拥有属性CompositeConnectionListener connectionListener ,是 ConnectionListener 的注册中⼼,同时它也是事件源,这部分代码是 Listener 模式的⼀个很好的例⼦。特别注意源码中使⽤CopyOnWriteArrayList 保存所有的 ConnectionListener,值得学习⼀下。
## 创建信道
创建信道是通过 ChannelCachingConnectionProxy 类的 createChannel ⽅法。⾸先判断channelCheckoutTimeout 参数值是否⼤于0,只有⼤于0的情况下才会通过 Semaphore 限制当前连接下可⽤的信道数量(不超过 Semaphore 的 permits 值,也就
是channelCacheSize 值),由于 channelCheckoutTimeout 默认值为0,所以默认情况下不会限制⼀个连接下可以有多少个信道。
整个信道的复⽤是通过LinkedList<ChannelProxy> channelList;实现的。CachingConnectionFactory 中有4个相关属性分别⽤来缓
存 CacheMode.CHANNEL 与 CacheMode.CONNECTION 两种缓存模式下⽀持事务与不⽀持事务的信道,对于本例,⽤的
是LinkedList<ChannelProxy> cachedChannelsNonTransactional。Spring AMQP 的缓存实现很普通:使⽤ channelList 作为缓存队列,所有对该队列的操作都通过 channelList ⾃⾝作为对象锁进⾏同步。
⾸先尝试从 channelList 中获取可⽤的缓存信道。在同步块中,先判断 channelList 是否为空,若不为空,则返回队列头部缓存的ChannelProxy(要从队列中移除)。如果没有可⽤的缓存信道,则通过getCachedChannelProxy ⽅法创建新的 ChannelProxy。创建ChannelProxy ⼤致步骤如下:
1. 先通过com.rabbitmq.client.Connection delegate创建出com.rabbitmq.client.Channel(com.rabbitmq.client.impl.ChannelN)实例。
(Line 492)
2. 向所有 ChannelListener 发布 onCreate 事件。(Line 496)
3. 创建动态代理。(Line 504)
本⽂不深⼊解释动态代理,简单来说,我们通过 wProxyInstance ⽅法凭空创建了⼀个实例,这个实例实现了 ChannelProxy 接⼝,所有对该实例的⽅法调⽤都会转交给CachedChannelInvocationHandler 的 invoke ⽅法处理。动态代理可以有效减少普通代理模式的代码量(⼤量的委托实现不再需要),接⼝定义发⽣变化时 InvocationHandler 也可能⽆需变更。
## ChannelProxy
ChannelProxy 第⼀次被调⽤是在业务逻辑中:
DeclareOk declareOk = channel.Name(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), Arguments());
记住这⼉的 channel 是org.springframework.tion.ChannelProxy的动态代理实例。对 queueDeclare ⽅法的调⽤,实际上是通过反射调⽤真正的信道(ChannelN)实例的相同⽅法完成的:
Object result = method.invoke(this.target, args);
## 关闭信道
关闭信道的代码很直接:
channel.close();
魔法就在于这个 channel 是动态代理实例,close ⽅法在 CachedChannelInvocationHandler 中被重新实现。
第912⾏的 channelList 就是 CachingConnectionFactory 的LinkedList<ChannelProxy> cachedChannelsNonTransactional,⼀路被传递到 CachedChannelInvocationHandler 中。第914⾏判断当前已缓存的信道数量是否已经达到阈值,保证缓存的信道数量不超
过 channelCacheSize 设定的值。(第915⾏代码⽬的是什么?)。如果最终需要缓存信道,则让 Semaphore 释放 permits(如果channelCheckoutTimeout > 0),将 ChannelProxy 放到 channelList 队尾。如果不需要缓存,则物理关闭信道,并让 Semaphore 释放permits(如果 channelCheckoutTimeout > 0)。
整理⼀下,默认 channelCacheSize 为25,表⽰最多为同⼀个连接缓存25个信道。如果 channelCheckoutTimeout 值为0(默认值),实际上并不限制同⼀连接下能同时存在的信道数量;如果 channelCheckoutTimeout 值⼤于0,则通过 Semaphore 机制保证最多只有25个信道能够同时被使⽤,超出数量的信道创建请求会抛出 AmqpTimeoutException 异常。
## 关闭连接springframework事务
实际的连接类是 ChannelCachingConnectionProxy,在默认的模式下,实际上关闭连接没有执⾏任何操作。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。