基于activemq的分布式事务解决⽅案1、分布式事务出现场景
场景描述:⽀付宝转账余额宝
分布式事务必须满⾜的条件:
1、远程RPC调⽤,⽀付宝和余额宝存在接⼝调⽤
2、⽀付宝和余额宝使⽤不同的数据库
如图:
2、分布式事务解决⽅案
1、基于数据库XA协议的两段提交
XA协议是数据库⽀持的⼀种协议,其核⼼是⼀个事务管理器⽤来统⼀管理两个分布式数据库,如图
事务管理器负责跟⽀付宝数据库和余额宝数据库打交道,⼀旦有⼀个数据库连接失败,另⼀个数据库的操作就不会进⾏,⼀个数据库操作失败就会导致另⼀个数据库回滚,只有他们全部成功两个数据库的事务才会提交。
基于XA协议的两段和三段提交是⼀种严格的安全确认机制,其安全性是⾮常⾼的,但是保证安全性的前提是牺牲了性能,这个就是分布式系统⾥⾯的CAP理论,做任何架构的前提需要有取舍。所以基于XA协议的分布式事务并发性不⾼,不适合⾼并发场景。
2、基于activemq的解决⽅案
如图:
1、⽀付宝扣款成功时往message表插⼊消息
2、message表有message_id(流⽔id,标识夸系统的⼀次转账操作),status(confirm,unconfirm)
3、timer扫描message表的unconfirm状态记录往activemq插⼊消息
4、余额宝收到消息消费消息时先查询message表如果有记录就不处理如果没记录就进⾏数据库增款操作
5、如果余额宝数据库操作成功往余额宝message表插⼊消息,表字段跟⽀付宝message⼀致
6、如果5操作成功,回调⽀付宝接⼝修改message表状态,把unconfirm状态转换成confirm状态
问题描述:
1、⽀付宝设计message表的⽬的
如果⽀付宝往activemq插⼊消息⽽余额宝消费消息异常,有可能是消费消息成功⽽事务操作异常,有可能是⽹络异常等等不确定因素。如果出现异常⽽activemq收到了确认消息的信号,这时候activemq中的消息是删除了的,消息丢失了。设置message表就是有⼀个消息存根,activemq中消息丢失了message表中的消息还在。解决了activemq消息丢失问题
2、余额宝设计message表的⽬的
当余额宝消费成功并且数据库操作成功时,回调⽀付宝的消息确认接⼝,如果回调接⼝时出现异常导致⽀付宝状态修改失败还是unconfirm 状态,这时候还会被timer扫描到,⼜会往activemq插⼊消息,⼜会被余额宝消费⼀边,但是这条消息已经消费成功了的只是回调失败⽽已,所以就需要有⼀个这样的message表,当余额宝消费时先插⼊message表,如果message根据message_id能查询到记录就说明之前这条消息被消费过就不再消费只需要回调成功即可,如果查询不到消息就消费这条消息继续数据库操作,数据库操作成功就往message表插⼊消息。 这样就解决了消息重复消费问题,这也是消费端的幂等操作。
基于消息中间件的分布式事务是最理想的分布式事务解决⽅案,兼顾了安全性和并发性!
接下来贴代码:
⽀付宝代码:
@Controller
@RequestMapping("/order")
public class OrderController {
/**
* @Description TODO
* @param @return 参数
* @return String 返回类型
* @throws
*
* userID:转账的⽤户ID
* amount:转多少钱
*/
@Autowired
@Qualifier("activemq")
OrderService orderService;
@RequestMapping("/transfer")
public @ResponseBody String transferAmount(String userId,String messageId, int amount) {
try {
orderService.updateAmount(amount,messageId, userId);
}
catch (Exception e) {
e.printStackTrace();
return "===============================transferAmount failed==================="; }
return "===============================transferAmount successfull==================="; }
@RequestMapping("/callback")
public String callback(String param) {
JSONObject parse = JSONObject.parseObject(param);
String respCode = String("respCode");
if(!"OK".equalsIgnoreCase(respCode)) {
return null;
}
try {
orderService.updateMessage(param);
jdbctemplate查询一条数据}catch (Exception e) {
e.printStackTrace();
return "fail";
}
return "ok";
}
}
public interface OrderService {
public void updateAmount(int amount, String userId,String messageId);
public void updateMessage(String param);
}
@Service("activemq")
@Transactional(rollbackFor = Exception.class)
public class OrderServiceActivemqImpl implements OrderService {
Logger logger = Logger(getClass());
@Autowired
JdbcTemplate jdbcTemplate;
@Autowired
JmsTemplate jmsTemplate;
@Override
public void updateAmount(final int amount, final String messageId, final String userId) {
String sql = "update account set amount = amount - ?,update_time=now() where user_id = ?";
int count = jdbcTemplate.update(sql, new Object[]{amount, userId});
if (count == 1) {
//插⼊到消息记录表
sql = "insert into message(user_id,message_id,amount,status) values (?,?,?,?)";
int row = jdbcTemplate.update(sql,new Object[]{userId,messageId,amount,"unconfirm"});
if(row == 1) {
//往activemq中插⼊消息
jmsTemplate.send("zg.jack.queue", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
com.zhuguang.jack.bean.Message message = new com.zhuguang.jack.bean.Message(); message.setAmount(Integer.valueOf(amount));
message.setStatus("unconfirm");
message.setUserId(userId);
message.setMessageId(messageId);
ateObjectMessage(message);
}
});
}
}
}
@Override
public void updateMessage(String param) {
JSONObject parse = JSONObject.parseObject(param);
String messageId = String("messageId");
String sql = "update message set status = ? where message_id = ?";
int count = jdbcTemplate.update(sql,new Object[]{"confirm",messageId});
if(count == 1) {
logger.info(messageId + " callback successfull");
}
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论