日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

.Net Core和RabbitMQ限制循環消費的方法

瀏覽:951日期:2022-06-09 11:06:55
目錄
  • 前言
  • 循環場景
  • 解決方案
  • 一次消費
  • 消息不重入隊列
  • 限定重試次數
    • 消息頭設定次數
    • 存儲重試次數
    • 隊列使用Quorum類型
    • 隊列消息過期
  • 參考資料

    前言

    當消費者端接收消息處理業務時,如果出現異?;蚴蔷苁障⑾⒂肿兏鼮榈却哆f再次推送給消費者,這樣一來,則形成循環的條件。

    循環場景

    生產者發送100條消息到RabbitMQ中,消費者設定讀取到第50條消息時,設置拒收,同時設定是否還留存在當前隊列中(當requeue為false時,設置了死信隊列則進入死信隊列,否則移除消息)。

    consumer.Received += (model, ea) =>{? ? var message = ea.Body;? ? Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));? ? if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))? ? {? ? ? ? Console.WriteLine("拒收");? ? ? ? ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);? ? ? ? return;? ? }? ? ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};

    當第50條消息拒收,則仍在隊列中且處在隊列頭部,重新推送給消費者,再次拒收,再次推送,反反復復。

    最終其他消息全部消費完畢,僅剩第50條消息往復間不斷消費,拒收,消費,這將可能導致RabbitMQ出現內存泄漏問題。

    解決方案

    RabbitMQ及AMQP協議本身沒有提供這類重試功能,但可以利用一些已有的功能來間接實現重試限定(以下只考慮基于手動確認模式情況)。此處只想到或是只查到了如下幾種方案解決消息循環消費問題。

    • 一次消費
      • 無論成功與否,消費者都對外返回ack,將拒收原因或是異常信息catch存入本地或是新隊列中另作重試。
      • 消費者拒絕消息或是出現異常,返回Nack或Reject,消息進入死信隊列或丟棄(requeue設定為false)。
    • 限定重試次數
      • 在消息的頭中添加重試次數,并將消息重新發送出去,再每次重新消費時從頭中判斷重試次數,遞增或遞減該值,直到達到限制,requeue改為false,最終進入死信隊列或丟棄。
      • 可以在Redis、Memcache或其他存儲中存儲消息唯一鍵(例如Guid、雪花Id等,但必須在發布消息時手動設置它),甚至在mysql中連同重試次數一起存儲,然后在每次重新消費時遞增/遞減該值,直到達到限制,requeue改為false,最終進入死信隊列或丟棄。
      • 隊列使用Quorum類型,限制投遞次數,超過次數消息被刪除。
    • 隊列消息過期
      • 設置過期時間,給隊列或是消息設置TTL,重試一定次數消息達到過期時間后進入死信隊列或丟棄(requeue設定為true)。
    • 也許還有更多好的方案...

    一次消費

    對外總是Ack

    消息到達了消費端,可因某些原因消費失敗了,對外可以發送Ack,而在內部走額外的方式去執行補償操作,比如將消息轉發到內部的RabbitMQ或是其他處理方式,終歸是只消費一次。

    var queueName = "alwaysack_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{? ? try? ? {? ? ? ? var message = ea.Body;? ? ? ? Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));? ? ? ? if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))? ? ? ? {? ? ? ? ? ? throw new Exception("模擬異常");? ? ? ? }? ? }? ? catch (Exception ex)? ? {? ? ? ? Console.WriteLine(ex.Message);? ? }? ? finally? ? {? ? ? ? ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);? ? }};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    當消費端收到消息,處理時出現異常,可以另想辦法去處理,而對外保持著ack的返回,以避免消息的循環消費。

    消息不重入隊列

    在消費者端,因異常或是拒收消息時,對requeue設置為false時,如果設置了死信隊列,則符合“消息被拒絕且不重入隊列”這一進入死信隊列的情況,從而避免消息反復重試。如未設置死信隊列,則消息被丟失。

    此處假定接收100條消息,在接收到第50條消息時設置拒收,并且設置了requeue為false。

    var dlxExchangeName = "dlx_exchange";channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);var dlxQueueName = "dlx_queue";channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");var queueName = "nackorreject_queue";var arguments = new Dictionary<string, object>{    { "x-dead-letter-exchange", dlxExchangeName }};channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {Console.WriteLine("拒收");((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//關鍵在于requeue=falsereturn;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    如此一來,拒收消息不會重入隊列,并且現有隊列綁定了死信交換機,因此,消息進入到死信隊列中,如不綁定,則消息丟失。

    限定重試次數

    設置重試次數,限定循環消費的次數,允許短暫的循環,但最終打破循環。

    消息頭設定次數

    在消息頭中設置次數記錄作為標記,但是,消費端無法對接收到的消息修改消息頭然后將原消息送回MQ,因此,需要將原消息內容重新發送消息到MQ,具體步驟如下

    • 原消息設置不重入隊列。
    • 再發送新的消息其內容與原消息一致,可設置新消息的消息頭來攜帶重試次數。
    • 消費端再次消費時,便可從消息頭中查看消息被消費的次數。

    此處假定接收10條消息,在接收到第5條消息時設置拒收, 當消息頭中重試次數未超過設定的3次時,消息可以重入隊列,再次被消費。

    var queueName = "messageheaderretrycount_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{? ? var message = ea.Body;? ? Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));? ? if (Encoding.UTF8.GetString(message.ToArray()).Contains("5"))? ? {? ? ? ? var maxRetryCount = 3;? ? ? ? Console.WriteLine($"拒收 {DateTime.Now}");? ? ? ? //初次消費? ? ? ? if (ea.BasicProperties.Headers == null)? ? ? ? {? ? ? ? ? ? //原消息設置為不重入隊列? ? ? ? ? ? ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);? ? ? ? ? ? //發送新消息到隊列中? ? ? ? ? ? RetryPublishMessage(channel, queueName, message.ToArray(), 1);? ? ? ? ? ? return;? ? ? ? }? ? ? ? //獲取重試次數? ? ? ? var retryCount = ParseRetryCount(ea);? ? ? ? if (retryCount < maxRetryCount)? ? ? ? {? ? ? ? ? ? //原消息設置為不重入隊列? ? ? ? ? ? ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);? ? ? ? ? ? //發送新消息到隊列中? ? ? ? ? ? RetryPublishMessage(channel, queueName, message.ToArray(), retryCount + 1);? ? ? ? ? ? return;? ? ? ? }? ? ? ? //到達最大次數,不再重試消息? ? ? ? ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);? ? ? ? return;? ? }? ? ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);static void RetryPublishMessage(IModel channel, string queueName, byte[] body, int retryCount){? ? var basicProperties = channel.CreateBasicProperties();? ? basicProperties.Headers = new Dictionary<string, object>();? ? basicProperties.Headers.Add("retryCount", retryCount);? ? channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);}static int ParseRetryCount(BasicDeliverEventArgs ea){? ? var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount);? ? if (!existRetryRecord)? ? {? ? ? ? throw new Exception("沒有設置重試次數");? ? }? ? return (int)retryCount;}

    消息被拒收后,再重新發送消息到原有交換機或是隊列下中,以使得消息像是消費失敗回到了隊列中,如此來控制消費次數,但是這種場景下,新消息排在了隊列的尾部,而不是原消息排在隊列頭部。

    存儲重試次數

    在存儲服務中存儲消息的唯一標識與對應重試次數,消費消息前對消息進行判斷是否存在。

    與消息頭判斷一致,只是消息重試次數的存儲從消息本身挪入存儲服務中了。需要注意的是,消息發送端需要設置消息的唯一標識(MessageId屬性)

    //模擬外部存儲服務var MessageRetryCounts = new Dictionary<ulong, int>();var queueName = "storageretrycount_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{? ? var message = ea.Body;? ? Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {    ? ? var maxRetryCount = 3;    ? ? Console.WriteLine("拒收");        ? ? //重試次數判斷    ? ? var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);    ? ? if (!existRetryRecord)    ? ? {    ? ? ? ? //重入隊列,繼續重試    ? ? ? ? MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);    ? ? ? ? ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);    ? ? ? ? return;    ? ? }        ? ? if (MessageRetryCounts[ea.BasicProperties.MessageId] < maxRetryCount)    ? ? {    ? ? ? ? //重入隊列,繼續重試    ? ? ? ? MessageRetryCounts[ea.BasicProperties.MessageId] = MessageRetryCounts[ea.BasicProperties.MessageId] + 1;    ? ? ? ? ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);    ? ? ? ? return;    ? ? }        ? ? //到達最大次數,不再重試消息    ? ? ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);    ? ? return;    }? ? ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    除第一次拒收外,允許三次重試機會,三次重試完畢后,設置requeue為false,消息丟失或進入死信隊列(如有設置的話)。

    隊列使用Quorum類型

    第一種和第二種分別是消息自身、外部存儲服務來管理消息重試次數,使用Quorum,由MQ來限定消息的投遞次數,也就控制了重試次數。

    設置隊列類型為quorum,設置投遞最大次數,當超過投遞次數后,消息被丟棄。

    var queueName = "quorumtype_queue";var arguments = new Dictionary<string, object>(){? ? { "x-queue-type", "quorum"},? ? { "x-delivery-limit", 3 }};channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{? ? var message = ea.Body;? ? Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));? ? if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))? ? {? ? ? ? Console.WriteLine($"拒收 {DateTime.Now}");? ? ? ? ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);? ? ? ? return;? ? }? ? ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    第一次消費被拒收重入隊列后,經最大三次投遞后,消費端不再收到消息,如此一來也限制了消息的循環消費。

    隊列消息過期

    當為消息設置了過期時間時,當消息沒有受到Ack,且還在隊列中,受到過期時間的限制,反復消費但未能成功時,消息將走向過期,進入死信隊列或是被丟棄。

    聚焦于過期時間的限制,因此在消費者端,因異?;蚴蔷苁障r,需要對requeue設置為true,將消息再次重入到原隊列中。

    設定消費者端第五十條消息會被拒收,且隊列的TTL設置為5秒。

    //死信交換機和死信隊列var dlxExchangeName = "dlx_exchange";channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);var dlxQueueName = "dlx_queue";channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");//常規隊列var queueName = "normalmessage_queue";var arguments = new Dictionary<string, object>{? ? { "x-message-ttl", 5000},? ? { "x-dead-letter-exchange", dlxExchangeName }};channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{? ? var message = ea.Body;? ? Console.WriteLine("接收到信息為:" + Encoding.UTF8.GetString(message.ToArray()));? ? if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))? ? {? ? ? ? Console.WriteLine($"拒收 {DateTime.Now}");? ? ? ? ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);? ? ? ? return;? ? }? ? ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    當消費者端拒收消息后消息重入隊列,再次消費,反復進行超過5秒后,消息在隊列中達到了過期時間,則被挪入到死信隊列中。

    從Web管理中死信隊列中可查看該條過期的消息。

    參考資料

    到此這篇關于.Net Core和RabbitMQ限制循環消費的文章就介紹到這了,更多相關.net core rabbitmq循環消費內容請搜索以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持!

    標簽: ASP.NET
    日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
    日韩一区自拍| 在线一区视频观看| 亚洲精品大全| 亚洲精品看片| 中文字幕一区二区三区在线视频| 亚洲五月婷婷| 久久午夜影视| 人人爽香蕉精品| 亚洲天堂日韩在线| 国产日韩欧美一区在线| 免费视频一区二区三区在线观看 | 日韩电影在线视频| 国产精品成人一区二区不卡| 日本在线啊啊| 一区二区精彩视频| 国产精品magnet| 成人啊v在线| 夜夜精品视频| 国产欧美三级| 香蕉久久精品| 久久国产乱子精品免费女| 精品久久一区| 亚洲综合丁香| 久久精品一区二区国产| 欧美成人国产| 日韩一区精品| 日韩综合在线| 久久亚洲风情| 久久97视频| 首页亚洲欧美制服丝腿| 久久99青青| 好吊视频一区二区三区四区| 91嫩草精品| 亚洲国产综合在线看不卡| 日韩超碰人人爽人人做人人添| 国产 日韩 欧美 综合 一区 | 亚洲激情国产| 黄色日韩精品| 国际精品欧美精品| 亚洲精品在线国产| 日韩国产激情| 久久a爱视频| 亚洲色图综合| 激情丁香综合| 精品欧美一区二区三区在线观看| 日韩不卡一二三区| sm捆绑调教国产免费网站在线观看 | 婷婷亚洲五月| 国产精选在线| 美女视频黄免费的久久| 日韩一区二区三免费高清在线观看| 视频福利一区| 久久九九精品| 日韩精品一卡| 成人免费网站www网站高清| 高清一区二区| 老司机免费视频一区二区| 青青草国产成人99久久| 首页国产欧美久久| 亚洲高清不卡| 婷婷成人在线| 精品在线91| 欧美日韩四区| 蜜臀av性久久久久蜜臀aⅴ流畅| 一区久久精品| 只有精品亚洲| 日韩va亚洲va欧美va久久| 日韩激情一区二区| 国产精品亚洲欧美| 精品亚洲自拍| 欧美精品一区二区三区精品| 性欧美69xoxoxoxo| 六月婷婷一区| 中文字幕一区二区三区四区久久| 亚洲天堂免费| 91成人精品在线| 鲁大师精品99久久久| 亚洲欧洲美洲av| 国产亚洲毛片| 国产精品久久久久77777丨| 久久只有精品| 欧美美女一区| 日韩激情精品| 天堂中文在线播放| 日韩精品一二三区| 久久精品理论片| 亚洲资源av| 欧美成a人片免费观看久久五月天| а√天堂中文在线资源8| 久久在线免费| 日本免费在线视频不卡一不卡二| 福利视频一区| 日韩一二三区在线观看| 福利一区在线| 亚洲毛片在线免费| 亚洲伊人av| 欧美日韩一区自拍| 欧美女激情福利| 国产一区二区三区久久久久久久久| 麻豆成人91精品二区三区| 欧美手机在线| 国产va免费精品观看精品视频| 综合色就爱涩涩涩综合婷婷| 午夜久久av| 激情欧美一区二区三区| 欧美激情一区| 亚洲aa在线| 国产亚洲一级| 久久精品国产www456c0m| 欧美国产三级| 亚洲aⅴ网站| 亚洲欧美日韩专区| 久久国产欧美| 日韩精品dvd| 日韩av免费大片| 久久精品三级| 国产精品嫩模av在线| 日本综合精品一区| 最新国产精品| 亚洲欧美日韩在线观看a三区| 精品国产不卡| 91精品福利观看| 久久国产生活片100| 婷婷精品在线| 日韩精品免费视频一区二区三区| 中文一区一区三区免费在线观| 国产精品7m凸凹视频分类| 久久久久久久久丰满| 久久九九国产| 黄色av一区| 中文字幕一区二区三区在线视频| 久久高清免费| 999国产精品999久久久久久| 亚洲一区成人| 日韩不卡在线观看日韩不卡视频 | 日本h片久久| 日韩一区二区三区免费播放| 亚洲婷婷丁香| 乱人伦精品视频在线观看| 国产日产一区| 日韩在线a电影| 欧美日韩水蜜桃| 黄色aa久久| 国产精品二区不卡| 国产精品羞羞答答在线观看| 亚洲一区二区三区免费在线观看| 麻豆精品一区二区综合av| 老色鬼久久亚洲一区二区| 99久久九九| 亚洲伦乱视频| 国产v日韩v欧美v| 精品国产乱码久久久久久樱花| 欧美精品国产一区| 欧美在线精品一区| 中文字幕日韩欧美精品高清在线| 99久久婷婷| 久久影视三级福利片| 亚洲69av| 亚洲一级大片| 国产一区二区高清| 99热精品久久| 久久精品国产99国产| 亚洲aa在线| 亚洲字幕久久| 丝袜诱惑制服诱惑色一区在线观看| 成人亚洲一区二区| 精品国产网站| 日韩成人免费| 日韩免费av| 蜜桃视频在线网站| 特黄毛片在线观看| 电影亚洲精品噜噜在线观看| 国产69精品久久| 国产成人久久精品麻豆二区| 麻豆中文一区二区| 欧美黄色精品| 国产精品最新| 国产欧美午夜| 久久69成人| av综合电影网站| 久久国产主播| 亚洲一区欧美激情| 亚洲无线观看| 欧美日一区二区在线观看| 国产精品激情| 国产精品久久久久久久免费观看| 免费福利视频一区二区三区| 欧美亚洲国产精品久久| 蘑菇福利视频一区播放| 日韩av午夜在线观看| 美女国产一区二区三区| 国产99精品| 亚洲欧洲一区| 国产精品亚洲综合色区韩国| 91亚洲国产| 亚洲免费成人av在线| 高清久久一区| 一区二区三区四区日韩| 精品伊人久久久| 香蕉久久国产|