日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

Java操作Kafka執行不成功

瀏覽:156日期:2023-12-29 14:49:40

問題描述

使用kafka-clients操作kafka始終不成功,原因不清楚,下面貼出相關代碼及配置,請懂得指點一下,謝謝!

環境及依賴

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version></dependency>

JDK版本為1.8、Kafka版本為2.12-0.10.2.0,服務器使用CentOS-7構建。

測試代碼

TestBase.java

public class TestBase { protected Logger log = LoggerFactory.getLogger(this.getClass()); protected String kafka_server = '192.168.60.160:9092' ; protected String topic = 'zlikun_topic';}

ProducerTest.java

public class ProducerTest extends TestBase { protected Properties props = new Properties(); @Before public void init() {props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);props.put(ProducerConfig.ACKS_CONFIG, 'all');props.put(ProducerConfig.RETRIES_CONFIG, 0);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG ,MyPartitioner.class) ; } @Test public void test() throws InterruptedException {KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 發送消息for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) {System.out.printf('offset = %d ,partition = %d n', recordMetadata.offset() ,recordMetadata.partition()); } else {log.error('send error !' ,e); }} });}TimeUnit.SECONDS.sleep(3);producer.close(); }}

ConsumerTest.java

public class ConsumerTest extends TestBase { private Properties props = new Properties(); @Before public void init() {props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);props.put(ConsumerConfig.GROUP_ID_CONFIG ,'zlikun') ;props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 'true');props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, '1000');props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); } @Test public void test() {Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));//consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) {System.out.printf('offset = %d, key = %s, value = %s%n', record.offset(), record.key(), record.value()); }} }}問題

# 測試topic為手動創建$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic

控制臺輸出信息

[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time

問題解答

回答1:

測試了下, 正常 https://github.com/MOBX/kafka...

建議檢查下kafka集群連接是否正常,你報的是TimeoutException;如果不行, kafka-clients降到0.8.2.0試試

回答2:

我把日志調成DEBUG級別,觀察日志發現是不能正確解析主機名造成的。

2017-04-11 13:49:46.046 [main] DEBUG org.apache.kafka.clients.NetworkClient - Error connecting to node 0 at m160:9092:java.io.IOException: Can’t resolve address: m160:9092 at org.apache.kafka.common.network.Selector.connect(Selector.java:182) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629) at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:57) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:768) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:684) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:347) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at com.zlikun.mq.ConsumerTest.test(ConsumerTest.java:34) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:107) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:649) at org.apache.kafka.common.network.Selector.connect(Selector.java:179) ... 36 more

網上找到一篇博文http://blog.sina.com.cn/s/blo...也支持了這一點,同樣我是在hosts文件中配置了主機名,測試就正常了。不過感覺這樣做似乎不太合理,實際應用中這樣用,太影響運維了吧,不知道有沒有其它更好的解決辦法。

[2017/04/11 16:16]剛從網上找到一篇文章http://www.tuicool.com/articl...,解決了這個問題!

標簽: java
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
亚洲综合电影| 蜜桃av一区二区三区电影| 亚洲视频综合| 成人精品高清在线视频| 中文字幕一区二区av| 蜜桃国内精品久久久久软件9| 国产a久久精品一区二区三区| 日韩在线网址| 鲁大师成人一区二区三区| 精品视频一区二区三区四区五区| 日韩国产一区二| 日本精品国产| 国产探花在线精品| 国产日韩欧美三级| 日本h片久久| 亚洲精华国产欧美| 久久精品99久久无色码中文字幕| 国产成人精品一区二区三区视频| 欧美国产中文高清| 精品三级国产| 久久久久久一区二区| 岛国av在线网站| 日韩欧美综合| 四虎4545www国产精品| 日韩精品视频网站| 欧美午夜三级| 国产精品观看| 国产91欧美| 久久精品主播| 日韩午夜高潮| 日韩在线观看中文字幕| 日本免费一区二区视频| 久久国产精品免费一区二区三区| 欧美自拍一区| 精品五月天堂| 国产综合视频| 嫩草伊人久久精品少妇av杨幂| 成人精品亚洲| 国产日韩免费| 久久国产亚洲| 国产毛片久久久| 午夜精品婷婷| 精品视频在线一区二区在线| 在线成人直播| 免费在线亚洲欧美| 丝袜脚交一区二区| 日韩啪啪电影网| 欧美日韩亚洲国产精品| 国产一区久久| 精品久久久亚洲| 日本一区二区三区视频在线看| 国产一区调教| 91综合久久爱com| 在线日韩中文| 久久99影视| 中文字幕亚洲在线观看| 日韩一区欧美| 国产精品国码视频| 亚洲精品自拍| 91久久黄色| 丝袜美腿一区| 美女毛片一区二区三区四区最新中文字幕亚洲 | 9久re热视频在线精品| 麻豆精品视频在线观看视频| 丝瓜av网站精品一区二区| 麻豆网站免费在线观看| 国产伦理久久久久久妇女| 免费成人av在线播放| 五月精品视频| 91精品国产调教在线观看| 精品少妇一区| 国产精品对白久久久久粗| 日韩av一区二区在线影视| 免费看精品久久片| 亚洲欧美高清| 亚洲欧美激情诱惑| 99在线|亚洲一区二区| 激情婷婷综合| 国产婷婷精品| 在线亚洲人成| 精品视频自拍| 精品午夜久久| 国产成人久久精品一区二区三区| 国产精品免费精品自在线观看| 日本在线成人| 日韩精品一区二区三区中文在线| 亚洲精品免费观看| 少妇高潮一区二区三区99| 玖玖玖国产精品| 欧美一区=区| 久久福利毛片| 蜜桃视频一区二区| 一区二区国产在线| 亚洲最大av| 久久成人国产| 中文字幕av一区二区三区人| 中文一区一区三区免费在线观 | 亚洲午夜av| 欧美日韩国产一区二区三区不卡| 日韩精品欧美激情一区二区| 久久中文字幕av一区二区不卡| 国际精品欧美精品| 精品久久97| 大香伊人久久精品一区二区| 精品一区二区三区免费看 | 国产亚洲一区二区三区啪| 欧美三区不卡| 精品国产一区二区三区av片| sm久久捆绑调教精品一区| 国产成人精品一区二区免费看京| 成人国产精品| 国产精品av久久久久久麻豆网| 精品一区在线| 亚洲日本在线观看视频| 日韩精品亚洲一区二区三区免费| 亚洲欧美日本国产| 久久a爱视频| 特黄毛片在线观看| 国产精品老牛| 午夜天堂精品久久久久| 日韩精品欧美大片| 久久精品国产99国产精品| 激情视频网站在线播放色| 久久中文字幕av| 亚洲精品少妇| 国产一区一一区高清不卡| 极品裸体白嫩激情啪啪国产精品| 日韩精品一级中文字幕精品视频免费观看 | 日韩电影免费网站| 亚洲免费激情| 欧美日本不卡高清| 天堂а√在线最新版中文在线| 好吊视频一区二区三区四区| 亚洲精品美女91| 国产一区二区三区视频在线| 蜜桃视频欧美| 亚洲a成人v| 色婷婷色综合| 蜜芽一区二区三区| 精品日产乱码久久久久久仙踪林| 亚洲精品.com| 日本伊人久久| 都市激情国产精品| 亚洲一二av| 天堂8中文在线最新版在线| 中文字幕一区二区三区四区久久 | 水野朝阳av一区二区三区| 欧美激情视频一区二区三区免费 | 日韩在线一二三区| 麻豆精品国产91久久久久久| 亚洲成人免费| 国产精品久久久久久妇女| 亚洲特级毛片| 欧美激情久久久久久久久久久| 激情五月综合网| 日韩av网站免费在线| 日韩国产欧美| 日韩国产在线观看一区| 99热精品久久| 国产美女视频一区二区| 好吊视频一区二区三区四区| 麻豆国产精品视频| 中文字幕一区二区精品区| 日本免费一区二区三区四区| 日本va欧美va精品发布| 欧美国产91| 97在线精品| 国产情侣一区在线| 中文在线一区| 欧美二三四区| 欧美日韩午夜| 日韩影院在线观看| 99精品在线观看| 精品视频亚洲| 欧美日韩亚洲一区| 日韩精品一二三| 亚洲婷婷在线| 日韩欧美一区二区三区在线视频| 久久国产婷婷国产香蕉| 久色成人在线| 久久五月天小说| 岛国av在线网站| 麻豆视频久久| 国产精品一区高清| 少妇精品久久久| 国产亚洲精品久久久久婷婷瑜伽| 欧美日韩视频免费观看| 国产精品欧美大片| 日本成人在线不卡视频| 国产亚洲精品v| 黄色不卡一区| 激情六月综合| 国产99久久| 久久婷婷久久| 欧美日韩免费看片| 日韩免费视频| 国产在线一区不卡| 精品久久影院| 岛国av在线网站| 日韩欧美不卡|