SpringBoot集成RocketMQ事務(wù)消息的完整指南
事務(wù)消息是 RocketMQ 提供的一種高級消息類型,用于解決分布式場景下,本地數(shù)據(jù)庫事務(wù)與消息發(fā)送之間的一致性問題。它通過兩階段提交和事務(wù)狀態(tài)回查機(jī)制,確保本地事務(wù)執(zhí)行與消息投遞達(dá)到最終一致性,尤其適用于訂單支付、積分變更等需要高可靠性的業(yè)務(wù)場景。
事務(wù)消息的核心原理
事務(wù)消息的核心機(jī)制可以概括為以下兩個階段和一種補(bǔ)償機(jī)制:
第一階段:發(fā)送半消息(Half Message)??
- 生產(chǎn)者向 Broker 發(fā)送一條半消息。這條消息與普通消息不同,它已經(jīng)持久化到服務(wù)端,但對消費(fèi)者不可見,暫時不能被消費(fèi)。
- Broker 收到半消息并持久化成功后,會向生產(chǎn)者返回確認(rèn)響應(yīng)。
第二階段:提交或回滾?
生產(chǎn)者開始執(zhí)行本地事務(wù)?(例如,操作本地數(shù)據(jù)庫)。
根據(jù)本地事務(wù)的執(zhí)行結(jié)果(成功或失敗),生產(chǎn)者向 Broker 發(fā)送 ?二次確認(rèn)指令?(Commit 或 Rollback)。
- ?Commit?:Broker 將半消息轉(zhuǎn)換為正式消息,對消費(fèi)者可見,等待被消費(fèi)。
- ?Rollback?:Broker 會回滾該事務(wù),即刪除半消息,消息不會被投遞。
?事務(wù)回查(Transaction Check)??
- 這是關(guān)鍵的補(bǔ)償機(jī)制。如果因?yàn)榫W(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致 Broker 長時間未收到二次確認(rèn),Broker 會主動向生產(chǎn)者發(fā)起消息回查。
- 生產(chǎn)者收到回查后,需要去檢查該消息對應(yīng)的本地事務(wù)的最終狀態(tài)(例如查詢數(shù)據(jù)庫),并根據(jù)實(shí)際狀態(tài)再次向 Broker 提交 Commit 或 Rollback 指令。這保證了即使在異常情況下,事務(wù)也能最終達(dá)成一致。
為了更直觀地理解整個流程,下圖概括了事務(wù)消息的完整生命周期:

在SpringBoot項(xiàng)目中實(shí)現(xiàn)事務(wù)消息
下面我們基于 rocketmq-spring-boot-starter來實(shí)現(xiàn)一個完整的事務(wù)消息示例,以“訂單支付成功后通知積分服務(wù)增加積分”為場景。
1. 添加依賴
首先確保 pom.xml中包含必要的依賴:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2. 配置生產(chǎn)者與事務(wù)監(jiān)聽器
核心是創(chuàng)建一個事務(wù)監(jiān)聽器,它包含了執(zhí)行本地事務(wù)和處理事務(wù)回查的兩個方法。
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
@Service
@RocketMQTransactionListener(txProducerGroup = "tx-order-group") // 與發(fā)送方組名一致
public class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
/**
* 執(zhí)行本地事務(wù)
* @param msg 收到的消息
* @param arg 調(diào)用sendMessageInTransaction時傳入的額外參數(shù)
* @return 事務(wù)狀態(tài)
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 從消息頭或arg中獲取業(yè)務(wù)ID,如訂單ID
String orderId = (String) msg.getHeaders().get("orderId");
try {
// 執(zhí)行本地業(yè)務(wù)邏輯,例如:更新訂單狀態(tài)為“支付成功”
boolean success = orderService.updateOrderStatus(orderId, OrderStatus.PAID);
// 根據(jù)執(zhí)行結(jié)果返回提交或回滾
return success ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
} catch (Exception e) {
// 記錄日志,返回UNKNOWN狀態(tài),等待Broker回查
return RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 事務(wù)回查方法
* @param msg 收到的消息
* @return 事務(wù)狀態(tài)
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderId = (String) msg.getHeaders().get("orderId");
// 根據(jù)orderId查詢數(shù)據(jù)庫,確認(rèn)本地事務(wù)的最終狀態(tài)
OrderStatus status = orderService.queryOrderStatus(orderId);
if (OrderStatus.PAID.equals(status)) {
// 本地事務(wù)已成功,提交消息
return RocketMQLocalTransactionState.COMMIT;
} else if (OrderStatus.FAILED.equals(status)) {
// 本地事務(wù)已失敗,回滾消息
return RocketMQLocalTransactionState.ROLLBACK;
} else {
// 狀態(tài)仍不明確,繼續(xù)等待下次回查
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
3. 發(fā)送事務(wù)消息
在業(yè)務(wù)服務(wù)中,使用 RocketMQTemplate發(fā)送事務(wù)消息。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void payOrder(String orderId) {
// 1. 構(gòu)建消息
Message<String> message = MessageBuilder.withPayload("訂單支付成功,增加積分")
.setHeader("orderId", orderId) // 設(shè)置業(yè)務(wù)ID,用于回查
.build();
// 2. 發(fā)送事務(wù)消息
// 參數(shù)1: 事務(wù)組名(需與監(jiān)聽器內(nèi)txProducerGroup一致)
// 參數(shù)2: 主題(Topic)
// 參數(shù)3: 消息體
// 參數(shù)4: 可選參數(shù),會傳遞給executeLocalTransaction方法的arg參數(shù)
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("tx-order-group", "order-topic", message, orderId);
System.out.println("發(fā)送結(jié)果:" + result.getSendStatus());
}
}
4. 消費(fèi)者端實(shí)現(xiàn)冪等性
事務(wù)消息只能保證消息生產(chǎn)端的一致性,消費(fèi)端需要自行保證消息的冪等性,因?yàn)榫W(wǎng)絡(luò)重試可能導(dǎo)致消息被重復(fù)消費(fèi)。
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-consumer-group")
public class OrderConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 1. 解析消息,獲取訂單ID
// 2. 【關(guān)鍵】冪等性校驗(yàn):查詢數(shù)據(jù)庫或Redis,判斷該訂單的積分是否已經(jīng)添加過
// if (已處理) { return; }
// 3. 執(zhí)行業(yè)務(wù)邏輯(例如,為用戶增加積分)
// creditService.addCredit(...);
// 4. 記錄處理狀態(tài),標(biāo)記該消息已處理
}
}
重要注意事項(xiàng)與最佳實(shí)踐
- ?事務(wù)狀態(tài)的可查詢性?:
checkLocalTransaction方法需要能夠查詢本地事務(wù)的最終狀態(tài)。通常的做法是,在執(zhí)行本地事務(wù)時,將事務(wù)狀態(tài)(如訂單狀態(tài))持久化到數(shù)據(jù)庫中,以便回查時使用。 - ?避免未知狀態(tài)?:雖然
UNKNOWN狀態(tài)是回查機(jī)制的保障,但在生產(chǎn)中應(yīng)盡量明確返回COMMIT或ROLLBACK,避免大量消息進(jìn)入回查流程,影響系統(tǒng)性能和增加復(fù)雜度。 - ?消息回查配置?:Broker 端有關(guān)于回查間隔和最大回查次數(shù)的配置,需要根據(jù)業(yè)務(wù)容忍度進(jìn)行合理設(shè)置。
- ?主題類型匹配?:事務(wù)消息必須發(fā)送到類型為
Transaction的主題上。
以上就是SpringBoot集成RocketMQ事務(wù)消息的完整指南的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot集成RocketMQ事務(wù)消息的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java實(shí)現(xiàn)飛機(jī)大戰(zhàn)游戲?附完整源碼
這篇文章主要介紹了Java實(shí)現(xiàn)飛機(jī)大戰(zhàn)游戲,本文給大家分享完整源代碼和效果圖展示,對java飛機(jī)大戰(zhàn)游戲?qū)崿F(xiàn)代碼感興趣的朋友一起看看吧2022-05-05
舉例說明JAVA調(diào)用第三方接口的GET/POST/PUT請求方式
在日常工作和學(xué)習(xí)中,有很多地方都需要發(fā)送請求,這篇文章主要給大家介紹了關(guān)于JAVA調(diào)用第三方接口的GET/POST/PUT請求方式的相關(guān)資料,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-01-01
Struts2學(xué)習(xí)筆記(9)-Result配置全局結(jié)果集
這篇文章主要介紹Struts2中使用Result配置全局結(jié)果集的方法,希望能給大家做一個參考。2016-06-06
這一次搞懂Spring代理創(chuàng)建及AOP鏈?zhǔn)秸{(diào)用過程操作
這篇文章主要介紹了這一次搞懂Spring代理創(chuàng)建及AOP鏈?zhǔn)秸{(diào)用過程操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08
淺談Java中ThreadLocal內(nèi)存泄露的原因及處理方式
內(nèi)存泄漏就是我們申請了內(nèi)存,但是該內(nèi)存一直無法釋放,就會導(dǎo)致內(nèi)存溢出問題,本文詳細(xì)的介紹了ThreadLocal內(nèi)存泄露的原因及處理方式,感興趣的可以了解一下2023-05-05
Mybatis實(shí)現(xiàn)自定義類型轉(zhuǎn)換器TypeHandler的方法
Mybatis實(shí)現(xiàn)自定義的轉(zhuǎn)換器非常的簡單,只需要三步就可以實(shí)現(xiàn)自定義類型轉(zhuǎn)換器TypeHandler,非常不錯,具有參考借鑒價值,感興趣的朋友一起看下吧2016-07-07
Java實(shí)現(xiàn)TXT文件導(dǎo)入功能的詳細(xì)步驟
在實(shí)際開發(fā)中,很多應(yīng)用場景需要將用戶上傳的 TXT 文件進(jìn)行解析,并將文件中的數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫或其他存儲系統(tǒng)中,本文將演示如何用 Java 實(shí)現(xiàn)一個基本的 TXT 文件導(dǎo)入功能,需要的朋友可以參考下2025-08-08

