spring中使⽤异步事件实现同步事务
结合Scala+Spring,我们将采取⼀个很简单的场景:下订单,然后发送⼀封电⼦邮件。
编制⼀个服务:
@Service
class OrderService @Autowired() (orderDao: OrderDao, mailNotifier: OrderMailNotifier) {
@Transactional
def placeOrder(order: Order) {
orderDao save order //保存订单
mailNotifier sendMail order //发送邮件
}
}
上⾯代码是在保存订单和发送邮件两个同步执⾏,发送邮件需要连接邮件服务器,⽐较耗时,拖延了整个性能,我们采取异步发送电⼦邮件,利⽤Spring内置的⾃定义事件,与JMS或其他⽣产者 - 消费者类似。
case class OrderPlacedEvent(order: Order) extends ApplicationEvent
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
@Transactional
def placeOrder(order: Order) {
orderDao save order
eventPublisher publishEvent OrderPlacedEvent(order)
}
}
区别是继承了ApplicationEvent 之前是直接⽤ OrderMailNotifier 直接发送,⽽现在我们使⽤发送发邮件事件了。
事件监听者代码如下:
@Service
class OrderMailNotifier extends ApplicationListener[OrderPlacedEvent] {
def onApplicationEvent(event: OrderPlacedEvent) {
//
}
}
在监听者⽅法中真正实现邮件发送。
但是Spring的ApplicationEvents是同步事件,意味着我们并没有真正实现异步,程序还会在这⾥堵塞,如果希望异步,我们需要重新定义⼀个ApplicationEventMulticaster,实现类型SimpleApplicationEvent
Multicaster和TaskExecutor:
@Bean
def applicationEventMulticaster() = {
val multicaster = new SimpleApplicationEventMulticaster()
multicaster.setTaskExecutor(taskExecutor())
multicaster
}
@Bean
def taskExecutor() = {
val pool = new ThreadPoolTaskExecutor()
pool.setMaxPoolSize(10)
pool.setCorePoolSize(10)
pool.setThreadNamePrefix("Spring-Async-")
pool
}
Spring通过使⽤TaskExecutor已经⽀持⼴播事件了,对onApplicationEvent() 标注
@Async
def onApplicationEvent(event: OrderPlacedEvent) { //...
如果你希望使⽤@Async,可以编制⾃⼰的异步执⾏器:
@Configuration
@EnableAsync
class ThreadingConfig extends AsyncConfigurer {
def getAsyncExecutor = taskExecutor()
@Bean
def taskExecutor() = {
val pool = new ThreadPoolTaskExecutor()
pool.setMaxPoolSize(10)
pool.setCorePoolSize(10)
pool.setThreadNamePrefix("Spring-Async-")
pool
}
}
@ EnableAsync是⾜够了。,默认情况下,Spring使⽤SimpleAsyncTaskExecutor类创建新的线程。
以上所有设置暴露⼀个真正的问题。现在,我们虽然使⽤其他线程发送⼀个异步消息处理。不幸的是,我们引⼊竞争条件。
1. 开始事务
2. 存储order到数据库
3. 发送⼀个包装order的消息
4. 确认
springframework事务
异步线程获得OrderPlacedEvent并开始处理。现在的问题是,它发⽣(3)之后,还是(4)之前或者(4)之后?这有⼀个很⼤的区别!在前者的情况下,交易也尚未提交订单所以不存在于数据库中。另⼀⽅⾯,延迟加载可能已经在⼯作,致使订单对象仍然然绑定在PersistenceContext(缺省我们使⽤JPA)。
解决办法是使⽤ .,可以注册很多监听者,它对于事务的提交或回滚都有事件发送。
@Transactional
def placeOrder(order: Order) {
orderDao save order
afterCommit {
eventPublisher publishEvent OrderPlacedEvent(order)
}
}
private def afterCommit[T](fun: => T) {
override def afterCommit() {
fun
}
})
}
当前事务提交后 afterCommit()接受调⽤,可以安全地调⽤registerSynchronization()多次 - 存储在Set并且本地保存到当前事务中,事务提交后消失。
我们将afterCommit⽅法单独抽象成⼀个类,分离关注。
class TransactionAwareApplicationEventPublisher(delegate: ApplicationEventPublisher)
extends ApplicationEventPublisher {
override def publishEvent(event: ApplicationEvent) {
if (TransactionSynchronizationManager.isActualTransactionActive) {
new TransactionSynchronizationAdapter {
override def afterCommit() {
delegate publishEvent event
}
})
}
else
delegate publishEvent event
}
}
TransactionAwareApplicationEventPublisher是实现Spring的ApplicationEventPublisher。
我们要将这个新的实现告诉Spring替换掉旧的,⽤@Primary:
@Resource
val applicationContext: ApplicationContext = null
@Bean
@Primary
def transactionAwareApplicationEventPublisher() =
new TransactionAwareApplicationEventPublisher(applicationContext)
再看看原来的订单服务:
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
@Transactional
def placeOrder(order: Order) {
orderDao save order
eventPublisher publishEvent OrderPlacedEvent(order)
}
注意这⾥ApplicationEventPublisher已经是我们⾃⼰实现的TransactionAwareApplicationEventPublisher,将被⾃动注⼊这个服务。最后,要在真正订单保存的业务代码上放置事务:
def placeOrder(order: Order) {
storeOrder(order)
eventPublisher publishEvent OrderPlacedEvent(order)
}
@Transactional
def storeOrder(order: Order) = orderDao save order
当然这没有根本解决问题,如果placeOrder有⼀个更⼤的事务怎么办?

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。