日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列

瀏覽:149日期:2023-09-20 15:07:59

顧名思義,延遲隊(duì)列就是進(jìn)入該隊(duì)列的消息會(huì)被延遲消費(fèi)的隊(duì)列。而一般的隊(duì)列,消息一旦入隊(duì)了之后就會(huì)被消費(fèi)者馬上消費(fèi)。

延遲隊(duì)列能做什么?

延遲隊(duì)列多用于需要延遲工作的場(chǎng)景。最常見(jiàn)的是以下兩種場(chǎng)景:

延遲消費(fèi)。比如: 用戶生成訂單之后,需要過(guò)一段時(shí)間校驗(yàn)訂單的支付狀態(tài),如果訂單仍未支付則需要及時(shí)地關(guān)閉訂單。 用戶注冊(cè)成功之后,需要過(guò)一段時(shí)間比如一周后校驗(yàn)用戶的使用情況,如果發(fā)現(xiàn)用戶活躍度較低,則發(fā)送郵件或者短信來(lái)提醒用戶使用。 延遲重試。比如消費(fèi)者從隊(duì)列里消費(fèi)消息時(shí)失敗了,但是想要延遲一段時(shí)間后自動(dòng)重試。

如果不使用延遲隊(duì)列,那么我們只能通過(guò)一個(gè)輪詢掃描程序去完成。這種方案既不優(yōu)雅,也不方便做成統(tǒng)一的服務(wù)便于開(kāi)發(fā)人員使用。但是使用延遲隊(duì)列的話,我們就可以輕而易舉地完成。

如何實(shí)現(xiàn)?

別急,在下文中,我們將詳細(xì)介紹如何利用 Spring BootRabbitMQ 來(lái)實(shí)現(xiàn)延遲隊(duì)列。

本文出現(xiàn)的示例代碼都已push到Github倉(cāng)庫(kù)中: https://github.com/Lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue

實(shí)現(xiàn)思路

在介紹具體的實(shí)現(xiàn)思路之前,我們先來(lái)介紹一下RabbitMQ的兩個(gè)特性,一個(gè)是Time-To-Live Extensions,另一個(gè)是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允許我們?yōu)橄⒒蛘哧?duì)列設(shè)置TTL(time to live),也就是過(guò)期時(shí)間。TTL表明了一條消息可在隊(duì)列中存活的最大時(shí)間,單位為毫秒。也就是說(shuō),當(dāng)某條消息被設(shè)置了TTL或者當(dāng)某條消息進(jìn)入了設(shè)置了TTL的隊(duì)列時(shí),這條消息會(huì)在經(jīng)過(guò)TTL秒后“死亡”,成為Dead Letter。如果既配置了消息的TTL,又配置了隊(duì)列的TTL,那么較小的那個(gè)值會(huì)被取用。更多資料請(qǐng)查閱 官方文檔 。

Dead Letter Exchange

剛才提到了,被設(shè)置了TTL的消息在過(guò)期后會(huì)成為Dead Letter。其實(shí)在RabbitMQ中,一共有三種消息的“死亡”形式:

消息被拒絕。通過(guò)調(diào)用basic.reject或者basic.nack并且設(shè)置的requeue參數(shù)為false。 消息因?yàn)樵O(shè)置了TTL而過(guò)期。 消息進(jìn)入了一條已經(jīng)達(dá)到最大長(zhǎng)度的隊(duì)列。

如果隊(duì)列設(shè)置了Dead Letter Exchange(DLX),那么這些Dead Letter就會(huì)被重新publish到Dead Letter Exchange,通過(guò)Dead Letter Exchange路由到其他隊(duì)列。更多資料請(qǐng)查閱 官方文檔 。

流程圖

聰明的你肯定已經(jīng)想到了,如何將RabbitMQ的TTL和DLX特性結(jié)合在一起,實(shí)現(xiàn)一個(gè)延遲隊(duì)列。

針對(duì)于上述的延遲隊(duì)列的兩個(gè)場(chǎng)景,我們分別有以下兩種流程圖:

延遲消費(fèi)

延遲消費(fèi)是延遲隊(duì)列最為常用的使用模式。如下圖所示,生產(chǎn)者產(chǎn)生的消息首先會(huì)進(jìn)入緩沖隊(duì)列(圖中紅色隊(duì)列)。通過(guò)RabbitMQ提供的TTL擴(kuò)展,這些消息會(huì)被設(shè)置過(guò)期時(shí)間,也就是延遲消費(fèi)的時(shí)間。等消息過(guò)期之后,這些消息會(huì)通過(guò)配置好的DLX轉(zhuǎn)發(fā)到實(shí)際消費(fèi)隊(duì)列(圖中藍(lán)色隊(duì)列),以此達(dá)到延遲消費(fèi)的效果。

Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列

延遲重試

延遲重試本質(zhì)上也是延遲消費(fèi)的一種,但是這種模式的結(jié)構(gòu)與普通的延遲消費(fèi)的流程圖較為不同,所以單獨(dú)拎出來(lái)介紹。

如下圖所示,消費(fèi)者發(fā)現(xiàn)該消息處理出現(xiàn)了異常,比如是因?yàn)榫W(wǎng)絡(luò)波動(dòng)引起的異常。那么如果不等待一段時(shí)間,直接就重試的話,很可能會(huì)導(dǎo)致在這期間內(nèi)一直無(wú)法成功,造成一定的資源浪費(fèi)。那么我們可以將其先放在緩沖隊(duì)列中(圖中紅色隊(duì)列),等消息經(jīng)過(guò)一段的延遲時(shí)間后再次進(jìn)入實(shí)際消費(fèi)隊(duì)列中(圖中藍(lán)色隊(duì)列),此時(shí)由于已經(jīng)過(guò)了“較長(zhǎng)”的時(shí)間了,異常的一些波動(dòng)通常已經(jīng)恢復(fù),這些消息可以被正常地消費(fèi)。

Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列

代碼實(shí)現(xiàn)

接下來(lái)我們將介紹如何在Spring Boot中實(shí)現(xiàn)基于RabbitMQ的延遲隊(duì)列。我們假設(shè)讀者已經(jīng)擁有了Spring Boot與RabbitMQ的基本知識(shí)。如果想快速了解Spring Boot的相關(guān)基礎(chǔ)知識(shí),可以參考我之前寫(xiě)的一篇文章。

初始化工程

首先我們?cè)贗ntellij中創(chuàng)建一個(gè)Spring Boot工程,并且添加 spring-boot-starter-amqp 擴(kuò)展。

配置隊(duì)列

從上述的流程圖中我們可以看到,一個(gè)延遲隊(duì)列的實(shí)現(xiàn),需要一個(gè)緩沖隊(duì)列以及一個(gè)實(shí)際的消費(fèi)隊(duì)列。又由于在RabbitMQ中,我們擁有兩種消息過(guò)期的配置方式,所以在代碼中,我們一共配置了三條隊(duì)列:

delay_queue_per_message_ttl:TTL配置在消息上的緩沖隊(duì)列。 delay_queue_per_queue_ttl:TTL配置在隊(duì)列上的緩沖隊(duì)列。 delay_process_queue:實(shí)際消費(fèi)隊(duì)列。

我們通過(guò)Java Config的方式將上述的隊(duì)列配置為Bean。由于我們添加了 spring-boot-starter-amqp 擴(kuò)展,Spring Boot在啟動(dòng)時(shí)會(huì)根據(jù)我們的配置自動(dòng)創(chuàng)建這些隊(duì)列。為了方便接下來(lái)的測(cè)試,我們將delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置為同一個(gè),且過(guò)期的消息都會(huì)通過(guò)DLX轉(zhuǎn)發(fā)到delay_process_queue。

delay_queue_per_message_ttl

首先介紹delay_queue_per_message_ttl的配置代碼:

@BeanQueuedelayQueuePerMessageTTL(){ return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME) .withArgument('x-dead-letter-exchange', DELAY_EXCHANGE_NAME) // DLX,dead letter發(fā)送到的exchange .withArgument('x-dead-letter-routing-key', DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key .build();}

其中, x-dead-letter-exchange 聲明了隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱(chēng), x-dead-letter-routing-key 聲明了這些死信在轉(zhuǎn)發(fā)時(shí)攜帶的routing-key名稱(chēng)。

delay_queue_per_queue_ttl

類(lèi)似地,delay_queue_per_queue_ttl的配置代碼:

@BeanQueuedelayQueuePerQueueTTL(){ return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME) .withArgument('x-dead-letter-exchange', DELAY_EXCHANGE_NAME) // DLX .withArgument('x-dead-letter-routing-key', DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key .withArgument('x-message-ttl', QUEUE_EXPIRATION) // 設(shè)置隊(duì)列的過(guò)期時(shí)間 .build();}

delay_queue_per_queue_ttl隊(duì)列的配置比delay_queue_per_message_ttl隊(duì)列的配置多了一個(gè) x-message-ttl ,該配置用來(lái)設(shè)置隊(duì)列的過(guò)期時(shí)間。

delay_process_queue

delay_process_queue的配置最為簡(jiǎn)單:

@BeanQueuedelayProcessQueue(){ return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME) .build();} 配置Exchange

配置DLX

首先,我們需要配置DLX,代碼如下:

@BeanDirectExchangedelayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME);}

然后再將該DLX綁定到實(shí)際消費(fèi)隊(duì)列即delay_process_queue上。這樣所有的死信都會(huì)通過(guò)DLX被轉(zhuǎn)發(fā)到delay_process_queue:

@BeanBindingdlxBinding(Queue delayProcessQueue, DirectExchange delayExchange){ return BindingBuilder.bind(delayProcessQueue) .to(delayExchange) .with(DELAY_PROCESS_QUEUE_NAME);}

配置延遲重試所需的Exchange

從延遲重試的流程圖中我們可以看到,消息處理失敗之后,我們需要將消息轉(zhuǎn)發(fā)到緩沖隊(duì)列,所以緩沖隊(duì)列也需要綁定一個(gè)Exchange。 在本例中,我們將delay_process_per_queue_ttl作為延遲重試?yán)锏木彌_隊(duì)列 。具體代碼是如何配置的,這里就不贅述了,大家可以查閱我 Github 中的代碼。

定義消費(fèi)者

我們創(chuàng)建一個(gè)最簡(jiǎn)單的消費(fèi)者ProcessReceiver,這個(gè)消費(fèi)者監(jiān)聽(tīng)delay_process_queue隊(duì)列,對(duì)于接受到的消息,他會(huì):

如果消息里的消息體不等于FAIL_MESSAGE,那么他會(huì)輸出消息體。 如果消息里的消息體恰好是FAIL_MESSAGE,那么他會(huì)模擬拋出異常,然后將該消息重定向到緩沖隊(duì)列(對(duì)應(yīng)延遲重試場(chǎng)景)。

另外,我們還需要新建一個(gè)監(jiān)聽(tīng)容器用于存放消費(fèi)者,代碼如下:

@BeanSimpleMessageListenerContainerprocessContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 監(jiān)聽(tīng)delay_process_queue container.setMessageListener(new MessageListenerAdapter(processReceiver)); return container;}

至此,我們前置的配置代碼已經(jīng)全部編寫(xiě)完成,接下來(lái)我們需要編寫(xiě)測(cè)試用例來(lái)測(cè)試我們的延遲隊(duì)列。

編寫(xiě)測(cè)試用例

延遲消費(fèi)場(chǎng)景

首先我們編寫(xiě)用于測(cè)試TTL設(shè)置在消息上的測(cè)試代碼。

我們借助 spring-rabbit 包下提供的RabbitTemplate類(lèi)來(lái)發(fā)送消息。由于我們添加了 spring-boot-starter-amqp 擴(kuò)展,Spring Boot會(huì)在初始化時(shí)自動(dòng)地將RabbitTemplate當(dāng)成bean加載到容器中。

解決了消息的發(fā)送問(wèn)題,那么又該如何為每個(gè)消息設(shè)置TTL呢?這里我們需要借助MessagePostProcessor。MessagePostProcessor通常用來(lái)設(shè)置消息的Header以及消息的屬性。我們新建一個(gè)ExpirationMessagePostProcessor類(lèi)來(lái)負(fù)責(zé)設(shè)置消息的TTL屬性:

/** * 設(shè)置消息的失效時(shí)間 */public class ExpirationMessagePostProcessorimplements MessagePostProcessor{ private final Long ttl; // 毫秒 public ExpirationMessagePostProcessor(Long ttl){this.ttl = ttl; } @Override public Message postProcessMessage(Message message)throws AmqpException {message.getMessageProperties() .setExpiration(ttl.toString()); // 設(shè)置per-message的失效時(shí)間return message; }}

然后在調(diào)用RabbitTemplate的convertAndSend方法時(shí),傳入ExpirationMessagePostPorcessor即可。我們向緩沖隊(duì)列中發(fā)送3條消息,過(guò)期時(shí)間依次為1秒,2秒和3秒。具體的代碼如下所示:

@Testpublic void testDelayQueuePerMessageTTL()throws InterruptedException { ProcessReceiver.latch = new CountDownLatch(3); for (int i = 1; i <= 3; i++) {long expiration = i * 1000;rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,(Object) ('Message From delay_queue_per_message_ttl with expiration ' + expiration), new ExpirationMessagePostProcessor(expiration)); } ProcessReceiver.latch.await();}

細(xì)心的朋友一定會(huì)問(wèn),為什么要在代碼中加一個(gè)CountDownLatch呢?這是因?yàn)槿绻麤](méi)有l(wèi)atch阻塞住測(cè)試方法的話,測(cè)試用例會(huì)直接結(jié)束,程序退出,我們就看不到消息被延遲消費(fèi)的表現(xiàn)了。

那么類(lèi)似地,測(cè)試TTL設(shè)置在隊(duì)列上的代碼如下:

@Testpublic void testDelayQueuePerQueueTTL()throws InterruptedException { ProcessReceiver.latch = new CountDownLatch(3); for (int i = 1; i <= 3; i++) {rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,'Message From delay_queue_per_queue_ttl with expiration ' + QueueConfig.QUEUE_EXPIRATION); } ProcessReceiver.latch.await();}

我們向緩沖隊(duì)列中發(fā)送3條消息。理論上這3條消息會(huì)在4秒后同時(shí)過(guò)期。

延遲重試場(chǎng)景

我們同樣還需測(cè)試延遲重試場(chǎng)景。

@Testpublic void testFailMessage()throws InterruptedException { ProcessReceiver.latch = new CountDownLatch(6); for (int i = 1; i <= 3; i++) {rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE); } ProcessReceiver.latch.await();}

我們向delay_process_queue發(fā)送3條會(huì)觸發(fā)FAIL的消息,理論上這3條消息會(huì)在4秒后自動(dòng)重試。

查看測(cè)試結(jié)果

延遲消費(fèi)場(chǎng)景

延遲消費(fèi)的場(chǎng)景測(cè)試我們分為了TTL設(shè)置在消息上和TTL設(shè)置在隊(duì)列上兩種。首先,我們先看一下TTL設(shè)置在消息上的測(cè)試結(jié)果:

Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列

從上圖中我們可以看到,ProcessReceiver分別經(jīng)過(guò)1秒、2秒、3秒收到消息。測(cè)試結(jié)果表明消息不僅被延遲消費(fèi)了,而且每條消息的延遲時(shí)間是可以被個(gè)性化設(shè)置的。TTL設(shè)置在消息上的延遲消費(fèi)場(chǎng)景測(cè)試成功。

然后,TTL設(shè)置在隊(duì)列上的測(cè)試結(jié)果如下圖:

Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列

從上圖中我們可以看到,ProcessReceiver經(jīng)過(guò)了4秒的延遲之后,同時(shí)收到了3條消息。測(cè)試結(jié)果表明消息不僅被延遲消費(fèi)了,同時(shí)也證明了當(dāng)TTL設(shè)置在隊(duì)列上的時(shí)候,消息的過(guò)期時(shí)間是固定的。TTL設(shè)置在隊(duì)列上的延遲消費(fèi)場(chǎng)景測(cè)試成功。

延遲重試場(chǎng)景

接下來(lái),我們?cè)賮?lái)看一下延遲重試的測(cè)試結(jié)果:

Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列

ProcessReceiver首先收到了3條會(huì)觸發(fā)FAIL的消息,然后將其移動(dòng)到緩沖隊(duì)列之后,過(guò)了4秒,又收到了剛才的那3條消息。延遲重試場(chǎng)景測(cè)試成功。

總結(jié)

本文首先介紹了延遲隊(duì)列的概念以及用途,并且通過(guò)代碼詳細(xì)講解了如何通過(guò)Spring Boot和RabbitMQ實(shí)現(xiàn)一個(gè)延遲隊(duì)列。希望本文能夠?qū)Υ蠹移綍r(shí)的學(xué)習(xí)和工作能有所啟發(fā)和幫助。

來(lái)自:http://www.kissyu.org/2017/11/18/Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列/

標(biāo)簽: Spring
相關(guān)文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
999国产精品永久免费视频app| 欧美成人综合| 亚洲精品激情| 蜜桃久久久久久久| 欧美日韩高清| 中国女人久久久| 美女久久一区| 亚洲精品少妇| 国产精品尤物| 久久精品国产成人一区二区三区| 国产精品嫩模av在线| 美女高潮久久久| 福利一区二区免费视频 | 国产一区清纯| 在线视频亚洲| 日本aⅴ亚洲精品中文乱码| 久久精品超碰| 韩日一区二区| 日韩中文字幕高清在线观看| 久久久9色精品国产一区二区三区| 亚洲一本视频| 男女性色大片免费观看一区二区| 亚洲尤物av| 国产日韩一区二区三区在线| 久久久精品区| 久久久久久久久99精品大| 免费欧美日韩| 国产精品亚洲综合久久| 激情综合婷婷| 蜜臀久久99精品久久一区二区| 国产精品嫩草99av在线| 日韩av午夜在线观看| 国产激情欧美| 日韩免费视频| 久久国产66| 国产精品免费99久久久| 99久久久久| 婷婷亚洲精品| 国产一区2区| 亚洲欧美日本日韩| 久久精品一区二区三区中文字幕| 不卡视频在线| 91精品日本| 免费一二一二在线视频| 免费在线视频一区| 精品国产欧美| 免播放器亚洲| 国产成人精品一区二区三区在线| 亚洲成人三区| 国产高清亚洲| 日韩一级网站| 精品一区二区三区亚洲| 亚洲激情欧美| 久久精品系列| 男人的天堂亚洲一区| 国产精品久久观看| 蜜臀a∨国产成人精品| 麻豆精品在线观看| 在线综合亚洲| 91视频精品| 视频国产精品| 成人日韩精品| 国产免费av一区二区三区| 99国产精品一区二区| 亚洲人亚洲人色久| 在线一区视频观看| 国产精品日本一区二区三区在线| 99亚洲视频| 亚洲黄色免费av| 日本va欧美va精品发布| 欧美亚洲国产激情| 麻豆91精品91久久久的内涵| 麻豆精品91| 亚洲国产福利| 欧美日韩亚洲一区| 99riav1国产精品视频| 精品视频在线一区二区在线| 久久av一区| 美女av在线免费看| 国产精品亚洲二区| 免费在线看一区| 免费国产自久久久久三四区久久| 久久久久亚洲精品中文字幕| 日欧美一区二区| 在线视频日韩| 亚洲午夜一级| 视频福利一区| 成人在线视频区| 国产精品免费大片| 日本不卡一区二区| 亚洲专区欧美专区| 欧美亚洲国产精品久久| 国产精品成久久久久| 日韩精品久久久久久| 亚洲女人av| 最新日韩欧美| 五月天综合网站| 成人精品亚洲| 日韩国产欧美一区二区| 精品欧美日韩精品| 麻豆一区在线| 国产精品欧美在线观看| 久久国产三级| 日本欧美一区| 日本不卡高清| 日韩在线网址| 四虎在线精品| 中文字幕亚洲影视| 中文亚洲欧美| 免费看的黄色欧美网站| 国产精品视区| 国产精品毛片在线看| 国产模特精品视频久久久久| 香蕉国产精品| 日韩视频一区二区三区在线播放免费观看| 九色porny丨国产首页在线| 国产成人免费精品| 日韩欧美自拍| 久久久久久美女精品| 久久要要av| 久久精品动漫| 偷拍欧美精品| 亚洲香蕉久久| 综合激情五月婷婷| 亚洲人成亚洲精品| 日本不卡视频在线| 国产精品亚洲四区在线观看 | 亚洲精品一级| 日本在线不卡视频| 日本亚洲最大的色成网站www| 亚洲资源在线| 婷婷综合一区| 国产精品任我爽爆在线播放| 麻豆国产精品视频| 日韩a一区二区| 久久久久中文| 欧美日韩四区| 美女国产一区| 老司机精品久久| 日韩精品第二页| 你懂的亚洲视频| 综合日韩av| 亚洲激情社区| 日本免费新一区视频| 国产精品白浆| 成人美女视频| 99精品99| 久久国际精品| 岛国av免费在线观看| 婷婷国产精品| 婷婷综合福利| 精品久久精品| 天堂日韩电影| 在线精品一区二区| 国产精品一区二区99| 亚洲天堂av影院| 蜜桃久久av| 国产精品一线天粉嫩av| 麻豆视频在线观看免费网站黄| 韩国精品主播一区二区在线观看 | 日韩国产综合| 免费久久99精品国产自在现线| 日本午夜精品| 久草免费在线视频| 最新亚洲一区| 国产亚洲高清一区| 日韩成人亚洲| 日韩综合一区二区| 国产一区二区视频在线看| 免费观看久久av| 久久精品99国产国产精| 亚洲涩涩在线| 热久久久久久久| 久久精品国产免费| 亚洲一区二区毛片| 牛牛精品成人免费视频| 蜜臀91精品国产高清在线观看| 亚洲精品在线a| 中文在线中文资源| 免费在线观看日韩欧美| 精品国产网站| 综合亚洲自拍| 日韩在线视频精品| 欧美影院精品| 日韩大片在线播放| 欧美日韩一区二区国产| 免费黄色成人| 久久不见久久见中文字幕免费| 欧美99久久| 国产精品日韩精品中文字幕| 亚洲黄色在线| 精品国产亚洲日本| 免费美女久久99| xxxxx性欧美特大| 日韩一区二区三免费高清在线观看| 伊伊综合在线| 国产乱码精品一区二区三区四区| 欧美影院三区| 麻豆视频一区二区| 午夜亚洲福利|