日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

Python RabbitMQ實現簡單的進程間通信示例

瀏覽:233日期:2022-07-18 18:36:11

RabbitMQ 消息隊列

PYthreading Queue進程Queue 父進程與子進程,或同一父進程下的多個子進程進行交互缺點:兩個不同Python文件不能通過上面兩個Queue進行交互

erlong基于這個語言創建的一種中間商win中需要先安裝erlong才能使用rabbitmq_server start

安裝 Python module

pip install pika

or

easy_install pika

or源碼

rabbit 默認端口15672查看當前時刻的隊列數rabbitmqctl.bat list_queue

exchange在定義的時候就是有類型的,決定到底哪些queue符合條件,可以接受消息fanout:所有bind到此exchange的queue都可以收到消息direct:通過routingkey和exchange決定唯一的queue可以接受消息topic: 所有符合routingkey(此時可以是一個表達式)的routingkey所bind的queue都可以接受消息 表達式符號說明: # 代表一個或多個字符 * 代表任何字符

RPCremote procedure call 雙向傳輸,指令<-------->指令執行結果實現方法:創建兩個隊列,一個隊列收指令,一個隊列發送執行結果

用rabbitmq實現簡單的生產者消費者模型

1) rabbit_producer.py

# Author : Xuefengimport pikaconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()# create the queue, the name of queue is 'hello'# durable=True can make the queue be exist, although the service have stopped before.channel.queue_declare(queue='hello', durable=True)# n RabbitMQ a message can never be sent directly to queue,it always need to go throughchannel.basic_publish(exchange = ' ', routing_key = 'hello', body = 'Hello world!', properties = pika.BasicPropreties( delivery_mode=2, # make the message persistence ) )print('[x] sent ’Hello world!’')connection.close()

2) rabbit_consumer.py

# Author : Xuefengimport pikaconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.queue_declare(queue='hello', durable=True)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# follow is for consumer to auto change with the abilitychannel.basic_qos(profetch_count=1)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的fanout模式實現廣播模式

1) fanout_rabbit_publish.py

# Author : Xuefengimport pikaimport sys# 廣播模式:# 生產者發送一條消息,所有的開通鏈接的消費者都可以接收到消息connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='logs', type='fanout')message = ’ ’.join(sys.argv[1:]) or 'info:Hello world!'channel.basic_publish( exchange='logs', routing_key='', body=message)print('[x] Send %r' % message)connection.close()

2) fanout_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)channel.queue_bind(exchange='logs', queue=queue_name)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的direct模式實現消息過濾模式

1) direct_rabbit_publisher.py

# Author : Xuefengimport pikaimport sys# 消息過濾模式:# 生產者發送一條消息,通過severity優先級來確定是否可以接收到消息connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ’ ’.join(sys.argv[2:]) or 'info:Hello world!'channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message)print('[x] Send %r:%r' % (severity, message))connection.close()

2) direct_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)severities = sys.argv[1:]if not severities: sys.stderr.write('Usage:%s [info] [warning] [error]n' % sys.argv[0]) sys.exit(1)for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的topic模式實現細致消息過濾模式

1) topic_rabbit_publisher.py

# Author : Xuefengimport pikaimport sys# 消息細致過濾模式:# 生產者發送一條消息,通過運行腳本 *.info 等確定接收消息類型進行對應接收connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='topic_logs', type='topic')binding_key = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ’ ’.join(sys.argv[2:]) or 'info:Hello world!'channel.basic_publish( exchange='topic_logs', routing_key=binding_key, body=message)print('[x] Send %r:%r' % (binding_key, message))connection.close()

2) topic_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='topic_logs', type='topic')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)binding_keys = sys.argv[1:]if not binding_keys: sys.stderr.write('Usage:%s [info] [warning] [error]n' % sys.argv[0]) sys.exit(1)for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag=method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue='hello', no_ack=True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq實現rpc操作

1) Rpc_rabbit_client.py

# Author : Xuefengimport pikaimport timeimport uuidclass FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 隨機的生成一個接收命令執行結果的隊列 self.channel.basic_consume(self.on_response, # 只要收到消息就調用 no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self,n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicPropreties( rely_to=self.callback_queue, correlation_id=self.corr_id # 通過隨機生成的ID來驗證指令執行結果與指令的匹配性 ), body=str(n) ) while self.response is None: self.connection.process_data_events() # 非阻塞版的start_consume,有沒有消息都繼續 print('no message...') time.sleep(0.5) return int(self.response)fibonacci_rcp = FibonacciRpcClient()print('[x] Requesting fib(30)')response = fibonacci_rcp.call(30)print('[x] Rec %r' % response)

2) Rpc_rabbit_server.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1)+fib(n-2)def on_request(ch, method, props, body): n = int(body) print('[.] fib(%s)' % n) response = fib(n) ch.basic_publish( exchange='', routing_key=props.rely_to, properties=pika.BasicPropreties(correlation_id= props.correlation), body = str(body) ) ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue')print('[x] Awaiting RPC requests')channel.start_consumeing()channel.exchange_declare(exchange='direct_logs', type='direct')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)severities = sys.argv[1:]

到此這篇關于Python RabbitMQ實現簡單的進程間通信示例的文章就介紹到這了,更多相關Python RabbitMQ進程間通信內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Python 編程
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
国产精品专区免费| 综合亚洲自拍| 欧美精品一区二区久久| 蜜桃免费网站一区二区三区| 精品女同一区二区三区在线观看| 国产精品婷婷| 电影亚洲精品噜噜在线观看| 亚洲欧美久久精品| 快播电影网址老女人久久| 国产欧美日韩在线一区二区| 噜噜噜久久亚洲精品国产品小说| 国产一区二区三区四区五区| 国产精品久久久久久av公交车| 999久久久精品国产| 美女久久久精品| 久久免费视频66| 麻豆一区二区99久久久久| 国产精品密蕾丝视频下载| 国产精品视频一区二区三区 | 亚洲人成亚洲精品| 91高清一区| 麻豆精品网站| 人人精品亚洲| 亚洲a级精品| 欧美香蕉视频| 青草综合视频| 国产免费成人| 国产a亚洲精品| 欧美日韩午夜| 麻豆9191精品国产| 老司机免费视频一区二区| 欧美 日韩 国产一区二区在线视频| 国产精品日本| 91亚洲人成网污www| 极品av在线| 国产综合精品| 日本少妇精品亚洲第一区| 国产va免费精品观看精品视频| 麻豆精品视频在线观看| 1024精品一区二区三区| 午夜一级在线看亚洲| 日韩高清二区| 国产成人精品福利| 婷婷综合六月| 亚洲精一区二区三区| 国产精品日本一区二区不卡视频| 麻豆国产精品| 色偷偷偷在线视频播放| 一区二区三区四区精品视频| 欧美一区精品| 免费av一区| 国产精品1区| 99国产精品久久久久久久成人热| 日韩高清电影一区| 亚洲大全视频| 国产精品mv在线观看| 日韩网站在线| 日韩精品欧美激情一区二区| 你懂的亚洲视频| 蜜臀va亚洲va欧美va天堂 | 国产免费av国片精品草莓男男| 激情不卡一区二区三区视频在线| 视频一区中文字幕| 国产精品二区不卡| 国产精一区二区| 99视频在线精品国自产拍免费观看| 亚洲精品一级二级| 国产精品久久久久久模特| 免费在线看一区| 亚洲午夜av| 欧美日韩免费观看视频| 韩日一区二区| 国产精品视频一区二区三区 | 激情亚洲影院在线观看| 久久精品国产99国产精品| 精品亚洲成人| 国产毛片久久久| 里番精品3d一二三区| 日韩欧美不卡| 噜噜噜躁狠狠躁狠狠精品视频| 最新亚洲国产| 国产精品www994| 福利一区在线| 欧美午夜精彩| 天堂va蜜桃一区二区三区| 视频精品一区| 国产欧美一级| 综合日韩av| 日韩一区欧美二区| 日本91福利区| 国产欧美成人| 1024精品久久久久久久久| 日韩成人午夜精品| 精品国产一区二区三区2021| 日韩影院二区| 亚洲九九精品| 久久字幕精品一区| 国产真实久久| 亚洲乱码久久| 97国产成人高清在线观看| 国产精品91一区二区三区| 免费看久久久| 久久国产精品久久w女人spa| av在线最新| 久久97视频| 亚州国产精品| 日韩中文字幕| 免费观看在线综合色| 午夜在线播放视频欧美| 亚洲精品小说| 欧美91精品| 亚洲涩涩在线| 欧美13videosex性极品| 蜜桃av在线播放| 欧美1区免费| 综合激情网站| 久久爱www成人| 精品丝袜在线| 亚洲免费精品| 亚洲欧美在线综合| 香蕉成人久久| 久久激情五月激情| 国产精品天堂蜜av在线播放| 国产精品香蕉| 午夜精品久久久久久久久久蜜桃| 久久久久欧美精品| 91成人超碰| 日韩动漫一区| 日本不卡免费高清视频在线| 欧美一区二区三区高清视频| 欧美中文日韩| 欧美国产中文高清| 999国产精品999久久久久久| 性一交一乱一区二区洋洋av| 国产日韩高清一区二区三区在线 | 美女毛片一区二区三区四区最新中文字幕亚洲 | 中文字幕日韩欧美精品高清在线| 日本va欧美va精品| 国产精品国产一区| 免费精品视频在线| 亚洲播播91| 国产精品久久久久9999高清| 五月天综合网站| 精品国产乱码久久久| 综合国产精品| 欧美精品一区二区三区精品| 国产精品一页| 亚洲影院天堂中文av色| 国产精品99免费看| 免费视频一区二区三区在线观看| 99视频在线精品国自产拍免费观看| 国产欧美激情| 综合色就爱涩涩涩综合婷婷| av高清一区| 精品国产不卡| 日韩有吗在线观看| 精品在线91| 久久永久免费| 在线视频亚洲欧美中文| 精品深夜福利视频| 久久xxxx| 在线日韩一区| 国产一区二区精品福利地址| 日韩欧美中文字幕一区二区三区| 成人小电影网站| 日韩精品久久久久久| 蜜桃成人精品| 国产精品日本一区二区三区在线| 日韩视频一区二区三区在线播放免费观看| 精品中文字幕一区二区三区| 亚洲+小说+欧美+激情+另类| 欧美在线亚洲综合一区| 久久青青视频| 欧美成人一二区| 国产剧情在线观看一区| 亚洲日本在线观看视频| 丝袜美腿亚洲色图| 狠狠久久婷婷| 播放一区二区| 特黄特色欧美大片| sm久久捆绑调教精品一区| 欧美中文一区| 蜜臀av国产精品久久久久| 久久久久久黄| 免费观看不卡av| 亚洲人成在线网站| 在线精品亚洲欧美日韩国产| 福利片在线一区二区| 国产精品自拍区| 91午夜精品| 国产欧美日韩精品一区二区免费| 亚洲人成网77777色在线播放| 久久亚洲色图| 日韩一区二区三免费高清在线观看| 亚州av一区| 国产精品久久久久77777丨| 国内精品亚洲| 精品一区电影| 日韩欧美一区二区三区在线观看| 久久激情婷婷|