日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術(shù)文章
文章詳情頁

Java kafka如何實現(xiàn)自定義分區(qū)類和攔截器

瀏覽:26日期:2022-08-31 13:14:07

生產(chǎn)者發(fā)送到對應(yīng)的分區(qū)有以下幾種方式:

(1)指定了patition,則直接使用;(可以查閱對應(yīng)的java api, 有多種參數(shù))

(2)未指定patition但指定key,通過對key的value進(jìn)行hash出一個patition;

(3)patition和key都未指定,使用輪詢選出一個patition。

但是kafka提供了,自定義分區(qū)算法的功能,由業(yè)務(wù)手動實現(xiàn)分布:

1、實現(xiàn)一個自定義分區(qū)類,CustomPartitioner實現(xiàn)Partitioner

import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner { /** * * @param topic 當(dāng)前的發(fā)送的topic * @param key 當(dāng)前的key值 * @param keyBytes 當(dāng)前的key的字節(jié)數(shù)組 * @param value 當(dāng)前的value值 * @param valueBytes 當(dāng)前的value的字節(jié)數(shù)組 * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //這邊根據(jù)返回值就是分區(qū)號, 這邊就是固定發(fā)送到三號分區(qū) return 3; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { }}

2、producer配置文件指定,具體的分區(qū)類

// 具體的分區(qū)類props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner');

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer攔截器

攔截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機(jī)會對消息做一些定制化需求,比如修改消息等。

許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。

所使用的類為:

org.apache.kafka.clients.producer.ProducerInterceptor

我們可以編碼測試下:

1、定義消息攔截器,實現(xiàn)消息處理(可以是加時間戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;import java.util.UUID;public class MessageInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { System.out.println('這是MessageInterceptor的configure方法'); } /** * 這個是消息發(fā)送之前進(jìn)行處理 * * @param record * @return */ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 創(chuàng)建一個新的record,把uuid入消息體的最前部 System.out.println('為消息添加uuid'); return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),UUID.randomUUID().toString().replace('-', '') + ',' + record.value()); } /** * 這個是生產(chǎn)者回調(diào)函數(shù)調(diào)用之前處理 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println('MessageInterceptor攔截器的onAcknowledgement方法'); } @Override public void close() { System.out.println('MessageInterceptor close 方法'); }}

2、定義計數(shù)攔截器

import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class CounterInterceptor implements ProducerInterceptor<String, String>{ private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { System.out.println('這是CounterInterceptor的configure方法'); } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { System.out.println('CounterInterceptor計數(shù)過濾器不對消息做任何操作'); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 統(tǒng)計成功和失敗的次數(shù) System.out.println('CounterInterceptor過濾器執(zhí)行統(tǒng)計失敗和成功數(shù)量'); if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存結(jié)果 System.out.println('Successful sent: ' + successCounter); System.out.println('Failed sent: ' + errorCounter); }}

3、producer客戶端:

import org.apache.kafka.clients.producer.*;import java.util.ArrayList;import java.util.List;import java.util.Properties;public class Producer1 { public static void main(String[] args) throws Exception { Properties props = new Properties(); // Kafka服務(wù)端的主機(jī)名和端口號 props.put('bootstrap.servers', 'localhost:9092'); // 等待所有副本節(jié)點的應(yīng)答 props.put('acks', 'all'); // 消息發(fā)送最大嘗試次數(shù) props.put('retries', 0); // 一批消息處理大小 props.put('batch.size', 16384); // 請求延時,可能生產(chǎn)數(shù)據(jù)太快了 props.put('linger.ms', 1); // 發(fā)送緩存區(qū)內(nèi)存大小,數(shù)據(jù)是先放到生產(chǎn)者的緩沖區(qū) props.put('buffer.memory', 33554432); // key序列化 props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // value序列化 props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // 具體的分區(qū)類 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner'); //定義攔截器 List<String> interceptors = new ArrayList<>(); interceptors.add('kafka.MessageInterceptor'); interceptors.add('kafka.CounterInterceptor'); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1; i++) { producer.send(new ProducerRecord<String, String>('test_0515', i + '', 'xxx-' + i), new Callback() {public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println('這是producer回調(diào)函數(shù)');} }); } /*System.out.println('現(xiàn)在執(zhí)行關(guān)閉producer'); producer.close();*/ producer.close(); }}

總結(jié),我們可以知道攔截器鏈各個方法的執(zhí)行順序,假如有A、B攔截器,在一個攔截器鏈中:

(1)執(zhí)行A的configure方法,執(zhí)行B的configure方法

(2)執(zhí)行A的onSend方法,B的onSend方法

(3)生產(chǎn)者發(fā)送完畢后,執(zhí)行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)執(zhí)行producer自身的callback回調(diào)函數(shù)。

(5)執(zhí)行A的close方法,B的close方法。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。

標(biāo)簽: Java
相關(guān)文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
日韩福利在线观看| 美女视频黄 久久| 福利视频一区| 国产精品99视频| 日韩欧美一区二区三区在线视频| 成人台湾亚洲精品一区二区| 精品国产精品久久一区免费式| 国产精品扒开腿做爽爽爽软件| 久久av偷拍| 日韩国产欧美一区二区| 久久亚洲专区| 亚洲精品观看| 国产精品二区不卡| 999国产精品| 久久国产小视频| 亚洲午夜国产成人| 蜜桃久久久久| 一区二区小说| 日韩国产在线一| 高清一区二区| 99亚洲视频| 国产麻豆一区二区三区| 欧洲av一区二区| 婷婷精品在线| 国产美女高潮在线观看| 午夜在线播放视频欧美| 国产精品一区二区精品| 欧美日韩在线观看视频小说| 亚洲精品国产精品粉嫩| 91亚洲精品视频在线观看| 国产精品99在线观看| 国产精品日韩久久久| 麻豆精品一区二区综合av| 狠狠久久婷婷| yellow在线观看网址| 久久国产精品久久久久久电车 | 97精品久久| 久久精品不卡| 国内精品美女在线观看| 亚洲深深色噜噜狠狠爱网站| 久久激五月天综合精品| 亚洲精品一级二级| 国产精品一级在线观看| 综合干狼人综合首页| 久久久久国产| 岛国av在线网站| 国产精品亚洲二区| 日韩精品视频在线看| 免费毛片在线不卡| 午夜精品久久久久久久久久蜜桃| 欧美日本精品| 清纯唯美亚洲综合一区| 亚洲综合五月| 天使萌一区二区三区免费观看| 欧美freesex黑人又粗又大| 国内自拍视频一区二区三区| 日本不卡的三区四区五区| 日韩一区精品字幕| 亚洲自拍另类| 亚洲一区不卡| 亚洲尤物在线| 一区二区三区四区精品视频| 鲁大师成人一区二区三区| 亚洲精品在线观看91| 夜夜嗨一区二区| 日韩专区在线视频| 午夜亚洲福利| 国产亚洲一区二区三区不卡| 日本a级不卡| 国产精久久一区二区| 久久三级毛片| 国产高清精品二区| www.com.cn成人| 国产精品7m凸凹视频分类| 久久大逼视频| 国产麻豆一区| 四虎影视精品| 蘑菇福利视频一区播放| 日韩高清在线观看一区二区| 国产探花在线精品| 高潮久久久久久久久久久久久久| 丰满少妇一区| 久久中文视频| 日本亚洲不卡| 色婷婷综合网| 日本 国产 欧美色综合| 亚洲一区二区日韩| 老司机精品在线| 不卡在线一区二区| 欧美日韩在线精品一区二区三区激情综合| 美腿丝袜亚洲三区| 亚洲一级二级| 国产精品第十页| 免费久久精品| 久久99久久人婷婷精品综合| 久久天堂av| 国产精品毛片aⅴ一区二区三区| 日韩欧美中文| 国产精品一区二区中文字幕| 91精品国产成人观看| 国产欧美欧美| 少妇久久久久| 美日韩一区二区三区| 女人天堂亚洲aⅴ在线观看| 国产精品三级| 日韩一区二区三区精品视频第3页| 国产精品精品| 久久av综合| 日韩成人在线看| 国产精品日本| 欧美一级精品| 色综合五月天| 国产精品久久久久久久久久白浆 | 五月亚洲婷婷 | 久久国产中文字幕| 精品亚洲自拍| 欧美黄色精品| 国产精品va| 国产日韩亚洲欧美精品| 中文字幕一区二区三区在线视频| 久久亚洲精品中文字幕蜜潮电影| 精品视频自拍| 国产精品亚洲产品| 久久99精品久久久久久园产越南| 日韩va欧美va亚洲va久久| 日韩中文字幕不卡| 免费在线观看日韩欧美| 鲁大师成人一区二区三区| 少妇久久久久| 欧洲一区二区三区精品| 鲁鲁在线中文| 欧美亚洲国产激情| 夜夜精品视频| 亚洲精品高潮| 久久激五月天综合精品| 国产精品红桃| 日韩成人三级| 午夜在线精品| 日本在线不卡视频| 美女久久久精品| 日本综合字幕| 模特精品在线| 国产精品多人| 国产传媒在线观看| 91精品成人| 欧美日本不卡| 亚洲四虎影院| 欧美综合二区| 精品72久久久久中文字幕| 神马日本精品| 欧美日本三区| 国产区精品区| 精品视频自拍| 国产黄色一区| 久久影院一区二区三区| se01亚洲视频| 日韩高清三区| 97欧美在线视频| 亚洲激情五月| 精品视频网站| 中文无码久久精品| 伊人网在线播放| 免费人成精品欧美精品| 久久一区欧美| 亚洲精一区二区三区| 特黄毛片在线观看| 青青国产精品| 婷婷综合亚洲| 蜜桃成人精品| 美女国产精品久久久| 亚洲精品大全| 国产在线日韩| 日韩一区二区中文| 伊人www22综合色| 欧美/亚洲一区| 91青青国产在线观看精品| 国产一级成人av| 视频一区在线视频| 蜜桃tv一区二区三区| 正在播放日韩精品| 国产精品黄色| 国产精品最新| 国产精品一区2区3区| 日韩精品视频一区二区三区| 中文在线日韩| 伊人精品久久| 亚洲精品九九| 亚洲性视频在线| 激情综合网站| 在线日韩视频| 狠狠操综合网| 中文精品在线| 亚洲涩涩av| 久久激情五月婷婷| 国产精品xxx| 日本а中文在线天堂| 久久精品二区三区| 不卡在线一区二区| 亚洲专区视频| 国产精品亚洲二区|