分布式事务解决⽅案之最⼤努⼒通知
6.1.什么是最⼤努⼒通知
最⼤努⼒通知也是⼀种解决分布式事务的⽅案,下边是⼀个是充值的例⼦:
交互流程:
  1、账户系统调⽤充值系统接⼝
  2、充值系统完成⽀付处理向账户系统发起充值结果通知,若通知失败,则充值系统按策略进⾏重复通知
  3、账户系统接收到充值结果通知修改充值状态。
  4、账户系统未接收到通知会主动调⽤充值系统的接⼝查询充值结果。
通过上边的例⼦我们总结最⼤努⼒通知⽅案的⽬标:
  ⽬标:发起通知⽅通过⼀定的机制最⼤努⼒将业务处理结果通知到接收⽅。
具体包括:
  1、有⼀定的消息重复通知机制。因为接收通知⽅可能没有接收到通知,此时要有⼀定的机制对消息重复通知。
  2、消息校对机制。如果尽最⼤努⼒也没有通知到接收⽅,或者接收⽅消费消息后要再次消费,此时可由接收⽅主动向通知⽅查询消息信息来满⾜需求。
最⼤努⼒通知与可靠消息⼀致性有什么不同?
  1、解决⽅案思想不同
  可靠消息⼀致性,发起通知⽅需要保证将消息发出去,并且将消息发到接收通知⽅,消息的可靠性关键由发起通知⽅来保证。
  最⼤努⼒通知,发起通知⽅尽最⼤的努⼒将业务处理结果通知为接收通知⽅,但是可能消息接收不到,此时需要接收通知⽅主动调⽤发起通知⽅的接⼝查询业务处理结果,通知的可靠性关键在接收通知⽅。
2、两者的业务应⽤场景不同
  可靠消息⼀致性关注的是交易过程的事务⼀致,以异步的⽅式完成交易。
  最⼤努⼒通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。
3、技术解决⽅向不同
  可靠消息⼀致性要解决消息从发出到接收的⼀致性,即消息发出并且被接收到。
  最⼤努⼒通知⽆法保证消息从发出到接收的⼀致性,只提供消息接收的可靠性机制。可靠机制是,最⼤努⼒的将消息通知给接收⽅,当消息⽆法被接收⽅接收时,由接收⽅主动查询消息(业务处理结果)。
6.2.解决⽅案
通过对最⼤努⼒通知的理解,采⽤MQ的ack机制就可以实现最⼤努⼒通知。
⽅案1:
本⽅案是利⽤MQ的ack机制由MQ向接收通知⽅发送通知,流程如下:
  1、发起通知⽅将通知发给MQ。使⽤普通消息机制将通知发给MQ。
    注意:如果消息没有发出去可由接收通知⽅主动请求发起通知⽅查询业务执⾏结果。(后边会讲)
  2、接收通知⽅监听 MQ。
  3、接收通知⽅接收消息,业务处理完成回应ack。
  4、接收通知⽅若没有回应ack则MQ会重复通知。
    MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的⽅式,逐步拉⼤通知间隔(如果MQ采⽤rocketMq,在broker中可进⾏配置),直到达到通知要求的时间窗⼝上限。
  5、接收通知⽅可通过消息校对接⼝来校对消息的⼀致性。
⽅案2:
  本⽅案也是利⽤MQ的ack机制,与⽅案1不同的是应⽤程序向接收通知⽅发送通知,如下图:
交互流程如下:
  1、发起通知⽅将通知发给MQ。
    使⽤可靠消息⼀致⽅案中的事务消息保证本地事务与消息的原⼦性,最终将通知先发给MQ。
  2、通知程序监听 MQ,接收MQ的消息。
    ⽅案1中接收通知⽅直接监听MQ,⽅案2中由通知程序监听MQ。
    通知程序若没有回应ack则MQ会重复通知。
  3、通知程序通过互联⽹接⼝协议(如http、webservice)调⽤接收通知⽅案接⼝,完成通知。
    通知程序调⽤接收通知⽅案接⼝成功就表⽰通知成功,即消费MQ消息成功,MQ将不再向通知程序投递通知消息。
  4、接收通知⽅可通过消息校对接⼝来校对消息的⼀致性。
⽅案1和⽅案2的不同点:
  1、⽅案1中接收通知⽅与MQ接⼝,即接收通知⽅案监听 MQ,此⽅案主要应⽤与内部应⽤之间的通知。
  2、⽅案2中由通知程序与MQ接⼝,通知程序监听MQ,收到MQ的消息后由通知程序通过互联⽹接⼝协议调⽤接收通知⽅。此⽅案主要应⽤于外部应⽤之间的通知,例如⽀付宝、的⽀付结果通知。
6.3.RocketMQ实现最⼤努⼒通知型事务
6.3.1.业务说明
  本实例通过RocketMq中间件实现最⼤努⼒通知型分布式事务,模拟充值过程。
本案例有账户系统和充值系统两个微服务,其中账户系统的数据库是bank1数据库,其中有张三账户。充值系统的数据库使⽤bank1_pay数据库,记录了账户的充值记录。
业务流程如下图:
交互流程如下:
  1、⽤户请求充值系统进⾏充值。
  2、充值系统完成充值将充值结果发给MQ。
  3、账户系统监听MQ,接收充值结果通知,如果接收不到消息,MQ会重复发送通知。接收到充值结果通知账户系统增加充值⾦额。
  4、账户系统也可以主动查询充值系统的充值结果查询接⼝,增加⾦额。
6.3.2.程序组成部分
本⽰例程序组成部分如下:
  数据库:MySQL-5.7.25
  包括bank1和bank1_pay两个数据库。
  JDK:64位 jdk1.8.0_201
  rocketmq 服务端:RocketMQ-4.5.0
  rocketmq 客户端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE
  微服务框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
  微服务及数据库的关系:
    dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-bank1 银⾏1,操作张三账户,连接数据库bank1
    dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-pay 银⾏2,操作充值记录,连接数据库bank1_pay
交互流程如下:
  1、⽤户请求充值系统进⾏充值。
  2、充值系统完成充值将充值结果发给MQ。
  3、账户系统监听MQ,接收充值结果通知,如果接收不到消息,MQ会重复发送通知。接收到充值结果通知账户系统增加充值⾦额。
  4、账户系统也可以主动查询充值系统的充值结果查询接⼝,增加⾦额。
6.3.3.创建数据库
  导⼊数据库脚本:资料\sql\bank1.sql、资料\sql\bank1_pay.sql,已经导过不⽤重复导⼊。创建bank1库,并导⼊以下表结构和数据(包含张三账户)
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info`    (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银⾏卡号',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',
`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE
)    ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);
DROP TABLE IF EXISTS `de_duplication`;
CREATE TABLE `de_duplication`    (
`tx_no`    varchar(64) COLLATE utf8_bin NOT NULL,
`create_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
创建bank1_pay库,并导⼊以下表结构:
CREATE DATABASE `bank1_pay` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; CREATE TABLE `account_pay` (
`id` varchar(64) COLLATE utf8_bin NOT NULL,
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账号',
`pay_amount` double NULL DEFAULT NULL COMMENT '充值余额',
`result` varchar(20) COLLATE utf8_bin DEFAULT NULL COMMENT '充值结果:success,fail',
PRIMARY KEY (`id`) USING BTREE
)ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FO
RMAT = Dynamic;
6.3.4.启动RocketMQ
  rocketmq启动⽅式与RocketMQ实现可靠消息最终⼀致性事务中完全⼀致
6.3.5 discover-server
  discover-server是服务注册中⼼,测试⼯程将⾃⼰注册⾄discover-server。
导⼊:资料\基础代码\dtx ⽗⼯程,此⼯程⾃带了discover-server,discover-server基于Eureka实现。
已经导过不⽤重复导⼊。
6.3.6 导⼊dtx-notifymsg-demo
dtx-notifymsg-demo是本⽅案的测试⼯程,根据业务需求需要创建两个dtx-notifymsg-demo⼯程。
(1)导⼊dtx-notifymsg-demo
导⼊:资料\基础代码\dtx-notifymsg-demo到⽗⼯程dtx下。
两个测试⼯程如下:
  dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-bank1 ,操作张三账户,连接数据库bank1
  dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-pay,操作李四账户,连接数据库bank1_pay
(2)⽗⼯程maven依赖说明
在dtx⽗⼯程中指定了SpringBoot和SpringCloud版本springcloud和springboot
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐dependencies</artifactId>
<version>2.1.3.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring‐cloud‐dependencies</artifactId>
<version>Greenwich.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
在dtx-notifymsg-demo⽗⼯程中指定了rocketmq-spring-boot-starter的版本。
<dependency>
<groupId>ketmq</groupId>
<artifactId>rocketmq‐spring‐boot‐starter</artifactId>
<version>2.0.2</version>
</dependency>
( 3 ) 配置rocketMQ
在application-local.propertis中配置rocketMQ nameServer地址及⽣产组:
up = producer_bank2
rocketmq.name‐server = 127.0.0.1:9876
其它详细配置见导⼊的基础⼯程。
6.3.7 dtx-notifydemo-pay
dtx-notifydemo-pay实现如下功能:
  1、充值接⼝
  2、充值完成要通知
  3、充值结果查询接⼝
2)Dao
@Mapper
@Component
public interface AccountPayDao {
@Insert("insert into account_pay(id,account_no,pay_amount,result) values(#{id},# {accountNo},#{payAmount},#{result})")
int insertAccountPay(@Param("id") String id,@Param("accountNo") String accountNo, @Param("payAmount") Double pay_amount,@Param("result") String result);    @Select("select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo}")
AccountPay findByIdTxNo(@Param("txNo") String txNo);
}
3)Service
@Service
@Slf4j
public class AccountPayServiceImpl implements AccountPayService{
@Autowired
RocketMQTemplate rocketMQTemplate;
@Autowired
AccountPayDao accountPayDao;
@Transactional
@Override
public AccountPay insertAccountPay(AccountPay accountPay) {
int result = accountPayDao.Id(), AccountNo(), PayAmount(), "success");
if(result>0){
//发送通知
return accountPay;
}
return null;
}
@Override
public AccountPay getAccountPay(String txNo) {
AccountPay accountPay = accountPayDao.findByIdTxNo(txNo);
return accountPay;
}
}
4)Controller

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。