SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息
May 18, 2023 am 10:04 AM環(huán)境:springboot2.3.9RELEASE + RocketMQ4.8.0
依賴
<dependency>???<groupId>org.springframework.boot</groupId>?????<artifactId>spring-boot-starter-web</artifactId>?</dependency>?<dependency>?????<groupId>org.apache.rocketmq</groupId>?????<artifactId>rocketmq-spring-boot-starter</artifactId>?????<version>2.2.0</version>?</dependency>
配置文件
server:???port:?8080?---?rocketmq:???nameServer:?localhost:9876???producer:?????group:?demo-mq
普通消息
發(fā)送
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?send(String?message)?{???rocketMQTemplate.convertAndSend("test-topic:tag2",?MessageBuilder.withPayload(message).build());?}
接受
@RocketMQMessageListener(topic?=?"test-topic",?consumerGroup?=?"consumer01-group",?selectorExpression?=?"tag1?||?tag2")?@Component?public?class?ConsumerListener?implements?RocketMQListener<String>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("接收到消息:"?+?message)?;?????}??}
順序消息
發(fā)送
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??public?void?sendOrder(String?topic,?String?message,?String?tags,?int?id)?{?????rocketMQTemplate.asyncSendOrderly(topic?+?":"?+?tags,?MessageBuilder.withPayload(message).build(),??????????????"order-"?+?id,?new?SendCallback()?{?????????????????@Override?????????????????public?void?onSuccess(SendResult?sendResult)?{?????????????????????System.err.println("msg-id:?"?+?sendResult.getMsgId()?+?":?"?+?message?+"\tqueueId:?"?+?sendResult.getMessageQueue().getQueueId())?;?????????????????}?????????????????@Override?????????????????public?void?onException(Throwable?e)?{?????????????????????e.printStackTrace()?;?????????????????}?????????????});?}
這里是根據(jù)hashkey將消息發(fā)送到不同的隊(duì)列中
@RocketMQMessageListener(topic?=?"order-topic",?consumerGroup?=?"consumer02-group",??????selectorExpression?=?"tag3?||?tag4",?consumeMode?=?ConsumeMode.ORDERLY)?@Component?public?class?ConsumerOrderListener?implements?RocketMQListener<String>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println(Thread.currentThread().getName()?+?"?接收到Order消息:"?+?message)?;?????}??}
consumeMode = ConsumeMode.ORDERLY,指明了消息模式為順序模式,一個(gè)隊(duì)列,一個(gè)線程。
結(jié)果
當(dāng)consumeMode = ConsumeMode.CONCURRENTLY執(zhí)行結(jié)果如下:
集群/廣播消息模式
發(fā)送端
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?send(String?topic,?String?message,?String?tags)?{?????rocketMQTemplate.send(topic?+?":"?+?tags,?MessageBuilder.withPayload(message).build())?;?}
集群消息模式
消費(fèi)端
@RocketMQMessageListener(topic?=?"broad-topic",?consumerGroup?=?"consumer03-group",??????selectorExpression?=?"tag6?||?tag7",?messageModel?=?MessageModel.CLUSTERING)?@Component?public?class?ConsumerBroadListener?implements?RocketMQListener<String>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("ConsumerBroadListener1接收到消息:"?+?message)?;?????}??}
messageModel = MessageModel.CLUSTERING
測試
啟動(dòng)兩個(gè)服務(wù)分別端口是8080,8081
8080服務(wù)
8081服務(wù)
集群消息模式下,每個(gè)服務(wù)分別接收一部分消息,實(shí)現(xiàn)了負(fù)載均衡
廣播消息模式
消費(fèi)端
@RocketMQMessageListener(topic?=?"broad-topic",?consumerGroup?=?"consumer03-group",??????selectorExpression?=?"tag6?||?tag7",?messageModel?=?MessageModel.BROADCASTING)?@Component?public?class?ConsumerBroadListener?implements?RocketMQListener<String>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("ConsumerBroadListener1接收到消息:"?+?message)?;?????}??}
messageModel = MessageModel.BROADCASTING
測試
啟動(dòng)兩個(gè)服務(wù)分別端口是8080,8081
8080服務(wù)
8081服務(wù)
集群消息模式下,每個(gè)服務(wù)分別都接受了同樣的消息。
事務(wù)消息
RocketMQ事務(wù)的3個(gè)狀態(tài)
TransactionStatus.CommitTransaction:提交事務(wù)消息,消費(fèi)者可以消費(fèi)此消息
TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。
TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查消息隊(duì)列來確定狀態(tài)。
RocketMQ實(shí)現(xiàn)事務(wù)消息主要分為兩個(gè)階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補(bǔ)償流程 整體流程為:
正常事務(wù)發(fā)送與提交階段
1、生產(chǎn)者發(fā)送一個(gè)半消息給MQServer(半消息是指消費(fèi)者暫時(shí)不能消費(fèi)的消息)
2、服務(wù)端響應(yīng)消息寫入結(jié)果,半消息發(fā)送成功
3、開始執(zhí)行本地事務(wù)
4、根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作
事務(wù)信息的補(bǔ)償流程
1、如果MQServer長時(shí)間沒收到本地事務(wù)的執(zhí)行狀態(tài)會(huì)向生產(chǎn)者發(fā)起一個(gè)確認(rèn)回查的操作請求
2、生產(chǎn)者收到確認(rèn)回查請求后,檢查本地事務(wù)的執(zhí)行狀態(tài)
3、根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作
補(bǔ)償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時(shí)發(fā)生超時(shí)或失敗的情況。
發(fā)送端
@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?sendTx(String?topic,?Long?id,?String?tags)?{?????rocketMQTemplate.sendMessageInTransaction(topic?+?":"?+?tags,?MessageBuilder.withPayload(?????????????new?Users(id,?UUID.randomUUID().toString().replaceAll("-",?""))).?????????????setHeader("BID",?UUID.randomUUID().toString().replaceAll("-",?"")).build(),??????????????UUID.randomUUID().toString().replaceAll("-",?""))?;?}
生產(chǎn)者對應(yīng)的監(jiān)聽器
@RocketMQTransactionListener?public?class?ProducerTxListener?implements?RocketMQLocalTransactionListener?{??????????@Resource?????private?BusinessService?bs?;??????@Override?????public?RocketMQLocalTransactionState?executeLocalTransaction(Message?msg,?Object?arg)?{?????????//?這里執(zhí)行本地的事務(wù)操作,比如保存數(shù)據(jù)。?????????try?{?????????????//?創(chuàng)建一個(gè)日志記錄表,將這唯一的ID存入數(shù)據(jù)庫中,在下面的check方法中可以根據(jù)這個(gè)id查詢是否有數(shù)據(jù)?????????????String?id?=?(String)?msg.getHeaders().get("BID")?;?????????????Users?users?=?new?JsonMapper().readValue((byte[])msg.getPayload(),?Users.class)?;?????????????System.out.println("消息內(nèi)容:"?+?users?+?"\t參與數(shù)據(jù):"?+?arg?+?"\t本次事務(wù)的唯一編號(hào):"?+?id)?;?????????????bs.save(users,?new?UsersLog(users.getId(),?id))?;?????????}?catch?(Exception?e)?{?????????????e.printStackTrace()?;?????????????return?RocketMQLocalTransactionState.ROLLBACK?;?????????}?????????return?RocketMQLocalTransactionState.COMMIT?;?????}??????@Override?????public?RocketMQLocalTransactionState?checkLocalTransaction(Message?msg)?{?????????//?這里檢查本地事務(wù)是否執(zhí)行成功?????????String?id?=?(String)?msg.getHeaders().get("BID")?;?????????System.out.println("執(zhí)行查詢ID為:"?+?id?+?"?的數(shù)據(jù)是否存在")?;?????????UsersLog?usersLog?=?bs.queryUsersLog(id)?;?????????if?(usersLog?==?null)?{?????????????return?RocketMQLocalTransactionState.ROLLBACK?;?????????}?????????return?RocketMQLocalTransactionState.COMMIT?;?????}??}
消費(fèi)端
@RocketMQMessageListener(topic?=?"tx-topic",?consumerGroup?=?"consumer05-group",?selectorExpression?=?"tag10")?@Component?public?class?ConsumerTxListener?implements?RocketMQListener<Users>?{??????@Override?????public?void?onMessage(Users?users)?{?????????System.out.println("TX接收到消息:"?+?users)?;?????}??}
Service
@Transactional?public?boolean?save(Users?users,?UsersLog?usersLog)?{?????usersRepository.save(users)?;?????usersLogRepository.save(usersLog)?;?????if?(users.getId()?==?1)?{?????????throw?new?RuntimeException("數(shù)據(jù)錯(cuò)誤")?;?????}?????return?true?;?}??????public?UsersLog?queryUsersLog(String?bid)?{?????return?usersLogRepository.findByBid(bid)?;?}
Controller
@GetMapping("/tx/{id}")?public?Object?sendTx(@PathVariable("id")Long?id)?{?????ps.sendTx("tx-topic",?id,?"tag10")?;?????return?"send?transaction?success"?;?}
測試
調(diào)用接口后,控制臺(tái)輸出:
從打印日志看出來都保存完畢了后 消費(fèi)端才接受到消息。
刪除數(shù)據(jù),再測試ID為1會(huì)報(bào)錯(cuò)的。
數(shù)據(jù)庫中沒有數(shù)據(jù)。。。
是不是也不是很復(fù)雜,2個(gè)階段來處理。
以上是SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息的詳細(xì)內(nèi)容。更多信息請關(guān)注PHP中文網(wǎng)其他相關(guān)文章!

熱AI工具

Undress AI Tool
免費(fèi)脫衣服圖片

Undresser.AI Undress
人工智能驅(qū)動(dòng)的應(yīng)用程序,用于創(chuàng)建逼真的裸體照片

AI Clothes Remover
用于從照片中去除衣服的在線人工智能工具。

Clothoff.io
AI脫衣機(jī)

Video Face Swap
使用我們完全免費(fèi)的人工智能換臉工具輕松在任何視頻中換臉!

熱門文章

熱工具

記事本++7.3.1
好用且免費(fèi)的代碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
功能強(qiáng)大的PHP集成開發(fā)環(huán)境

Dreamweaver CS6
視覺化網(wǎng)頁開發(fā)工具

SublimeText3 Mac版
神級(jí)代碼編輯軟件(SublimeText3)

Jasypt介紹Jasypt是一個(gè)java庫,它允許開發(fā)員以最少的努力為他/她的項(xiàng)目添加基本的加密功能,并且不需要對加密工作原理有深入的了解用于單向和雙向加密的高安全性、基于標(biāo)準(zhǔn)的加密技術(shù)。加密密碼,文本,數(shù)字,二進(jìn)制文件...適合集成到基于Spring的應(yīng)用程序中,開放API,用于任何JCE提供程序...添加如下依賴:com.github.ulisesbocchiojasypt-spring-boot-starter2.1.1Jasypt好處保護(hù)我們的系統(tǒng)安全,即使代碼泄露,也可以保證數(shù)據(jù)源的

一、Redis實(shí)現(xiàn)分布式鎖原理為什么需要分布式鎖在聊分布式鎖之前,有必要先解釋一下,為什么需要分布式鎖。與分布式鎖相對就的是單機(jī)鎖,我們在寫多線程程序時(shí),避免同時(shí)操作一個(gè)共享變量產(chǎn)生數(shù)據(jù)問題,通常會(huì)使用一把鎖來互斥以保證共享變量的正確性,其使用范圍是在同一個(gè)進(jìn)程中。如果換做是多個(gè)進(jìn)程,需要同時(shí)操作一個(gè)共享資源,如何互斥呢?現(xiàn)在的業(yè)務(wù)應(yīng)用通常是微服務(wù)架構(gòu),這也意味著一個(gè)應(yīng)用會(huì)部署多個(gè)進(jìn)程,多個(gè)進(jìn)程如果需要修改MySQL中的同一行記錄,為了避免操作亂序?qū)е屡K數(shù)據(jù),此時(shí)就需要引入分布式鎖了。想要實(shí)現(xiàn)分

1、自定義RedisTemplate1.1、RedisAPI默認(rèn)序列化機(jī)制基于API的Redis緩存實(shí)現(xiàn)是使用RedisTemplate模板進(jìn)行數(shù)據(jù)緩存操作的,這里打開RedisTemplate類,查看該類的源碼信息publicclassRedisTemplateextendsRedisAccessorimplementsRedisOperations,BeanClassLoaderAware{//聲明了key、value的各種序列化方式,初始值為空@NullableprivateRedisSe

springboot讀取文件,打成jar包后訪問不到最新開發(fā)出現(xiàn)一種情況,springboot打成jar包后讀取不到文件,原因是打包之后,文件的虛擬路徑是無效的,只能通過流去讀取。文件在resources下publicvoidtest(){Listnames=newArrayList();InputStreamReaderread=null;try{ClassPathResourceresource=newClassPathResource("name.txt");Input

使用場景1、下單成功,30分鐘未支付。支付超時(shí),自動(dòng)取消訂單2、訂單簽收,簽收后7天未進(jìn)行評(píng)價(jià)。訂單超時(shí)未評(píng)價(jià),系統(tǒng)默認(rèn)好評(píng)3、下單成功,商家5分鐘未接單,訂單取消4、配送超時(shí),推送短信提醒……對于延時(shí)比較長的場景、實(shí)時(shí)性不高的場景,我們可以采用任務(wù)調(diào)度的方式定時(shí)輪詢處理。如:xxl-job今天我們采

在Springboot+Mybatis-plus不使用SQL語句進(jìn)行多表添加操作我所遇到的問題準(zhǔn)備工作在測試環(huán)境下模擬思維分解一下:創(chuàng)建出一個(gè)帶有參數(shù)的BrandDTO對象模擬對后臺(tái)傳遞參數(shù)我所遇到的問題我們都知道,在我們使用Mybatis-plus中進(jìn)行多表操作是極其困難的,如果你不使用Mybatis-plus-join這一類的工具,你只能去配置對應(yīng)的Mapper.xml文件,配置又臭又長的ResultMap,然后再去寫對應(yīng)的sql語句,這種方法雖然看上去很麻煩,但具有很高的靈活性,可以讓我們

SpringBoot和SpringMVC都是Java開發(fā)中常用的框架,但它們之間有一些明顯的差異。本文將探究這兩個(gè)框架的特點(diǎn)和用途,并對它們的差異進(jìn)行比較。首先,我們來了解一下SpringBoot。SpringBoot是由Pivotal團(tuán)隊(duì)開發(fā)的,它旨在簡化基于Spring框架的應(yīng)用程序的創(chuàng)建和部署。它提供了一種快速、輕量級(jí)的方式來構(gòu)建獨(dú)立的、可執(zhí)行

一、@Import引入普通類@Import引入普通的類可以幫助我們把普通的類定義為Bean。@Import可以添加在@SpringBootApplication(啟動(dòng)類)、@Configuration(配置類)、@Component(組件類)對應(yīng)的類上。注意:@RestController、@Service、@Repository都屬于@Component@SpringBootApplication@Import(ImportBean.class)//通過@Import注解把ImportBean
