日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術(shù)文章
文章詳情頁

如何通過Python實(shí)現(xiàn)RabbitMQ延遲隊列

瀏覽:25日期:2022-07-03 18:32:40

最近在做一任務(wù)時,遇到需要延遲處理的數(shù)據(jù),最開始的做法是現(xiàn)將數(shù)據(jù)存儲在數(shù)據(jù)庫,然后寫個腳本,隔五分鐘掃描數(shù)據(jù)表再處理數(shù)據(jù),實(shí)際效果并不好。因為系統(tǒng)本身一直在用RabbitMQ做異步處理任務(wù)的中間件,所以想到是否可以利用RabbitMQ實(shí)現(xiàn)延遲隊列。功夫不負(fù)有心人,RabbitMQ雖然沒有現(xiàn)成可用的延遲隊列,但是可以利用其兩個重要特性來實(shí)現(xiàn)之:1、Time To Live(TTL)消息超時機(jī)制;2、Dead Letter Exchanges(DLX)死信隊列。下面將具體描述實(shí)現(xiàn)原理以及實(shí)現(xiàn)代

延遲隊列的基礎(chǔ)原理Time To Live(TTL)

RabbitMQ可以針對Queue設(shè)置x-expires 或者 針對Message設(shè)置 x-message-ttl,來控制消息的生存時間,如果超時(兩者同時設(shè)置以最先到期的時間為準(zhǔn)),則消息變?yōu)閐ead letter(死信)RabbitMQ消息的過期時間有兩種方法設(shè)置。

通過隊列(Queue)的屬性設(shè)置,隊列中所有的消息都有相同的過期時間。(本次延遲隊列采用的方案)對消息單獨(dú)設(shè)置,每條消息TTL可以不同。

如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數(shù)值為準(zhǔn)。消息在隊列的生存時間一旦超過設(shè)置的TTL值,就成為死信(dead letter)

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數(shù),如果隊列內(nèi)出現(xiàn)了dead letter,則按照這兩個參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊列。

x-dead-letter-exchange:出現(xiàn)死信(dead letter)之后將dead letter重新發(fā)送到指定exchange x-dead-letter-routing-key:出現(xiàn)死信(dead letter)之后將dead letter重新按照指定的routing-key發(fā)送

隊列中出現(xiàn)死信(dead letter)的情況有:

消息或者隊列的TTL過期。(延遲隊列利用的特性) 隊列達(dá)到最大長度 消息被消費(fèi)端拒絕(basic.reject or basic.nack)并且requeue=false

綜合上面兩個特性,將隊列設(shè)置TTL規(guī)則,隊列TTL過期后消息會變成死信,然后利用DLX特性將其轉(zhuǎn)發(fā)到另外的交換機(jī)和隊列就可以被重新消費(fèi),達(dá)到延遲消費(fèi)效果。

如何通過Python實(shí)現(xiàn)RabbitMQ延遲隊列

延遲隊列設(shè)計及實(shí)現(xiàn)(Python)

從上面描述,延遲隊列的實(shí)現(xiàn)大致分為兩步:

產(chǎn)生死信,有兩種方式Per-Message TTL和 Queue TTL,因為我的需求中是所有的消息延遲處理時間相同,所以本實(shí)現(xiàn)中采用 Queue TTL設(shè)置隊列的TTL,如果需要將隊列中的消息設(shè)置不同的延遲處理時間,則設(shè)置Per-Message TTL(官方文檔)

設(shè)置死信的轉(zhuǎn)發(fā)規(guī)則,Dead Letter Exchanges設(shè)置方法(官方文檔)

完整代碼如下:

'''Created on Fri Aug 3 17:00:44 2018@author: Bge'''import pika,json,loggingclass RabbitMQClient: def __init__(self, conn_str=’amqp://user:pwd@host:port/%2F’): self.exchange_type = 'direct' self.connection_string = conn_str self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string)) self.channel = self.connection.channel() self._declare_retry_queue() #RetryQueue and RetryExchange logging.debug('connection established') def close_connection(self): self.connection.close() logging.debug('connection closed') def declare_exchange(self, exchange): self.channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type, durable=True) def declare_queue(self, queue): self.channel.queue_declare(queue=queue, durable=True,) def declare_delay_queue(self, queue,DLX=’RetryExchange’,TTL=60000): ''' 創(chuàng)建延遲隊列 :param TTL: ttl的單位是us,ttl=60000 表示 60s :param queue: :param DLX:死信轉(zhuǎn)發(fā)的exchange :return: ''' arguments={} if DLX: #設(shè)置死信轉(zhuǎn)發(fā)的exchange arguments[ ’x-dead-letter-exchange’]=DLX if TTL: arguments[’x-message-ttl’]=TTL print(arguments) self.channel.queue_declare(queue=queue, durable=True, arguments=arguments) def _declare_retry_queue(self): ''' 創(chuàng)建異常交換器和隊列,用于存放沒有正常處理的消息。 :return: ''' self.channel.exchange_declare(exchange=’RetryExchange’, exchange_type=’fanout’, durable=True) self.channel.queue_declare(queue=’RetryQueue’, durable=True) self.channel.queue_bind(’RetryQueue’, ’RetryExchange’,’RetryQueue’) def publish_message(self,routing_key, msg,exchange=’’,delay=0,TTL=None): ''' 發(fā)送消息到指定的交換器 :param exchange: RabbitMQ交換器 :param msg: 消息實(shí)體,是一個序列化的JSON字符串 :return: ''' if delay==0: self.declare_queue(routing_key) else: self.declare_delay_queue(routing_key,TTL=TTL) if exchange!=’’: self.declare_exchange(exchange) self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg, properties=pika.BasicProperties( delivery_mode=2, type=exchange )) self.close_connection() print('message send out to %s' % exchange) logging.debug('message send out to %s' % exchange) def start_consume(self,callback,queue=’#’,delay=1): ''' 啟動消費(fèi)者,開始消費(fèi)RabbitMQ中的消息 :return: ''' if delay==1: queue=’RetryQueue’ else: self.declare_queue(queue) self.channel.basic_qos(prefetch_count=1) try: self.channel.basic_consume( # 消費(fèi)消息callback, # 如果收到消息,就調(diào)用callback函數(shù)來處理消息queue=queue, # 你要從那個隊列里收消息 ) self.channel.start_consuming() except KeyboardInterrupt: self.stop_consuming() def stop_consuming(self): self.channel.stop_consuming() self.close_connection() def message_handle_successfully(channel, method): ''' 如果消息處理正常完成,必須調(diào)用此方法, 否則RabbitMQ會認(rèn)為消息處理不成功,重新將消息放回待執(zhí)行隊列中 :param channel: 回調(diào)函數(shù)的channel參數(shù) :param method: 回調(diào)函數(shù)的method參數(shù) :return: ''' channel.basic_ack(delivery_tag=method.delivery_tag) def message_handle_failed(channel, method): ''' 如果消息處理失敗,應(yīng)該調(diào)用此方法,會自動將消息放入異常隊列 :param channel: 回調(diào)函數(shù)的channel參數(shù) :param method: 回調(diào)函數(shù)的method參數(shù) :return: ''' channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

發(fā)布消息代碼如下:

from MQ.RabbitMQ import RabbitMQClientprint('start program')client = RabbitMQClient()msg1 = ’{'key':'value'}’client.publish_message(’test-delay’,msg1,delay=1,TTL=10000)print('message send out')

消費(fèi)者代碼如下:

from MQ.RabbitMQ import RabbitMQClientimport jsonprint('start program')client = RabbitMQClient()def callback(ch, method, properties, body): msg = body.decode() print(msg) # 如果處理成功,則調(diào)用此消息回復(fù)ack,表示消息成功處理完成。 RabbitMQClient.message_handle_successfully(ch, method)queue_name = 'RetryQueue'client.start_consume(callback,queue_name,delay=0)

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。

標(biāo)簽: Python 編程
相關(guān)文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
欧美a级一区| 日韩av一区二区三区四区| 99re国产精品| 91精品一区二区三区综合| 国产精品99精品一区二区三区∴ | 久久久久久久久丰满| 麻豆一区在线| 美女视频网站久久| 激情国产在线| 色偷偷色偷偷色偷偷在线视频| 欧美日中文字幕| 免费国产自线拍一欧美视频| 午夜在线一区| 国产三级精品三级在线观看国产| 国产精品日本| 国产日产一区| 久久精品官网| 亚洲一区二区免费看| 欧美日韩xxxx| 久久精品天堂| 欧美~级网站不卡| 日韩av电影一区| 在线一区av| 日韩中文av| 欧美国产中文高清| 亚洲v在线看| 久久高清国产| 国产劲爆久久| 国产午夜久久| 蜜桃视频第一区免费观看| 国产精品免费看| 鲁鲁在线中文| 石原莉奈一区二区三区在线观看 | 99国产一区| 日韩三级一区| 91精品一区二区三区综合| 婷婷综合电影| 91精品国产调教在线观看| 麻豆国产91在线播放| 免费看日韩精品| 激情欧美丁香| 亚洲91精品| 日韩av在线中文字幕| 99久久夜色精品国产亚洲1000部| 国产精品久久久免费| 免费久久99精品国产| 国产精品久久久久久模特| 欧美va天堂在线| 欧美激情视频一区二区三区在线播放| 欧美搞黄网站| 日韩国产网站| 深夜视频一区二区| 精品久久99| 国产亚洲欧美日韩精品一区二区三区 | 老牛国内精品亚洲成av人片| 国产亚洲欧美日韩精品一区二区三区 | 另类专区亚洲| 日韩av网站在线观看| 欧美中文字幕一区二区| 日韩精品不卡一区二区| 欧美极品中文字幕| 天堂精品久久久久| 亚洲精品国产日韩| 亚洲黄页一区| 影院欧美亚洲| 夜久久久久久| 久久亚洲色图| 久久福利一区| 国产免费av一区二区三区| 久久狠狠亚洲综合| 国产精品伦一区二区| 欧美黄色一区| 欧洲av一区二区| 亲子伦视频一区二区三区| 久久久久国产精品一区二区| 精精国产xxxx视频在线野外| 欧美日韩四区| 日韩精品第二页| 国产极品久久久久久久久波多结野| 日韩精品欧美成人高清一区二区| 国产情侣一区在线| 国产成人精品一区二区免费看京| 日韩激情视频网站| 麻豆久久久久久| 精品久久在线| 视频在线观看一区二区三区| 91麻豆精品| 中文字幕在线看片| 蜜臀av免费一区二区三区| 亚洲aⅴ网站| 久久久精品区| 国产视频一区免费看| 久久精品 人人爱| 97精品视频在线看| 先锋影音久久久| 国产精品porn| 亚洲一区日本| 午夜电影一区| 91精品国产91久久久久久黑人| 日本a级不卡| 亚洲精品一区三区三区在线观看| 日韩国产在线不卡视频| 98精品久久久久久久| 日韩亚洲精品在线观看| bbw在线视频| 久久的色偷偷| 日本高清久久| 国产一区二区三区精品在线观看| 午夜日韩av| 国产成人免费视频网站视频社区| 91麻豆精品| 午夜精品一区二区三区国产| 国产激情欧美| 久热re这里精品视频在线6| а√在线中文在线新版| 日韩有码av| 欧美影院三区| 91看片一区| 麻豆中文一区二区| 日本精品另类| 九色精品91| 欧美成人午夜| 欧美国产91| 日韩久久精品| 香蕉久久精品| 久久视频国产| 久久要要av| 麻豆91精品91久久久的内涵| 男女精品网站| 91精品一区二区三区综合在线爱| 色偷偷偷在线视频播放| 韩国久久久久久| 国产精品久一| 国产一区国产二区国产三区| 久久这里只有精品一区二区| 麻豆视频久久| 鲁大师精品99久久久| 久久一区国产| 日本久久成人网| 亲子伦视频一区二区三区| 婷婷激情图片久久| 91成人精品| 在线看片不卡| 中文字幕日韩高清在线| 日韩在线观看中文字幕| 国产精品一站二站| 美女福利一区二区三区| 久久国产免费| 国产亚洲永久域名| 日韩1区2区日韩1区2区| 亚洲午夜天堂| 亚洲男女av一区二区| 亚洲一级大片| 麻豆精品新av中文字幕| 久久一区二区三区喷水| 国产午夜精品一区二区三区欧美 | 亚洲va在线| 午夜在线视频一区二区区别| 日本aⅴ亚洲精品中文乱码| 精品三级在线观看视频| 久久天堂av| 亚洲免费成人av在线| 日本精品在线播放| 国产精品毛片久久| 综合国产在线| 美腿丝袜亚洲一区| 九一成人免费视频| 亚洲综合专区| 久久不卡日韩美女| 水蜜桃久久夜色精品一区| 91精品国产福利在线观看麻豆| 只有精品亚洲| 国产一区二区三区不卡av| 视频一区二区欧美| 丝袜美腿一区| 国内揄拍国内精品久久| 美女国产一区| 都市激情国产精品| 亚洲人成高清| 国产在线|日韩| 国产激情欧美| 欧美一级一区| 精品1区2区3区4区| 成人在线丰满少妇av| 亚洲无线一线二线三线区别av| 久久国产三级| 午夜亚洲福利在线老司机| 成人福利视频| 国产精品香蕉| 在线观看视频免费一区二区三区| 久久激情一区| 97精品中文字幕| 国产精品一线天粉嫩av| 欧美欧美黄在线二区| 久久亚洲欧美| 亚洲丝袜美腿一区| 国产视频一区欧美| 国产精品久久久久av电视剧| 日韩欧美看国产| 国产精品久久久一区二区|