日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

Springboot RocketMq實現過程詳解

瀏覽:236日期:2023-05-16 13:09:35

首先,在虛擬機上安裝rocketmq和rocketMq可視化控制,安裝不做描述。

1、pom.xml文件添加依賴

mq的版本與連接的rocketmq版本保持一致

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-remoting</artifactId> <version>4.4.0</version> </dependency>

2、yml文件添加rocketmq配置

apache: rocketmq: #消費者的配置 consumer: pushConsumer: myConsumer #生產者的配置 producer: producerGroup: myGroup namesrvAddr: 192.168.233.128:9876

3、生產者類RocketProducer

package com.zp.springbootdemo.rocketmq;import com.alibaba.fastjson.JSONObject;import com.sun.org.apache.xpath.internal.objects.XString;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import org.apache.rocketmq.remoting.exception.RemotingException;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import org.springframework.util.StopWatch;import javax.annotation.PostConstruct;import java.io.UnsupportedEncodingException;/** * @Author zp * @Description rocketmq生產者 * @Date 22:06 2020/5/22 * @Param * @return **/@Componentpublic class RocketProducer { /** * 生產者的組名 */ @Value('${apache.rocketmq.producer.producerGroup}') private String producerGroup; /** * NameServer 地址 */ @Value('${apache.rocketmq.namesrvAddr}') private String namesrvAddr; private DefaultMQProducer defaultMQProducer; @PostConstruct public void defaultMQProducer(){ //生產者的組名 defaultMQProducer = new DefaultMQProducer(producerGroup); defaultMQProducer.setNamesrvAddr(namesrvAddr); defaultMQProducer.setVipChannelEnabled(false); try { defaultMQProducer.start(); System.out.println('producer啟動了。。。'); } catch (MQClientException e) { e.printStackTrace(); } } public String send(String topic,String tags,String body) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(topic,tags,body.getBytes(RemotingHelper.DEFAULT_CHARSET)); StopWatch stop = new StopWatch(); stop.start(); SendResult result = defaultMQProducer.send(message); System.out.println('發送響應:MsgId:' + result.getMsgId() + ',發送狀態:' + result.getSendStatus()); JSONObject jsonObject = new JSONObject(); jsonObject.put('msgId',result.getMsgId()); jsonObject.put('sendStatus',result.getSendStatus()); stop.stop(); return jsonObject.toJSONString(); }}

4、消費者類RocketConsumer

package com.zp.springbootdemo.rocketmq;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.CommandCustomHeader;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;/** * @Author zp * @Description rocketmq消費者 * @Date 22:33 2020/5/22 * @Param * @return **/@Componentpublic class RockerConsumer implements CommandLineRunner { /** * 消費者 */ @Value('${apache.rocketmq.consumer.pushConsumer}') private String pushConsumer; //myConsumer /** * NameServer 地址 */ @Value('${apache.rocketmq.namesrvAddr}') private String namesrvAddr; /** * 初始化RocketMq的監聽信息,渠道信息 */ public void messageListener(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(pushConsumer); consumer.setNamesrvAddr(namesrvAddr); try { // 訂閱PushTopic下Tag為push的消息,都訂閱消息 consumer.subscribe('firstTopic','push'); // 程序第一次啟動從消息隊列頭獲取數據 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //可以修改每次消費消息的數量,默認設置是每次消費一條 consumer.setConsumeMessageBatchMaxSize(1); //在此監聽中消費信息,并返回消費的狀態信息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs,context)->{// 會把不同的消息分別放置到不同的隊列中for (Message msg:msgs){ System.out.println('接收到了消息:'+new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } /** * Callback used to run the bean. * * @param args incoming main method arguments * @throws Exception on error */ @Override public void run(String... args) throws Exception { this.messageListener(); }}

5、controller中編寫發送消息

package com.zp.springbootdemo.rocketmq;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.remoting.exception.RemotingException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.io.UnsupportedEncodingException;@RestController@RequestMapping('/rocketMq')public class MQController { @Autowired private RocketProducer producer; @RequestMapping('/myFirstProducer') public String pushMsg(String msg){ try { System.out.println('======'+msg); return producer.send('firstTopic','push',msg); } catch (InterruptedException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQClientException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return 'ERROR'; }}

6.測試

請求地址:http://127.0.0.1:8080/rocketMq/myFirstProducer?msg=hello

響應:{'msgId':'C0A8010E1A3818B4AAC2711E8CD50000','sendStatus':'SEND_OK'}

通過rocketMq可視化控制查看:

Springboot RocketMq實現過程詳解

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。

標簽: Spring
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
欧美aaaaaa午夜精品| 日韩大片在线| 欧美成人综合| 亚洲一级网站| 欧美在线网站| 中国女人久久久| 国产视频一区三区| 激情综合亚洲| 一级欧洲+日本+国产| 亚洲午夜电影| 久久夜色精品| 婷婷综合国产| 国产精品一站二站| 国内精品伊人| 久久91导航| 欧美高清一区| 蜜臀久久久久久久| 日韩av在线播放中文字幕| 欧美片网站免费| 久久精品亚洲一区二区| 中文在线а√天堂| 国产在线日韩| 最新亚洲一区| 亚洲欧美在线专区| 国产另类在线| 日韩一区自拍| 国产精品嫩草99av在线| 伊人久久亚洲| 国产精品2区| 神马午夜久久| 老鸭窝毛片一区二区三区| 日韩一区二区三区高清在线观看| 国产美女久久| 欧美日韩视频网站| 另类亚洲自拍| 国产精品久久久久9999高清| 波多视频一区| 午夜欧美精品久久久久久久| 综合激情在线| 欧美a在线观看| 久久高清免费| 三级久久三级久久久| 精品国产乱码久久久久久1区2匹| 尤物tv在线精品| 亚洲aⅴ网站| 国产一区二区三区免费在线| 亚州av乱码久久精品蜜桃| 日韩国产一二三区| 丁香六月综合| 亚洲视频二区| 久久男人天堂| 亚洲精品综合| 国产欧美高清视频在线| 日韩影院二区| 欧美亚洲三区| 亚洲女同一区| 久久69成人| 爽好久久久欧美精品| 久久精品国产网站| 麻豆亚洲精品| 国产精品成久久久久| 免费久久99精品国产自在现线| 久久av日韩| 亚洲欧洲日本mm| 国产精品久久久久久久久久10秀| 亚洲香蕉视频| 色偷偷色偷偷色偷偷在线视频| 在线观看一区| 国产精品亚洲一区二区三区在线观看| 亚洲精品乱码日韩| 成人羞羞在线观看网站| 日韩av二区在线播放| 久久久影院免费| 久久中文字幕一区二区| 蜜臀av一区二区在线免费观看| аⅴ资源天堂资源库在线| 日韩综合一区二区三区| 国产综合婷婷| 国产黄大片在线观看| 日韩精品一页| 亚洲精品一区二区在线看| 岛国精品一区| 日韩一区二区三区高清在线观看| 欧美日韩在线网站| 精品久久99| 国产亚洲欧美日韩在线观看一区二区| 尤物在线精品| 欧美aa一级| 成人国产精品久久| 国产欧美日韩在线观看视频 | 日韩精品永久网址| 国产精品成人**免费视频 | 日韩精品亚洲一区二区三区免费| 91av亚洲| 精品久久久网| 国产精区一区二区| 日本亚洲欧洲无免费码在线| 国产精品av久久久久久麻豆网| 风间由美中文字幕在线看视频国产欧美| 日韩国产一二三区| 在线日韩成人| 久久99伊人| 欧美午夜不卡| 99久精品视频在线观看视频| 国产精品久久久久久久久久10秀| 国产精品v亚洲精品v日韩精品| 日韩av不卡一区二区| 石原莉奈在线亚洲二区| 久久在线视频免费观看| 岛国av在线网站| 精品久久福利| 成人亚洲精品| 国产一区国产二区国产三区| 精品中文字幕一区二区三区四区| 国产乱子精品一区二区在线观看 | 精品国产亚洲一区二区三区大结局| 综合国产精品| 一区二区电影| 婷婷精品在线| 日韩国产精品久久久| 日本一区福利在线| 日本不卡一二三区黄网| 色8久久久久| 91精品美女| 久久激情五月婷婷| 国产精品一国产精品k频道56| 欧美日韩国产一区二区在线观看| 日本大胆欧美人术艺术动态| 国产精品美女久久久浪潮软件| 一区二区视频欧美| 爽爽淫人综合网网站| 免费观看在线综合色| 中文字幕日韩欧美精品高清在线| 一区二区精彩视频| 日本中文字幕一区二区| 欧美日韩午夜电影网| 国产精品调教视频| 精品一区91| 电影亚洲精品噜噜在线观看| 亚洲91视频| 亚洲一区二区三区免费在线观看| 在线看片日韩| 国产欧美大片| 国产99在线| 亚洲激情另类| 只有精品亚洲| 日本91福利区| 欧美1区2区3| 日韩国产专区| 欧美精品一线| 综合激情网站| 另类小说一区二区三区| 日韩欧美一区免费| 99国产精品视频免费观看一公开 | 日韩欧美三级| 欧美专区一区二区三区| 日本在线成人| 久久香蕉精品香蕉| 色综合www| 中文字幕日韩亚洲| 麻豆国产欧美日韩综合精品二区| 中文字幕在线视频久| 精品一区三区| 日韩精品久久久久久| 国产一区二区三区四区五区传媒| 欧美日韩激情| 欧美一区精品| 日韩另类视频| 亚洲三级毛片| 精品免费av一区二区三区| 99久久久国产精品美女| 亚洲三级国产| 四季av一区二区凹凸精品| 亚洲性图久久| 欧美日本久久| 欧美日韩在线播放视频| 日韩在线成人| 伊伊综合在线| 亚洲一区二区三区中文字幕在线观看 | 三级欧美在线一区| 国产精品久久久久9999高清| 久久精品观看| 日韩激情视频网站| 成人一区不卡| 亚洲免费在线| 国产精久久久| 91精品福利| 欧美激情综合| 亚洲一区二区毛片| 精品一区二区三区中文字幕| 亚洲欧美日韩专区| 精品久久免费| 亚洲毛片在线免费| 欧美精选视频一区二区| 日本国产亚洲| 99精品综合| 欧美国产视频| 日韩在线一区二区| 久久婷婷av| 97成人超碰|