日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

Spring Cloud Stream簡單用法

瀏覽:170日期:2023-06-29 18:49:32
目錄簡單使用Spring Cloud Stream 構建基于RocketMQ的生產者和消費者生產者消費者Stream其他特性消息發送失敗的處理消費者錯誤處理

Spring Cloud Stream對Spring Cloud體系中的Mq進⾏了很好的上層抽象,可以讓我們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細節差異,就像Hibernate屏蔽掉了具體數據庫(Mysql/Oracle⼀樣)。如此⼀來,我們學習、開發、維護MQ都會變得輕松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在這個基礎上提供了RocketMQ的支持

簡單使用Spring Cloud Stream 構建基于RocketMQ的生產者和消費者生產者

pom文件中加入依賴

<dependencies><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <version>2.1.0.RELEASE</version></dependency> </dependencies>

配置文件中增加關于Spring Cloud Stream binder和bindings的配置

spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: output: producer: group: test sync: true bindings:output: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON

其中destination代表生產的數據發送到的topic 然后定義一個channel用于數據發送

import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;public interface TestChannel { @Output('output') MessageChannel output();}

最后構造數據發送的接口

@Controllerpublic class SendMessageController { @Resource private TestChannel testChannel; @ResponseBody @RequestMapping(value = 'send', method = RequestMethod.GET) public String sendMessage() {String messageId = UUID.randomUUID().toString();Message<String> message = MessageBuilder.withPayload('this is a test:' + messageId).setHeader(MessageConst.PROPERTY_TAGS, 'test').build();try { testChannel.output().send(message); return messageId + '發送成功';} catch (Exception e) { return messageId + '發送失敗,原因:' + e.getMessage();} }}消費者

消費者的pom引入與生產者相同,在此不再贅述,配置時需要將stream的output修改為input并修改對應屬性

spring: application: name: zhao-cloud-stream-consumer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: input: consumer: tags: test bindings:input: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON group: test

另外關于channel的構造也要做同樣的修改

import org.springframework.cloud.stream.annotation.Input;import org.springframework.messaging.SubscribableChannel;public interface TestChannel { @Input('input') SubscribableChannel input();}

最后我在啟動類中對收到的消息進行了監聽

@StreamListener('input') public void receiveInput(@Payload Message message) throws ValidationException {System.out.println('input1 receive: ' + message.getPayload() + ', foo header: ' + message.getHeaders().get('foo')); }

測試結果

Spring Cloud Stream簡單用法

Spring Cloud Stream簡單用法

Stream其他特性消息發送失敗的處理

消息發送失敗后悔發送到默認的一個“topic.errors'的channel中(topic是配置的destination)。要配置消息發送失敗的處理,需要將錯誤消息的channel打開 消費者配置如下

spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: output: producer: group: test sync: true bindings:output: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON producer: errorChannelEnabled: true

在啟動類中配置錯誤消息的Channel信息

@Bean('stream-test-topic.errors') MessageChannel testoutPutErrorChannel(){return new PublishSubscribeChannel(); }

新建異常處理service

import org.springframework.integration.annotation.ServiceActivator;import org.springframework.messaging.Message;import org.springframework.stereotype.Service;@Servicepublic class ErrorProducerService { @ServiceActivator(inputChannel = 'stream-test-topic.errors') public void receiveProducerError(Message message){System.out.println('receive error msg :'+message); }}

當發生異常時,由于測試類中已經將異常捕獲,處理發送異常主要是在這里進行。模擬,應用與rocketMq斷開的場景。可見

Spring Cloud Stream簡單用法 Spring Cloud Stream簡單用法

消費者錯誤處理

首先增加配置為

spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq:binder: name-server: 127.0.0.1:9876bindings: output: producer: group: test sync: true bindings:output: destination: stream-test-topic content-type: text/plain # 內容格式。這里使用 JSON producer: errorChannelEnabled: true

增加相應的模擬異常的操作

@StreamListener('input') public void receiveInput(@Payload Message message) throws ValidationException {//System.out.println('input1 receive: ' + message.getPayload() + ', foo header: ' + message.getHeaders().get('foo'));throw new RuntimeException('oops'); } @ServiceActivator(inputChannel = 'stream-test-topic.test.errors') public void receiveConsumeError(Message message){System.out.println('receive error msg'+message.getPayload()); }

Spring Cloud Stream簡單用法

代碼地址https://github.com/zhendiao/deme-code/tree/main/zp

到此這篇關于Spring Cloud Stream簡單用法的文章就介紹到這了,更多相關Spring Cloud Stream使用內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Spring
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
国产99久久| 日韩欧美中文字幕在线视频| 欧美在线日韩| 欧美一区自拍| 精品一区二区三区在线观看视频| 精品免费av一区二区三区| av在线最新| 99re国产精品| 国产日韩亚洲| 国产成人在线中文字幕| 日韩国产在线观看| 中文字幕日韩欧美精品高清在线| 亚洲欧美日韩国产综合精品二区| 精品国产三区在线| 亚洲一区成人| 国产福利资源一区| 电影91久久久| 亚洲一二av| 午夜电影亚洲| 群体交乱之放荡娇妻一区二区| 美女视频黄久久| 中文不卡在线| 麻豆91精品视频| 日韩一级欧洲| 麻豆成人91精品二区三区| 亚洲国产综合在线看不卡| 日本久久一区| 久久精品卡一| 国产精品亚洲产品| 久久影院一区二区三区| 欧美久久精品一级c片| 国产乱码精品一区二区三区亚洲人| 激情久久一区二区| 日韩一区免费| 2023国产精品久久久精品双| 国产欧美日韩一区二区三区在线| 久久久精品午夜少妇| 日韩av中文字幕一区二区| 不卡中文一二三区| 精品视频在线你懂得| 亚洲精品一级二级三级| 午夜欧美巨大性欧美巨大| 国产精品第一| 日韩欧美另类中文字幕| 精品国产乱码久久久久久樱花| 美女亚洲一区| 91精品一区二区三区综合在线爱| 欧美日一区二区三区在线观看国产免| 亚洲国产影院| 麻豆精品蜜桃| 99久久99久久精品国产片果冰 | 国产精品毛片| 欧洲激情综合| 激情婷婷欧美| 精品在线91| 欧美日韩国产综合网| 久久久久久久久久久妇女| 红杏一区二区三区| 国产精品亚洲四区在线观看| 国产精品羞羞答答在线观看| 国产精品探花在线观看| 麻豆免费精品视频| 久久99国产精品视频| 福利一区二区免费视频| 吉吉日韩欧美| 亚洲精品一二三区区别| 久久福利毛片| 中文一区一区三区免费在线观 | 精品久久久久中文字幕小说| 麻豆视频久久| 日本免费一区二区三区四区| 日韩综合一区| 91精品电影| 日韩视频1区| www在线观看黄色| 亚洲欧美日韩国产一区二区| 国产午夜精品一区在线观看| 9999国产精品| 日韩精品第二页| 亚洲二区在线| 日韩不卡一区二区| 99久久夜色精品国产亚洲狼| 欧美日韩一区二区三区四区在线观看| 国际精品欧美精品| 伊人影院久久| 电影91久久久| 久久电影一区| 岛国精品一区| 综合视频一区| re久久精品视频| 国产精品视频一区二区三区综合| 精品国模一区二区三区| 欧美片第1页综合| 国产视频亚洲| 日本久久成人网| 久久精品资源| 色婷婷久久久| 国内精品亚洲| 激情综合五月| 人人香蕉久久| 亚洲欧美久久| 日韩啪啪电影网| 国产一精品一av一免费爽爽| 91精品啪在线观看国产18| 欧美黑人做爰爽爽爽| 久久国产三级精品| 国产一区白浆| 欧美91精品| 麻豆成人av在线| 91久久精品无嫩草影院| 欧美+亚洲+精品+三区| av在线日韩| 国产精品激情电影| 中文字幕av一区二区三区四区| 国产亚洲在线观看| 亚洲精品字幕| 蜜桃久久久久久久| 热久久免费视频| 亚洲精品三级| 国产精品久久乐| 久久久久久一区二区| 国产亚洲精品美女久久| 国产va免费精品观看精品视频| 999国产精品999久久久久久| 国产精品一区二区99| 国产精品1区在线| 国产国产精品| 欧洲激情综合| 国产日韩电影| 亚洲午夜一级| 精品午夜av| 一本色道精品久久一区二区三区| 精品国产欧美日韩一区二区三区| 日韩中文字幕一区二区高清99| 国产日韩一区| 国产综合亚洲精品一区二| 久久三级视频| 在线精品观看| 美女国产精品久久久| 亚洲在线一区| 日韩不卡一区| 亚洲欧美日本国产专区一区| 日韩三区四区| 高清日韩欧美| 婷婷综合在线| 夜夜精品视频| 亚洲精品乱码| 久久久精品区| 亚洲综合三区| 黄色网一区二区| 激情综合五月| 欧美不卡高清一区二区三区| 免费高潮视频95在线观看网站| 色综合视频一区二区三区日韩| 特黄毛片在线观看| 亚洲午夜91| 国产精品一区二区美女视频免费看| 亚洲精品乱码| 不卡一区2区| 亚洲黄色在线| 亚洲色图网站| 欧美激情五月| 91精品观看| 免费视频久久| 国产麻豆一区| 久久精品二区三区| 日韩精品视频中文字幕| 成人午夜网址| 亚洲免费影视| 精品视频一区二区三区在线观看| 亚洲精品伊人| 久久国产成人| 亚洲午夜在线| 久久久久国产精品一区三寸| 久久尤物视频| 国产在线不卡一区二区三区| 国产aⅴ精品一区二区三区久久 | 亚洲欧洲av| 在线看片福利| 老司机精品在线| 国产精品久久亚洲不卡| 日韩三级一区| 老牛国内精品亚洲成av人片| 欧美精品第一区| 国产精品欧美三级在线观看| 国产欧美视频在线| 久久这里只有精品一区二区| 国产精品久av福利在线观看| 久久精品国产福利| 深夜视频一区二区| 欧美日韩亚洲在线观看| xxxxx性欧美特大| 日韩精品91亚洲二区在线观看| 亚洲婷婷在线| 日韩在线一二三区| 9999国产精品| 日本综合视频| 国产免费av一区二区三区| 国产午夜精品一区二区三区欧美 | 久久九九精品|