国产av日韩一区二区三区精品,成人性爱视频在线观看,国产,欧美,日韩,一区,www.成色av久久成人,2222eeee成人天堂

目錄
依賴
配置文件
普通消息
順序消息
集群/廣播消息模式
集群消息模式
廣播消息模式
事務(wù)消息
首頁 Java java教程 SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

May 18, 2023 am 10:04 AM
rocketmq springboot

環(huán)境:springboot2.3.9RELEASE + RocketMQ4.8.0

依賴

<dependency>???<groupId>org.springframework.boot</groupId>?????<artifactId>spring-boot-starter-web</artifactId>?</dependency>?<dependency>?????<groupId>org.apache.rocketmq</groupId>?????<artifactId>rocketmq-spring-boot-starter</artifactId>?????<version>2.2.0</version>?</dependency>

配置文件

server:???port:?8080?---?rocketmq:???nameServer:?localhost:9876???producer:?????group:?demo-mq

普通消息

發(fā)送

@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?send(String?message)?{???rocketMQTemplate.convertAndSend("test-topic:tag2",?MessageBuilder.withPayload(message).build());?}

接受

@RocketMQMessageListener(topic?=?"test-topic",?consumerGroup?=?"consumer01-group",?selectorExpression?=?"tag1?||?tag2")?@Component?public?class?ConsumerListener?implements?RocketMQListener<String>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("接收到消息:"?+?message)?;?????}??}

順序消息

發(fā)送

@Resource?private?RocketMQTemplate?rocketMQTemplate?;??public?void?sendOrder(String?topic,?String?message,?String?tags,?int?id)?{?????rocketMQTemplate.asyncSendOrderly(topic?+?":"?+?tags,?MessageBuilder.withPayload(message).build(),??????????????"order-"?+?id,?new?SendCallback()?{?????????????????@Override?????????????????public?void?onSuccess(SendResult?sendResult)?{?????????????????????System.err.println("msg-id:?"?+?sendResult.getMsgId()?+?":?"?+?message?+"\tqueueId:?"?+?sendResult.getMessageQueue().getQueueId())?;?????????????????}?????????????????@Override?????????????????public?void?onException(Throwable?e)?{?????????????????????e.printStackTrace()?;?????????????????}?????????????});?}

這里是根據(jù)hashkey將消息發(fā)送到不同的隊(duì)列中

@RocketMQMessageListener(topic?=?"order-topic",?consumerGroup?=?"consumer02-group",??????selectorExpression?=?"tag3?||?tag4",?consumeMode?=?ConsumeMode.ORDERLY)?@Component?public?class?ConsumerOrderListener?implements?RocketMQListener<String>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println(Thread.currentThread().getName()?+?"?接收到Order消息:"?+?message)?;?????}??}

consumeMode = ConsumeMode.ORDERLY,指明了消息模式為順序模式,一個(gè)隊(duì)列,一個(gè)線程。

結(jié)果

SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

當(dāng)consumeMode = ConsumeMode.CONCURRENTLY執(zhí)行結(jié)果如下:

SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

集群/廣播消息模式

發(fā)送端

@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?send(String?topic,?String?message,?String?tags)?{?????rocketMQTemplate.send(topic?+?":"?+?tags,?MessageBuilder.withPayload(message).build())?;?}

集群消息模式

消費(fèi)端

@RocketMQMessageListener(topic?=?"broad-topic",?consumerGroup?=?"consumer03-group",??????selectorExpression?=?"tag6?||?tag7",?messageModel?=?MessageModel.CLUSTERING)?@Component?public?class?ConsumerBroadListener?implements?RocketMQListener<String>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("ConsumerBroadListener1接收到消息:"?+?message)?;?????}??}

messageModel = MessageModel.CLUSTERING

測試

啟動(dòng)兩個(gè)服務(wù)分別端口是8080,8081

8080服務(wù)

SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

8081服務(wù)

SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

集群消息模式下,每個(gè)服務(wù)分別接收一部分消息,實(shí)現(xiàn)了負(fù)載均衡

廣播消息模式

消費(fèi)端

@RocketMQMessageListener(topic?=?"broad-topic",?consumerGroup?=?"consumer03-group",??????selectorExpression?=?"tag6?||?tag7",?messageModel?=?MessageModel.BROADCASTING)?@Component?public?class?ConsumerBroadListener?implements?RocketMQListener<String>?{??????@Override?????public?void?onMessage(String?message)?{?????????System.out.println("ConsumerBroadListener1接收到消息:"?+?message)?;?????}??}

messageModel = MessageModel.BROADCASTING

測試

啟動(dòng)兩個(gè)服務(wù)分別端口是8080,8081

8080服務(wù)

SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

8081服務(wù)

SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

集群消息模式下,每個(gè)服務(wù)分別都接受了同樣的消息。

事務(wù)消息

RocketMQ事務(wù)的3個(gè)狀態(tài)

TransactionStatus.CommitTransaction:提交事務(wù)消息,消費(fèi)者可以消費(fèi)此消息

TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。

TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查消息隊(duì)列來確定狀態(tài)。

RocketMQ實(shí)現(xiàn)事務(wù)消息主要分為兩個(gè)階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補(bǔ)償流程 整體流程為:

正常事務(wù)發(fā)送與提交階段

1、生產(chǎn)者發(fā)送一個(gè)半消息給MQServer(半消息是指消費(fèi)者暫時(shí)不能消費(fèi)的消息)

2、服務(wù)端響應(yīng)消息寫入結(jié)果,半消息發(fā)送成功

3、開始執(zhí)行本地事務(wù)

4、根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作

事務(wù)信息的補(bǔ)償流程

1、如果MQServer長時(shí)間沒收到本地事務(wù)的執(zhí)行狀態(tài)會(huì)向生產(chǎn)者發(fā)起一個(gè)確認(rèn)回查的操作請求

2、生產(chǎn)者收到確認(rèn)回查請求后,檢查本地事務(wù)的執(zhí)行狀態(tài)

3、根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作

補(bǔ)償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時(shí)發(fā)生超時(shí)或失敗的情況。

發(fā)送端

@Resource?private?RocketMQTemplate?rocketMQTemplate?;??????public?void?sendTx(String?topic,?Long?id,?String?tags)?{?????rocketMQTemplate.sendMessageInTransaction(topic?+?":"?+?tags,?MessageBuilder.withPayload(?????????????new?Users(id,?UUID.randomUUID().toString().replaceAll("-",?""))).?????????????setHeader("BID",?UUID.randomUUID().toString().replaceAll("-",?"")).build(),??????????????UUID.randomUUID().toString().replaceAll("-",?""))?;?}

生產(chǎn)者對應(yīng)的監(jiān)聽器

@RocketMQTransactionListener?public?class?ProducerTxListener?implements?RocketMQLocalTransactionListener?{??????????@Resource?????private?BusinessService?bs?;??????@Override?????public?RocketMQLocalTransactionState?executeLocalTransaction(Message?msg,?Object?arg)?{?????????//?這里執(zhí)行本地的事務(wù)操作,比如保存數(shù)據(jù)。?????????try?{?????????????//?創(chuàng)建一個(gè)日志記錄表,將這唯一的ID存入數(shù)據(jù)庫中,在下面的check方法中可以根據(jù)這個(gè)id查詢是否有數(shù)據(jù)?????????????String?id?=?(String)?msg.getHeaders().get("BID")?;?????????????Users?users?=?new?JsonMapper().readValue((byte[])msg.getPayload(),?Users.class)?;?????????????System.out.println("消息內(nèi)容:"?+?users?+?"\t參與數(shù)據(jù):"?+?arg?+?"\t本次事務(wù)的唯一編號(hào):"?+?id)?;?????????????bs.save(users,?new?UsersLog(users.getId(),?id))?;?????????}?catch?(Exception?e)?{?????????????e.printStackTrace()?;?????????????return?RocketMQLocalTransactionState.ROLLBACK?;?????????}?????????return?RocketMQLocalTransactionState.COMMIT?;?????}??????@Override?????public?RocketMQLocalTransactionState?checkLocalTransaction(Message?msg)?{?????????//?這里檢查本地事務(wù)是否執(zhí)行成功?????????String?id?=?(String)?msg.getHeaders().get("BID")?;?????????System.out.println("執(zhí)行查詢ID為:"?+?id?+?"?的數(shù)據(jù)是否存在")?;?????????UsersLog?usersLog?=?bs.queryUsersLog(id)?;?????????if?(usersLog?==?null)?{?????????????return?RocketMQLocalTransactionState.ROLLBACK?;?????????}?????????return?RocketMQLocalTransactionState.COMMIT?;?????}??}

消費(fèi)端

@RocketMQMessageListener(topic?=?"tx-topic",?consumerGroup?=?"consumer05-group",?selectorExpression?=?"tag10")?@Component?public?class?ConsumerTxListener?implements?RocketMQListener<Users>?{??????@Override?????public?void?onMessage(Users?users)?{?????????System.out.println("TX接收到消息:"?+?users)?;?????}??}

Service

@Transactional?public?boolean?save(Users?users,?UsersLog?usersLog)?{?????usersRepository.save(users)?;?????usersLogRepository.save(usersLog)?;?????if?(users.getId()?==?1)?{?????????throw?new?RuntimeException("數(shù)據(jù)錯(cuò)誤")?;?????}?????return?true?;?}??????public?UsersLog?queryUsersLog(String?bid)?{?????return?usersLogRepository.findByBid(bid)?;?}

Controller

@GetMapping("/tx/{id}")?public?Object?sendTx(@PathVariable("id")Long?id)?{?????ps.sendTx("tx-topic",?id,?"tag10")?;?????return?"send?transaction?success"?;?}

測試

調(diào)用接口后,控制臺(tái)輸出:

SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

從打印日志看出來都保存完畢了后 消費(fèi)端才接受到消息。

SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

刪除數(shù)據(jù),再測試ID為1會(huì)報(bào)錯(cuò)的。

SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息

數(shù)據(jù)庫中沒有數(shù)據(jù)。。。

是不是也不是很復(fù)雜,2個(gè)階段來處理。

以上是SpringBoot如何整合RocketMQ事務(wù)、廣播以及順序消息的詳細(xì)內(nèi)容。更多信息請關(guān)注PHP中文網(wǎng)其他相關(guān)文章!

本站聲明
本文內(nèi)容由網(wǎng)友自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,本站不承擔(dān)相應(yīng)法律責(zé)任。如您發(fā)現(xiàn)有涉嫌抄襲侵權(quán)的內(nèi)容,請聯(lián)系admin@php.cn

熱AI工具

Undress AI Tool

Undress AI Tool

免費(fèi)脫衣服圖片

Undresser.AI Undress

Undresser.AI Undress

人工智能驅(qū)動(dòng)的應(yīng)用程序,用于創(chuàng)建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用于從照片中去除衣服的在線人工智能工具。

Clothoff.io

Clothoff.io

AI脫衣機(jī)

Video Face Swap

Video Face Swap

使用我們完全免費(fèi)的人工智能換臉工具輕松在任何視頻中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費(fèi)的代碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

功能強(qiáng)大的PHP集成開發(fā)環(huán)境

Dreamweaver CS6

Dreamweaver CS6

視覺化網(wǎng)頁開發(fā)工具

SublimeText3 Mac版

SublimeText3 Mac版

神級(jí)代碼編輯軟件(SublimeText3)

Springboot怎么集成Jasypt實(shí)現(xiàn)配置文件加密 Springboot怎么集成Jasypt實(shí)現(xiàn)配置文件加密 Jun 01, 2023 am 08:55 AM

Jasypt介紹Jasypt是一個(gè)java庫,它允許開發(fā)員以最少的努力為他/她的項(xiàng)目添加基本的加密功能,并且不需要對加密工作原理有深入的了解用于單向和雙向加密的高安全性、基于標(biāo)準(zhǔn)的加密技術(shù)。加密密碼,文本,數(shù)字,二進(jìn)制文件...適合集成到基于Spring的應(yīng)用程序中,開放API,用于任何JCE提供程序...添加如下依賴:com.github.ulisesbocchiojasypt-spring-boot-starter2.1.1Jasypt好處保護(hù)我們的系統(tǒng)安全,即使代碼泄露,也可以保證數(shù)據(jù)源的

怎么在SpringBoot中使用Redis實(shí)現(xiàn)分布式鎖 怎么在SpringBoot中使用Redis實(shí)現(xiàn)分布式鎖 Jun 03, 2023 am 08:16 AM

一、Redis實(shí)現(xiàn)分布式鎖原理為什么需要分布式鎖在聊分布式鎖之前,有必要先解釋一下,為什么需要分布式鎖。與分布式鎖相對就的是單機(jī)鎖,我們在寫多線程程序時(shí),避免同時(shí)操作一個(gè)共享變量產(chǎn)生數(shù)據(jù)問題,通常會(huì)使用一把鎖來互斥以保證共享變量的正確性,其使用范圍是在同一個(gè)進(jìn)程中。如果換做是多個(gè)進(jìn)程,需要同時(shí)操作一個(gè)共享資源,如何互斥呢?現(xiàn)在的業(yè)務(wù)應(yīng)用通常是微服務(wù)架構(gòu),這也意味著一個(gè)應(yīng)用會(huì)部署多個(gè)進(jìn)程,多個(gè)進(jìn)程如果需要修改MySQL中的同一行記錄,為了避免操作亂序?qū)е屡K數(shù)據(jù),此時(shí)就需要引入分布式鎖了。想要實(shí)現(xiàn)分

SpringBoot怎么自定義Redis實(shí)現(xiàn)緩存序列化 SpringBoot怎么自定義Redis實(shí)現(xiàn)緩存序列化 Jun 03, 2023 am 11:32 AM

1、自定義RedisTemplate1.1、RedisAPI默認(rèn)序列化機(jī)制基于API的Redis緩存實(shí)現(xiàn)是使用RedisTemplate模板進(jìn)行數(shù)據(jù)緩存操作的,這里打開RedisTemplate類,查看該類的源碼信息publicclassRedisTemplateextendsRedisAccessorimplementsRedisOperations,BeanClassLoaderAware{//聲明了key、value的各種序列化方式,初始值為空@NullableprivateRedisSe

springboot讀取文件打成jar包后訪問不到怎么解決 springboot讀取文件打成jar包后訪問不到怎么解決 Jun 03, 2023 pm 04:38 PM

springboot讀取文件,打成jar包后訪問不到最新開發(fā)出現(xiàn)一種情況,springboot打成jar包后讀取不到文件,原因是打包之后,文件的虛擬路徑是無效的,只能通過流去讀取。文件在resources下publicvoidtest(){Listnames=newArrayList();InputStreamReaderread=null;try{ClassPathResourceresource=newClassPathResource("name.txt");Input

SpringBoot怎么集成Redisson實(shí)現(xiàn)延遲隊(duì)列 SpringBoot怎么集成Redisson實(shí)現(xiàn)延遲隊(duì)列 May 30, 2023 pm 02:40 PM

使用場景1、下單成功,30分鐘未支付。支付超時(shí),自動(dòng)取消訂單2、訂單簽收,簽收后7天未進(jìn)行評(píng)價(jià)。訂單超時(shí)未評(píng)價(jià),系統(tǒng)默認(rèn)好評(píng)3、下單成功,商家5分鐘未接單,訂單取消4、配送超時(shí),推送短信提醒……對于延時(shí)比較長的場景、實(shí)時(shí)性不高的場景,我們可以采用任務(wù)調(diào)度的方式定時(shí)輪詢處理。如:xxl-job今天我們采

Springboot+Mybatis-plus不使用SQL語句進(jìn)行多表添加怎么實(shí)現(xiàn) Springboot+Mybatis-plus不使用SQL語句進(jìn)行多表添加怎么實(shí)現(xiàn) Jun 02, 2023 am 11:07 AM

在Springboot+Mybatis-plus不使用SQL語句進(jìn)行多表添加操作我所遇到的問題準(zhǔn)備工作在測試環(huán)境下模擬思維分解一下:創(chuàng)建出一個(gè)帶有參數(shù)的BrandDTO對象模擬對后臺(tái)傳遞參數(shù)我所遇到的問題我們都知道,在我們使用Mybatis-plus中進(jìn)行多表操作是極其困難的,如果你不使用Mybatis-plus-join這一類的工具,你只能去配置對應(yīng)的Mapper.xml文件,配置又臭又長的ResultMap,然后再去寫對應(yīng)的sql語句,這種方法雖然看上去很麻煩,但具有很高的靈活性,可以讓我們

SpringBoot與SpringMVC的比較及差別分析 SpringBoot與SpringMVC的比較及差別分析 Dec 29, 2023 am 11:02 AM

SpringBoot和SpringMVC都是Java開發(fā)中常用的框架,但它們之間有一些明顯的差異。本文將探究這兩個(gè)框架的特點(diǎn)和用途,并對它們的差異進(jìn)行比較。首先,我們來了解一下SpringBoot。SpringBoot是由Pivotal團(tuán)隊(duì)開發(fā)的,它旨在簡化基于Spring框架的應(yīng)用程序的創(chuàng)建和部署。它提供了一種快速、輕量級(jí)的方式來構(gòu)建獨(dú)立的、可執(zhí)行

SpringBoot中的@Import注解怎么使用 SpringBoot中的@Import注解怎么使用 May 31, 2023 pm 06:25 PM

一、@Import引入普通類@Import引入普通的類可以幫助我們把普通的類定義為Bean。@Import可以添加在@SpringBootApplication(啟動(dòng)類)、@Configuration(配置類)、@Component(組件類)對應(yīng)的類上。注意:@RestController、@Service、@Repository都屬于@Component@SpringBootApplication@Import(ImportBean.class)//通過@Import注解把ImportBean

See all articles