认真写文章,用心做分享。公众号:Java耕耘者 文章都会在里面更新,整理的资料也会放在里面。
事务消息
我们的所有事务消息都可以看作是BASE模型的实现。在业界中有事务消息功能比较有代表性的就是阿里开源的RocketMQ和去哪儿开源的QMQ,他们两个消息队列都实现了事务消息功能,但是实现的方式却各有不同,接下来也会分别剖析这两个消息队列是如何实现事务消息。
1. RocketMQ-事务消息
RocketMQ事务消息到底是怎么一回事呢?
基本流程如下: 第一阶段Prepared消息,会拿到消息的地址。 第二阶段执行本地事务。 第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。消息接受者就能使用这个消息。 如果确认消息失败,在RocketMq Broker中提供了定时扫描没有更新状态的消息,如果有消息没有得到确认,会向消息发送者发送消息,来判断是否提交,在rocketmq中是以listener的形式给发送者,用来处理。
如果确认消息失败,在RocketMq Broker中提供了定时扫描没有更新状态的消息,如果有消息没有得到确认,会向消息发送者发送消息,来判断是否提交,在rocketmq中是以listener的形式给发送者,用来处理。
如果消费超时,则需要一直重试,消息接收端需要保证幂等。如果消息消费失败,这个就需要人工进行处理,因为这个概率较低,如果为了这种小概率时间而设计这个复杂的流程反而得不偿失
这个图大家想必再其他地方已经看见过很多次了,很多时候从看这个图只能一知半解,那接下来看看代码是如何实现的吧。
2.使用事务消息
在RocketMQ的事务消息中有个很重要的监听器叫TransactionListener,我们需要实现他
其中有两个方法:
- executeLocalTransaction:顾名思义执行我们的本地事务方法,一般来说我们的本地事务方法是由上层的业务顺序推进调用,但是在rocketMQ的事务消息中是需要由Listener来进行驱动,如果要使用RocketMQ的事务消息需要对我们的业务进行一定的改造。并且这里还需要注意的是,我们在事务中还需要保存消息的事务ID和当前事务的对应关系。
- checkLocalTransaction:根据我们之前的事务ID来检查我们的本地事务状态,这里的状态有三种: 事务消息共有三种状态,提交状态、回滚状态、中间状态: TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。 TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。 TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。返回这个状态的时候RocketMQ会进行重试检查,为了防止频繁检查,默认将单个消息的检查次数限制为15 次。
对于我们的消息发送有如下代码:
我们发现在代码中我们将我们之前的listener以及一个线程池来和我们的producer进行绑定,这里线程池的作用是我们checkLocalTransaction所使用的线程池。
3. 实现原理
客户端
这里的代码比较简单,主要分下面几个步骤
- Step 1: 先发送消息至Broker.
- Step 2: 根据发送的结果,判断是否执行本地事务,如果发送成功,则执行本地事务。
- Step 3: 记录本地事务状态,这里的状态也就是上面我们所讲的提交事务,回滚事务,中间状态三个状态。
- Step 4: 结束事务,根据本地事务状态决定是提交或者回滚。
对于checkLocalTransaction:
在RocketMQ中会接收RocketMQ-Broker发送的CHECK_TRANSACTION_STATE请求,来执行检查本地事务状态。
服务端
在Broker上会对事务消息进行特殊判断:
如果是事务消息那么就需要走prepareMessage这个逻辑,prepareMessage这个逻辑如下:
主要是将当前消息的topic替换成RMQ_SYS_TRANS_HALF_TOPIC。我们的一阶段发送半消息到这里就完成了,接下来就是Broker处理我们事务的commit或者rollback:
图中红色方框表示我们的核心步骤,对于commit的一共有三步:
- 获取需要commit的半消息
- 将消息发送到原来的topic
- 删除半消息
对于rollback一共有两步:
- 获取需要rollback的半消息
- 删除半消息
对于获取消息这个比较简单,通过记录的offset直接查询就好,对于将消息发送到原来的topic逻辑基本上可以复用,这里要重点讨论的是如何删除半消息,我们都知道RocketMQ是顺序写入,我们不可能去真正的删除消息,那么就只能依靠一些其他的途径,我们可以想到消息消费了之后,只要offset不重置,这个消息就不会再被消费,那么其实就实现了删除的功能。RocketMQ也是通过这样的思路,自己实现了一个消费者,去消费RMQ_SYS_TRANS_HALF_TOPIC这个Topic,如果消息需要删除的话消费了之后就不需要做其他操作,如果不需要删除的话,消费了之后又会重新投递。
那其实核心就在于怎么去记录半消息是否应该删除呢?对于这个问题RocketMQ采用了新的TopicRMQ_SYS_TRANS_OP_HALF_TOPIC来保存半消息是否删除,其实在上面的删除半消息的流程中其实也是对RMQ_SYS_TRANS_OP_HALF_TOPIC投递了一个op_message,然后由后台任务去进行操作。
整个流程原理图如下面所示:
- Step1: 发送事务消息,这里也叫做halfMessage,会将Topic替换为HalfMessage的Topic。
- Step2: 发送commit或者rollback,如果是commit这里会查询出之前的消息,然后将消息复原成原Topic,并且发送一个OpMessage用于记录当前消息可以删除。如果是rollback这里会直接发送一个OpMessage删除。
- Step3: 在Broker有个处理事务消息的定时任务,定时对比halfMessage和OpMessage,如果有OpMessage且状态为删除,那么该条消息必定commit或者rollback,所以就可以删除这条消息。
- Step4: 如果事务超时(默认是6s),还没有opMessage,那么很有可能commit信息丢了,这里会去反查我们的Producer本地事务状态。
- Step5: 根据查询出来的信息做Step2。
4. 小结
上面已经讲了如何使用RocketMQ的事务消息和实现原理,想必大家已经对RocketMQ事务消息有自己的认识了。但是RocketMQ的事务消息目前在我的一些业务实战中是从来没有使用过的,主要原因有几个方面:
- 改造成本大,比如一个下单的操作,创建订单的本地事务一般来说是同步进行的,创建之后会获取到订单ID,但是在RocketMQ中这个本地事务变成了在Listener里面的操作了,那么就不能通过返回参数来进行,只能通过一些其他方法来完成这个业务逻辑,比如ThreadLocal等等。
- 需要记录TransactionId和本地事务状态的关系
- 只支持单个事务消息,如果我创建订单需要发送10种消息,如果都想保持事务一致,那么RocketMQ是不支持的。
综上所述,RocketMQ的事务消息在我看来的确属于比较鸡肋,很难去适应于老业务。那么怎么去接下来讲一下QMQ的事务消息的解决方案,看看这种方案能否解决我们所说的这种问题呢?
5. QMQ事务消息
QMQ的事务消息没有RocketMQ那么的复杂,对于消息中间件的本身改造是很小的,其依赖了数据库自身的本地事务,比如一个创建订单,需要发送两种消息,分别是A和B,那么有如下的伪代码:
begin transaction;
createOrder();
commit transaction;
sendMessageA();
snedMessageB();
这个时候我们发现消息A和消息B都在事务之外,其一致性得不到保证,那么其实我们发送消息的时候不一定要真正的和消息中间件打交道,我们可以做一个本地的存储,保存我们的消息:
begin transaction;
createOrder();
saveMessageA();
saveMessageB();
commit transaction;
// 发送消息
sendMessageA();
snedMessageB();
可以看见其实我们只是增加两个保存消息的操作,那么我们是如何保证一致性呢,如果发送MessageA的时候挂了,那么我们就可以通过定时任务去拉去我们数据库中保存的并没有发送的消息,然后再次进行发送。
其实这种方法同样的可以扩展至其他的消息队列,因为对于消息中间件本身是没有入侵的,如果RocketMQ或者Kafka也想使用这种方法来保证事务消息,也是可以的。
我们来看看这种方法能否解决RocketMQ事务消息带来的问题呢?
- 改造成本,只需要改造一次Client,在QMQ中重写了spring的TransactionSynchronization,可以直接把代码简化成如下面所示:
begin transaction;
createOrder();
sendMessageA();
snedMessageB();
commit transaction;
这里的send其实内部逻辑是saveMessage,在commit之后会自动进行发送,并且后台有定时任务会补偿发送。
- 不需要额外做transactionId和message的绑定
- 支持发送多个事务消息
RocketMQ事务消息带来的问题基本可以解决,但是其同样也有缺点,因为其引入了额外的数据库写,如果事务消息较多,那么就会多出很多写数据库的操作,对于响应时间比较敏感的服务需要仔细考虑
6.总结
介绍了两种事务消息,对于我个人而言,QMQ实现的方案能更加适应于大多数业务。但是这里要注意事务消息并不是所有的分布式一致性都能使用,事务消息使用的场景只能是发出这个消息就能代表这个操作成功的场景,什么意思呢?举个例子,比如我们支付的时候会扣积分,扣券等等,如果我发一个扣积分的消息能代表一定成功吗?这个肯定是不行的,因为用户的积分可能不够,就会导致扣除失败。如果是发送一个赠送积分的消息那么就可以代表成功,因为赠送积分是属于加法,并没有太多的限制。
如果发现事务消息不能很好的满足的满足业务场景,那么你就可以考虑其他的一些事务策略。
关注私信回复:555领取Java高级架构资料、Spring源码分析、Dubbo、Redis、Netty、zookeeper、Spring cloud、分布式等
本文暂时没有评论,来添加一个吧(●'◡'●)