SpringBoot如何整合RocketMQ事務、廣播以及順序訊息
May 18, 2023 am 10:04 AM環(huán)境:springboot2.3.9RELEASE RocketMQ4.8.0
#依賴
<dependency>???<groupid>org.springframework.boot</groupid>?????<artifactid>spring-boot-starter-web</artifactid>?</dependency>?<dependency>?????<groupid>org.apache.rocketmq</groupid>?????<artifactid>rocketmq-spring-boot-starter</artifactid>?????<version>2.2.0</version>?</dependency>
設定檔
server:???port:?8080?---?rocketmq:???nameServer:?localhost:9876???producer:?????group:?demo-mq
普通訊息
發(fā)送
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?send(String?message)?{???rocketMQTemplate.convertAndSend("test-topic:tag2",?MessageBuilder.withPayload(message).build());?}
接受
@RocketMQMessageListener(topic?=?"test-topic",?consumerGroup?=?"consumer01-group",?selectorExpression?=?"tag1?||?tag2")?@Component?public?class?ConsumerListener?implements?RocketMQListener<string>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("接收到消息:"?+?message)?;?????}??}</string>
順序訊息
發(fā)送
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??public?void?sendOrder(String?topic,?String?message,?String?tags,?int?id)?{?????rocketMQTemplate.asyncSendOrderly(topic?+?":"?+?tags,?MessageBuilder.withPayload(message).build(),??????????????"order-"?+?id,?new?SendCallback()?{?????????????????@Override?????????????????public?void?onSuccess(SendResult?sendResult)?{?????????????????????System.err.println("msg-id:?"?+?sendResult.getMsgId()?+?":?"?+?message?+"\tqueueId:?"?+?sendResult.getMessageQueue().getQueueId())?;?????????????????}?????????????????@Override?????????????????public?void?onException(Throwable?e)?{?????????????????????e.printStackTrace()?;?????????????????}?????????????});?}
這裡是根據(jù)hashkey將訊息傳送到不同的佇列中
@RocketMQMessageListener(topic?=?"order-topic",?consumerGroup?=?"consumer02-group",??????selectorExpression?=?"tag3?||?tag4",?consumeMode?=?ConsumeMode.ORDERLY)?@Component?public?class?ConsumerOrderListener?implements?RocketMQListener<string>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println(Thread.currentThread().getName()?+?"?接收到Order消息:"?+?message)?;?????}??}</string>
consumeMode = ConsumeMode.ORDERLY,指明了訊息模式為順序模式,一個佇列,一個執(zhí)行緒。
結果
當consumeMode = ConsumeMode.CONCURRENTLY執(zhí)行結果如下:
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?send(String?topic,?String?message,?String?tags)?{?????rocketMQTemplate.send(topic?+?":"?+?tags,?MessageBuilder.withPayload(message).build())?;?}叢集訊息模式消費端
@RocketMQMessageListener(topic?=?"broad-topic",?consumerGroup?=?"consumer03-group",??????selectorExpression?=?"tag6?||?tag7",?messageModel?=?MessageModel.CLUSTERING)?@Component?public?class?ConsumerBroadListener?implements?RocketMQListener<string>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("ConsumerBroadListener1接收到消息:"?+?message)?;?????}??}</string>messageModel = MessageModel.CLUSTERING測試#啟動兩個服務分別連接埠是8080,8081#8080服務
@RocketMQMessageListener(topic?=?"broad-topic",?consumerGroup?=?"consumer03-group",??????selectorExpression?=?"tag6?||?tag7",?messageModel?=?MessageModel.BROADCASTING)?@Component?public?class?ConsumerBroadListener?implements?RocketMQListener<string>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("ConsumerBroadListener1接收到消息:"?+?message)?;?????}??}</string>messageModel = MessageModel.BROADCASTING
測試
啟動兩個服務分別連接埠是8080,8081
#8080服務
叢集訊息模式下,每個服務分別都接受了相同的訊息。
事務訊息
RocketMQ事務的3個狀態(tài)
TransactionStatus.CommitTransaction:提交事務訊息,消費者可以消費此訊息
TransactionStatus.RollbackTransaction:回滾事務,它代表該訊息將被刪除,不允許被消費。
TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查訊息佇列來決定狀態(tài)。
RocketMQ實現(xiàn)事務訊息主要分為兩個階段:正常事務的發(fā)送與提交、事務資訊的補償流程整體流程為:
正常事務發(fā)送與提交階段
1、生產(chǎn)者發(fā)送一個半訊息給MQServer(半訊息是指消費者暫時不能消費的訊息)
#2、服務端回應訊息寫入結果,半訊息發(fā)送成功
3.開始執(zhí)行本地事務
4、根據(jù)本地事務的執(zhí)行狀態(tài)執(zhí)行Commit或Rollback操作
事務資訊的補償流程
1、如果MQServer長時間沒收到本地事務的執(zhí)行狀態(tài)會向生產(chǎn)者發(fā)起一個確認回查的操作請求
2、生產(chǎn)者收到確認回查請求後,檢查本地事務的執(zhí)行狀態(tài)
3、根據(jù)檢查後的結果執(zhí)行Commit或Rollback操作
補償階段主要是用來解決生產(chǎn)者在發(fā)送Commit或Rollback操作時發(fā)生逾時或失敗的情況。
發(fā)送端
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?sendTx(String?topic,?Long?id,?String?tags)?{?????rocketMQTemplate.sendMessageInTransaction(topic?+?":"?+?tags,?MessageBuilder.withPayload(?????????????new?Users(id,?UUID.randomUUID().toString().replaceAll("-",?""))).?????????????setHeader("BID",?UUID.randomUUID().toString().replaceAll("-",?"")).build(),??????????????UUID.randomUUID().toString().replaceAll("-",?""))?;?}
生產(chǎn)者對應的監(jiān)聽器
@RocketMQTransactionListener?public?class?ProducerTxListener?implements?RocketMQLocalTransactionListener?{??????????@Resource?????private?BusinessService?bs?;??????@Override?????public?RocketMQLocalTransactionState?executeLocalTransaction(Message?msg,?Object?arg)?{?????????//?這里執(zhí)行本地的事務操作,比如保存數(shù)據(jù)。?????????try?{?????????????//?創(chuàng)建一個日志記錄表,將這唯一的ID存入數(shù)據(jù)庫中,在下面的check方法中可以根據(jù)這個id查詢是否有數(shù)據(jù)?????????????String?id?=?(String)?msg.getHeaders().get("BID")?;?????????????Users?users?=?new?JsonMapper().readValue((byte[])msg.getPayload(),?Users.class)?;?????????????System.out.println("消息內容:"?+?users?+?"\t參與數(shù)據(jù):"?+?arg?+?"\t本次事務的唯一編號:"?+?id)?;?????????????bs.save(users,?new?UsersLog(users.getId(),?id))?;?????????}?catch?(Exception?e)?{?????????????e.printStackTrace()?;?????????????return?RocketMQLocalTransactionState.ROLLBACK?;?????????}?????????return?RocketMQLocalTransactionState.COMMIT?;?????}??????@Override?????public?RocketMQLocalTransactionState?checkLocalTransaction(Message?msg)?{?????????//?這里檢查本地事務是否執(zhí)行成功?????????String?id?=?(String)?msg.getHeaders().get("BID")?;?????????System.out.println("執(zhí)行查詢ID為:"?+?id?+?"?的數(shù)據(jù)是否存在")?;?????????UsersLog?usersLog?=?bs.queryUsersLog(id)?;?????????if?(usersLog?==?null)?{?????????????return?RocketMQLocalTransactionState.ROLLBACK?;?????????}?????????return?RocketMQLocalTransactionState.COMMIT?;?????}??}
消費端
@RocketMQMessageListener(topic?=?"tx-topic",?consumerGroup?=?"consumer05-group",?selectorExpression?=?"tag10")?@Component?public?class?ConsumerTxListener?implements?RocketMQListener<users>?{??????@Override?????public?void?onMessage(Users?users)?{?????????System.out.println("TX接收到消息:"?+?users)?;?????}??}</users>
Service
@Transactional?public?boolean?save(Users?users,?UsersLog?usersLog)?{?????usersRepository.save(users)?;?????usersLogRepository.save(usersLog)?;?????if?(users.getId()?==?1)?{?????????throw?new?RuntimeException("數(shù)據(jù)錯誤")?;?????}?????return?true?;?}??????public?UsersLog?queryUsersLog(String?bid)?{?????return?usersLogRepository.findByBid(bid)?;?}
Controller
@GetMapping("/tx/{id}")?public?Object?sendTx(@PathVariable("id")Long?id)?{?????ps.sendTx("tx-topic",?id,?"tag10")?;?????return?"send?transaction?success"?;?}測試呼叫介面後,控制臺輸出:
從列印日誌看出來都保存完畢了後消費端才接受到訊息。
以上是SpringBoot如何整合RocketMQ事務、廣播以及順序訊息的詳細內容。更多資訊請關注PHP中文網(wǎng)其他相關文章!

熱AI工具

Undress AI Tool
免費脫衣圖片

Undresser.AI Undress
人工智慧驅動的應用程序,用於創(chuàng)建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發(fā)環(huán)境

Dreamweaver CS6
視覺化網(wǎng)頁開發(fā)工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

Jasypt介紹Jasypt是一個java庫,它允許開發(fā)員以最少的努力為他/她的專案添加基本的加密功能,並且不需要對加密工作原理有深入的了解用於單向和雙向加密的高安全性、基於標準的加密技術。加密密碼,文本,數(shù)字,二進位檔案...適合整合到基於Spring的應用程式中,開放API,用於任何JCE提供者...添加如下依賴:com.github.ulisesbocchiojasypt-spring-boot-starter2. 1.1Jasypt好處保護我們的系統(tǒng)安全,即使程式碼洩露,也可以保證資料來源的

一、Redis實現(xiàn)分散式鎖原理為什麼需要分散式鎖在聊分散式鎖之前,有必要先解釋一下,為什麼需要分散式鎖。與分散式鎖相對就的是單機鎖,我們在寫多執(zhí)行緒程式時,避免同時操作一個共享變數(shù)產(chǎn)生資料問題,通常會使用一把鎖來互斥以保證共享變數(shù)的正確性,其使用範圍是在同一個進程中。如果換做是多個進程,需要同時操作一個共享資源,如何互斥?現(xiàn)在的業(yè)務應用通常是微服務架構,這也意味著一個應用會部署多個進程,多個進程如果需要修改MySQL中的同一行記錄,為了避免操作亂序導致髒數(shù)據(jù),此時就需要引入分佈式鎖了。想要實現(xiàn)分

1.自訂RedisTemplate1.1、RedisAPI預設序列化機制基於API的Redis快取實作是使用RedisTemplate範本進行資料快取操作的,這裡開啟RedisTemplate類,查看該類別的源碼資訊publicclassRedisTemplateextendsRedisAccessorimplementsRedisOperations,BeanClassLoaderAware{//聲明了value的各種序列化方式,初始值為空@NullableprivateRedisSe

springboot讀取文件,打成jar包後訪問不到最新開發(fā)出現(xiàn)一種情況,springboot打成jar包後讀取不到文件,原因是打包之後,文件的虛擬路徑是無效的,只能通過流去讀取。文件在resources下publicvoidtest(){Listnames=newArrayList();InputStreamReaderread=null;try{ClassPathResourceresource=newClassPathResource("name.txt");Input

使用場景1、下單成功,30分鐘未支付。支付超時,自動取消訂單2、訂單簽收,簽收後7天未進行評估。訂單超時未評價,系統(tǒng)預設好評3、下單成功,商家5分鐘未接單,訂單取消4、配送超時,推播簡訊提醒…對於延時比較長的場景、即時性不高的場景,我們可以採用任務調度的方式定時輪詢處理。如:xxl-job今天我們採

在Springboot+Mybatis-plus不使用SQL語句進行多表添加操作我所遇到的問題準備工作在測試環(huán)境下模擬思維分解一下:創(chuàng)建出一個帶有參數(shù)的BrandDTO對像模擬對後臺傳遞參數(shù)我所遇到的問題我們都知道,在我們使用Mybatis-plus中進行多表操作是極其困難的,如果你不使用Mybatis-plus-join這一類的工具,你只能去配置對應的Mapper.xml文件,配置又臭又長的ResultMap,然後再寫對應的sql語句,這種方法雖然看上去很麻煩,但具有很高的靈活性,可以讓我們

SpringBoot和SpringMVC都是Java開發(fā)中常用的框架,但它們之間有一些明顯的差異。本文將探究這兩個框架的特點和用途,並對它們的差異進行比較。首先,我們來了解一下SpringBoot。 SpringBoot是由Pivotal團隊開發(fā)的,它旨在簡化基於Spring框架的應用程式的建立和部署。它提供了一種快速、輕量級的方式來建立獨立的、可執(zhí)行

一、@Import引入普通類別@Import引入普通的類別可以幫助我們把普通的類別定義為Bean。 @Import可以加入在@SpringBootApplication(啟動類別)、@Configuration(配置類別)、@Component(組件類別)對應的類別上。注意:@RestController、@Service、@Repository都屬於@Component@SpringBootApplication@Import(ImportBean.class)//透過@Import註解把ImportBean
