日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術(shù)文章
文章詳情頁

Java實現(xiàn)Redis延時消息隊列

瀏覽:189日期:2023-02-11 15:31:28
目錄什么是延時任務(wù)延時任務(wù)的特點實現(xiàn)思路:代碼實現(xiàn)1.消息模型2.RedisMq 消息隊列實現(xiàn)類3.消息生產(chǎn)者4.消息消費者5. 消息執(zhí)接口6. 任務(wù)類型的實現(xiàn)類:可以根據(jù)自己的情況去實現(xiàn)對應(yīng)的隊列需求 什么是延時任務(wù)

延時任務(wù),顧名思義,就是延遲一段時間后才執(zhí)行的任務(wù)。舉個例子,假設(shè)我們有個發(fā)布資訊的功能,運營需要在每天早上7點準時發(fā)布資訊,但是早上7點大家都還沒上班,這個時候就可以使用延時任務(wù)來實現(xiàn)資訊的延時發(fā)布了。只要在前一天下班前指定第二天要發(fā)送資訊的時間,到了第二天指定的時間點資訊就能準時發(fā)出去了。如果大家有運營過公眾號,就會知道公眾號后臺也有文章定時發(fā)送的功能??偠灾?,延時任務(wù)的使用還是很廣泛的。

延時任務(wù)的特點 時間有序性 時間具體性 任務(wù)中攜帶詳細的信息 ,通常包括 任務(wù)ID, 任務(wù)的類型 ,時間點。實現(xiàn)思路:

將整個Redis當(dāng)做消息池,以kv形式存儲消息,key為id,value為具體的消息body使用ZSET做優(yōu)先隊列,按照score維持優(yōu)先級(用當(dāng)前時間+需要延時的時間作為score)輪詢ZSET,拿出score比當(dāng)前時間戳大的數(shù)據(jù)(已過期的)根據(jù)id拿到消息池的具體消息進行消費消費成功,刪除改隊列和消息消費失敗,讓該消息重新回到隊列

代碼實現(xiàn)

Java實現(xiàn)Redis延時消息隊列

1.消息模型

import lombok.Data;import lombok.experimental.Accessors;import javax.validation.constraints.NotNull;import java.io.Serializable;/** * Redis 消息隊列中的消息體 * @author shikanatsu */@Data@Accessors(chain = true)public class RedisMessage implements Serializable { /** 消息隊列組 **/ private String group; /** * 消息id */ private String id; /** * 消息延遲/ 秒 */ @NotNull(message = '消息延時時間不能為空') private long delay; /** * 消息存活時間 單位:秒 */ @NotNull(message = '消息存活時間不能為空') private int ttl; /** * 消息體,對應(yīng)業(yè)務(wù)內(nèi)容 */ private Object body; /** * 創(chuàng)建時間,如果只有優(yōu)先級沒有延遲,可以設(shè)置創(chuàng)建時間為0 * 用來消除時間的影響 */ private long createTime;}2.RedisMq 消息隊列實現(xiàn)類

package com.shixun.base.redisMq;import com.shixun.base.jedis.service.RedisService;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * Redis消息隊列 * * @author shikanatsu */@Componentpublic class RedisMq { /** * 消息池前綴,以此前綴加上傳遞的消息id作為key,以消息{@link MSG_POOL} * 的消息體body作為值存儲 */ public static final String MSG_POOL = 'Message:Pool:'; /** * zset隊列 名稱 queue */ public static final String QUEUE_NAME = 'Message:Queue:';// private static final int SEMIH = 30 * 60; @Resource private RedisService redisService; /** * 存入消息池 * * @param message * @return */ public boolean addMsgPool(RedisMessage message) {if (null != message) { redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl()); return true;}return false; } /** * 從消息池中刪除消息 * * @param id * @return */ public void deMsgPool(String group, String id) {redisService.remove(MSG_POOL + group + id); } /** * 向隊列中添加消息 * * @param key * @param score 優(yōu)先級 * @param val * @return 返回消息id */ public void enMessage(String key, long score, String val) {redisService.zsset(key, val, score); } /** * 從隊列刪除消息 * * @param id * @return */ public boolean deMessage(String key, String id) {return redisService.zdel(key, id); }}3.消息生產(chǎn)者

import cn.hutool.core.convert.Convert;import cn.hutool.core.lang.Assert;import cn.hutool.core.util.IdUtil;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import org.springframework.validation.annotation.Validated;import javax.annotation.Resource;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.TimeUnit;/** * 消息生產(chǎn)者 * * @author shikanatsu */@Componentpublic class MessageProvider { static Logger logger = LoggerFactory.getLogger(MessageProvider.class); @Resource private RedisMq redisMq; SimpleDateFormat sdf = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss'); public boolean sendMessage(@Validated RedisMessage message) {Assert.notNull(message);//The priority is if there is no creation time//message.setCreateTime(System.currentTimeMillis());message.setId(IdUtil.fastUUID());Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);try { redisMq.addMsgPool(message); redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId()); logger.info('RedisMq發(fā)送消費信息{},當(dāng)前時間:{},消費時間預(yù)計{}',message.toString(),new Date(),sdf.format(delayTime));}catch (Exception e){ e.printStackTrace(); logger.error('RedisMq 消息發(fā)送失敗,當(dāng)前時間:{}',new Date()); return false;}return true; }}4.消息消費者

/** * Redis消息消費者 * @author shikanatsu */@Componentpublic class RedisMqConsumer { private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class); @Resource private RedisMq redisMq; @Resource private RedisService redisService; @Resource private MessageProvider provider; SimpleDateFormat sdf = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss'); //@Scheduled(cron = '*/1 * * * * ? ') /** Instead of a thread loop, you can use Cron expressions to perform periodic tasks */ public void baseMonitor(RedisMqExecute mqExecute){String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName();//The query is currently expiredSet<Object> set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis());if (null != set) { long current = System.currentTimeMillis(); for (Object id : set) {long score = redisService.getScore(queueName, id.toString()).longValue();//Once again the guarantee has expired , And then perform the consumptionif (current >= score) { String str = ''; RedisMessage message = null; String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName(); try {message = (RedisMessage)redisService.get(msgPool + id.toString());log.debug('RedisMq:{},get RedisMessage success now Time:{}',str,sdf.format(System.currentTimeMillis()));if(null==message){ return;}//Do something ; You can add a judgment here and if it fails you can add it to the queue againmqExecute.execute(message); } catch (Exception e) {e.printStackTrace();//If an exception occurs, it is put back into the queue// todo: If repeated, this can lead to repeated cycleslog.error('RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}',new Date());provider.sendMessage(message); } finally {redisMq.deMessage(queueName, id.toString());redisMq.deMsgPool(message.getGroup(),id.toString()); }} }} }}5. 消息執(zhí)接口

/** * @author shikanatsu */public interface RedisMqExecute { /** * 獲取隊列名稱 * @return */ public String getQueueName(); /** * 統(tǒng)一的通過執(zhí)行期執(zhí)行 * @param message * @return */ public boolean execute(RedisMessage message); /** * Perform thread polling */ public void threadPolling();}6. 任務(wù)類型的實現(xiàn)類:可以根據(jù)自己的情況去實現(xiàn)對應(yīng)的隊列需求

/** * 訂單執(zhí)行 * * @author shikanatsu */@Servicepublic class OrderMqExecuteImpl implements RedisMqExecute { private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class); public final static String name = 'orderPoll:'; @Resource private RedisMqConsumer redisMqConsumer; private RedisMqExecute mqExecute = this; @Resource private OrderService orderService; @Override public String getQueueName() {return name; } @Override /** * For the time being, only all orders will be processed. You can change to make orders */ public boolean execute(RedisMessage message) {logger.info('Do orderMqPoll ; Time:{}',new Date()); //Do return true; } @Override /** 通過線程去執(zhí)行輪詢的過程,時間上可以自由控制 **/ public void threadPolling() {ThreadUtil.execute(() -> { while (true) {redisMqConsumer.baseMonitor(mqExecute);ThreadUtil.sleep(5, TimeUnit.MICROSECONDS); }}); }}

使用事例 1. 實現(xiàn)RedisMqExecute 接口 創(chuàng)建對應(yīng)的輪詢或者采取定時器的方式執(zhí)行 和實現(xiàn)具體的任務(wù)。 2. 通過MessageProvider 實現(xiàn)相對應(yīng)的消息服務(wù)和綁定隊列組,通過隊列組的方式執(zhí)行。 3. 提示: 采取線程的方式需要在項目啟動過程中執(zhí)行,采取定時器或者調(diào)度的方式可以更加動態(tài)的調(diào)整。

到此這篇關(guān)于Java實現(xiàn)Redis延時消息隊列的文章就介紹到這了,更多相關(guān)Java Redis延時消息隊列內(nèi)容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!

標簽: Java
相關(guān)文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
日本少妇一区| 亚洲网站视频| 午夜在线播放视频欧美| 亚洲成人免费| 伊人久久亚洲影院| 欧美午夜精彩| 欧美特黄一区| 模特精品在线| 最新亚洲一区| 亚洲欧美网站在线观看| 999视频精品| 久久九九精品| 日韩一级精品| 中文无码久久精品| 首页国产欧美日韩丝袜| 亚洲精品乱码久久久久久蜜桃麻豆| 亚洲精选成人| 日韩成人精品一区二区三区| 欧美日韩亚洲三区| 麻豆国产欧美日韩综合精品二区| 久久精品 人人爱| 欧美激情网址| 四虎成人av| 中文在线中文资源| 日本精品在线中文字幕| 欧洲激情综合| 首页国产欧美久久| 欧美日韩中文| 国产在线一区不卡| 欧美特黄一级大片| 在线日韩成人| 免费日韩一区二区三区| 日本国产精品| 免费成人av在线播放| 国产欧美日韩影院| 日韩在线中文| 免费观看在线综合| 精品中国亚洲| 91超碰国产精品| 性欧美精品高清| 欧美精品影院| 伊人久久在线| 精品一区亚洲| 少妇精品在线| 美腿丝袜在线亚洲一区| 日韩在线综合| 亚洲香蕉久久| 91精品视频一区二区| 91视频精品| 只有精品亚洲| 国产成人精品一区二区三区免费| 欧美精品一线| 国产精品亚洲产品| 国产真实久久| 婷婷亚洲精品| 日韩欧美二区| 日本在线视频一区二区| 精品欧美日韩精品| 亚洲欧美日本国产专区一区| 国产伦理久久久久久妇女| 久久精品亚洲人成影院| 91亚洲精品在看在线观看高清| 国产成人精品三级高清久久91| 麻豆精品网站| 国产黄大片在线观看| 日日夜夜免费精品视频| 丝袜美腿诱惑一区二区三区| 日本亚州欧洲精品不卡| 久久天堂精品| 国产香蕉精品| 尹人成人综合网| 久久香蕉网站| 亚洲精品一级二级三级| 亚洲天堂一区二区| 久久激情综合网| 99精品网站| 国产精品美女久久久久久不卡| 狠狠干成人综合网| 91亚洲国产| 欧美一级二区| 黄色成人精品网站| 水蜜桃精品av一区二区| 91成人在线| 免费日韩一区二区| 天堂av在线| 国产精品毛片aⅴ一区二区三区| 一区三区视频| 日产精品一区| 精品国产乱码久久久久久樱花| 亚洲午夜久久| 三级精品视频| 精品久久福利| 欧美日一区二区在线观看| 免费在线成人网| 91九色精品国产一区二区| 久久久男人天堂| 美女视频黄久久| 日韩av中文在线观看| 夜夜精品视频| 999国产精品永久免费视频app| 久久av影视| 日韩av一级片| 亚洲欧美日韩在线观看a三区 | 四虎4545www国产精品| 国产日韩亚洲欧美精品| 亚洲精品成人一区| 亚洲精品123区| 午夜欧美巨大性欧美巨大| 久久一区欧美| 国产欧美成人| 青青草91久久久久久久久| 综合欧美精品| 蜜桃久久精品一区二区| 韩国精品主播一区二区在线观看| 国产一区三区在线播放| 国产精品一区二区三区美女 | 四虎精品永久免费| 视频一区在线播放| 狠狠久久婷婷| 午夜影院欧美| 亚洲国产日韩欧美在线| 亚洲精品.com| 欧美xxxx中国| 国产一区二区精品久| 欧美黄页在线免费观看| 国产精品4hu.www| 国产欧美一级| 国产精品亚洲一区二区在线观看| 日韩国产一区二| 日本麻豆一区二区三区视频| 亚洲欧美日韩在线观看a三区| 国精品一区二区| 不卡在线一区| 亚洲视频www| 99成人在线| 性色av一区二区怡红| 男女激情视频一区| 日韩中文字幕91| 亚洲欧美日韩精品一区二区| 欧美a级一区| 欧美精品一区二区三区精品| 尹人成人综合网| 三级欧美在线一区| 亚洲精品国产日韩| 青青在线精品| 国产精品久久久一区二区| 国产精品99久久免费观看| 国产精品一区二区三区美女| 国产精品久久亚洲不卡| 精品美女在线视频| 视频小说一区二区| 亚洲一区欧美激情| 亚洲精品系列| 日韩精品亚洲专区| 久久99久久人婷婷精品综合| 精品资源在线| 91精品一区二区三区综合| 夜夜精品视频| 日韩欧美美女在线观看| 国产麻豆一区| 成人亚洲精品| 久久在线免费| 在线综合亚洲| 日韩高清中文字幕一区| 国产福利一区二区精品秒拍 | 久久电影tv| 国产综合激情| 亚洲1区在线观看| 久久精品国产精品亚洲毛片| 色网在线免费观看| 9国产精品视频| 日本伊人久久| 日韩av自拍| 亚洲一区国产| 国产精品久久久久久av公交车| 日本黄色精品| 99视频一区| 国产日韩欧美在线播放不卡| 免费一区二区三区在线视频| 99热国内精品| 午夜天堂精品久久久久| 日本激情一区| 午夜在线视频观看日韩17c| 国产日韩中文在线中文字幕 | 人人精品人人爱| 国产91在线精品| 欧美在线亚洲| 国产精品va视频| 免费高潮视频95在线观看网站| 中文在线一区| 精品久久福利| 视频一区二区三区入口| 国产一区二区三区黄网站| 国产视频一区三区| 蜜桃精品视频| 美女黄网久久| 久久精品资源| 久久国产66| sm捆绑调教国产免费网站在线观看| 亚洲欧美日韩国产综合精品二区|