国产av日韩一区二区三区精品,成人性爱视频在线观看,国产,欧美,日韩,一区,www.成色av久久成人,2222eeee成人天堂

目錄
添加Kafka 依賴
編寫Kafka 生產(chǎn)者(Producer)
編寫Kafka 消費者(Consumer)
常見問題與註意事項
首頁 Java java教程 如何使用Java從Apache Kafka中產(chǎn)生和消費消息?

如何使用Java從Apache Kafka中產(chǎn)生和消費消息?

Jul 11, 2025 am 01:43 AM
java kafka

使用Java 生產(chǎn)和消費Apache Kafka 消息的關鍵在於正確配置Producer 和Consumer API 並理解其基本流程。 1. 首先添加Kafka 客戶端依賴,確保版本與集群兼容;2. 編寫生產(chǎn)者時配置bootstrap.servers、key.serializer 和value.serializer,並創(chuàng)建KafkaProducer 實例發(fā)送消息,注意關閉資源和可選回調處理;3. 編寫消費者時配置group.id、反序列化器等,使用KafkaConsumer 訂閱主題並循環(huán)拉取消息,注意提交offset 的策略;4. 常見問題包括group.id 衝突、offset 提交不當、資源未正確關閉等,建議先在本地單機環(huán)境測試驗證流程。

How to produce and consume messages from Apache Kafka with Java?

要用Java 生產(chǎn)和消費Apache Kafka 消息,其實並不難。核心在於理解Kafka 的基本概念,並掌握Producer 和Consumer API 的使用方法。只要配置得當,代碼結構清晰,就能輕鬆實現(xiàn)消息的發(fā)送與接收。

How to produce and consume messages from Apache Kafka with Java?

添加Kafka 依賴

在開始寫代碼之前,首先需要把Kafka 的客戶端庫引入項目中。如果你用的是Maven,可以在pom.xml中添加如下依賴:

 <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

這個版本比較新,也相對穩(wěn)定。當然你也可以根據(jù)你的Kafka 集群版本選擇對應的客戶端版本。

How to produce and consume messages from Apache Kafka with Java?

編寫Kafka 生產(chǎn)者(Producer)

生產(chǎn)者的任務是往Kafka 主題裡發(fā)消息。主要步驟包括配置屬性、創(chuàng)建Producer 實例、構建消息記錄並發(fā)送。

關鍵配置項有:

How to produce and consume messages from Apache Kafka with Java?
  • bootstrap.servers :Kafka broker 地址
  • key.serializervalue.serializer :指定key 和value 的序列化方式,一般用StringSerializer

示例代碼如下:

 Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");

producer.send(record);
producer.close();

幾點需要注意:

  • 發(fā)送後最好調用close() ,避免資源洩漏
  • 如果想確認是否發(fā)送成功,可以加上回調函數(shù).send(record, callback)
  • 如果只是測試,可以省略key

編寫Kafka 消費者(Consumer)

消費者負責從Kafka 主題中讀取消息。相比生產(chǎn)者,消費者的邏輯稍複雜一點,因為它需要主動拉取消息、處理偏移量等。

關鍵配置包括:

  • bootstrap.servers :同樣要指定broker 地址
  • group.id :消費者組ID,必須設置,否則會報錯
  • key.deserializervalue.deserializer :反序列化器,通常用StringDeserializer

一個簡單的消費者流程如下:

 Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received: " record.value());
    }
}

注意點:

  • poll()方法會拉取一批數(shù)據(jù),循環(huán)處理即可
  • 消費完不要忘了提交offset,可以選擇自動或手動提交
  • 如果你想只消費一次然後退出,可以在處理完後break 循環(huán)

常見問題與註意事項

有時候你會發(fā)現(xiàn)消息沒收到或者重複消費了,這通常是下面幾個原因造成的:

  • 消費者group.id 設置錯誤或衝突
  • 自動提交offset 頻率太高或太低
  • 沒有正確關閉producer 或consumer 導致狀態(tài)??不一致
  • Kafka broker 沒啟動或者網(wǎng)絡不通

建議開發(fā)時先跑通本地單機環(huán)境,再上集群。


基本上就這些。 Java 接入Kafka 的過程不算複雜,但有些細節(jié)容易忽略,比如序列化、group.id、提交策略這些,只要注意配置和流程,很快就能跑起來。

以上是如何使用Java從Apache Kafka中產(chǎn)生和消費消息?的詳細內容。更多資訊請關注PHP中文網(wǎng)其他相關文章!

本網(wǎng)站聲明
本文內容由網(wǎng)友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發(fā)現(xiàn)涉嫌抄襲或侵權的內容,請聯(lián)絡admin@php.cn

熱AI工具

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創(chuàng)建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發(fā)環(huán)境

Dreamweaver CS6

Dreamweaver CS6

視覺化網(wǎng)頁開發(fā)工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

如何在Java的地圖上迭代? 如何在Java的地圖上迭代? Jul 13, 2025 am 02:54 AM

遍歷Java中的Map有三種常用方法:1.使用entrySet同時獲取鍵和值,適用於大多數(shù)場景;2.使用keySet或values分別遍歷鍵或值;3.使用Java8的forEach簡化代碼結構。 entrySet返回包含所有鍵值對的Set集合,每次循環(huán)獲取Map.Entry對象,適合頻繁訪問鍵和值的情況;若只需鍵或值,可分別調用keySet()或values(),也可在遍歷鍵時通過map.get(key)獲取值;Java8中可通過Lambda表達式使用forEach((key,value)-&gt

Java可選示例 Java可選示例 Jul 12, 2025 am 02:55 AM

Optional能清晰表達意圖並減少null判斷的代碼噪音。 1.Optional.ofNullable是處理可能為null對象的常用方式,如從map中取值時可結合orElse提供默認值,邏輯更清晰簡潔;2.通過鍊式調用map實現(xiàn)嵌套取值,安全地避免NPE,任一環(huán)節(jié)為null則自動終止並返回默認值;3.filter可用於條件篩選,滿足條件才繼續(xù)執(zhí)行後續(xù)操作,否則直接跳到o??rElse,適合輕量級業(yè)務判斷;4.不建議過度使用Optional,如基本類型或簡單邏輯中其反而增加複雜度,部分場景直接返回nu

如何修復java.io.notserializable Exception? 如何修復java.io.notserializable Exception? Jul 12, 2025 am 03:07 AM

遇到java.io.NotSerializableException的核心解決方法是確保所有需序列化的類實現(xiàn)Serializable接口,並檢查嵌套對象的序列化支持。 1.給主類添加implementsSerializable;2.確保類中自定義字段對應的類也實現(xiàn)Serializable;3.用transient標記不需要序列化的字段;4.檢查集合或嵌套對像中的非序列化類型;5.查看異常信息定位具體哪個類未實現(xiàn)接口;6.對無法修改的類考慮替換設計,如保存關鍵數(shù)據(jù)或使用可序列化的中間結構;7.考慮改

Java中的可比較與比較器 Java中的可比較與比較器 Jul 13, 2025 am 02:31 AM

在Java中,Comparable用於類內部定義默認排序規(guī)則,Comparator用於外部靈活定義多種排序邏輯。 1.Comparable是類自身實現(xiàn)的接口,通過重寫compareTo()方法定義自然順序,適用於類有固定、最常用的排序方式,如String或Integer。 2.Comparator是外部定義的函數(shù)式接口,通過compare()方法實現(xiàn),適合同一類需要多種排序方式、無法修改類源碼或排序邏輯經(jīng)常變化的情況。兩者區(qū)別在於Comparable只能定義一種排序邏輯且需修改類本身,而Compar

Java方法參考解釋了 Java方法參考解釋了 Jul 12, 2025 am 02:59 AM

方法引用是Java中一種簡化Lambda表達式的寫法,使代碼更簡潔。它不是新語法,而是Java8引入的Lambda表達式的一種快捷方式,適用於函數(shù)式接口的上下文。其核心在於將已有方法直接作為函數(shù)式接口的實現(xiàn)來使用。例如System.out::println等價於s->System.out.println(s)。方法引用主要有四種形式:1.靜態(tài)方法引用(ClassName::staticMethodName);2.實例方法引用(綁定到特定對象,instance::methodName);3.

如何處理Java中的字符編碼問題? 如何處理Java中的字符編碼問題? Jul 13, 2025 am 02:46 AM

處理Java中的字符編碼問題,關鍵是在每一步都明確指定使用的編碼。 1.讀寫文本時始終指定編碼,使用InputStreamReader和OutputStreamWriter並傳入明確的字符集,避免依賴系統(tǒng)默認編碼。 2.在網(wǎng)絡邊界處理字符串時確保兩端一致,設置正確的Content-Type頭並用庫顯式指定編碼。 3.謹慎使用String.getBytes()和newString(byte[]),應始終手動指定StandardCharsets.UTF_8以避免平臺差異導致的數(shù)據(jù)損壞??傊?,通過在每個階段

如何在Java解析JSON? 如何在Java解析JSON? Jul 11, 2025 am 02:18 AM

解析JSON在Java中的常見方式有三種:使用Jackson、Gson或org.json。 1.Jackson適合大多數(shù)項目,性能好且功能全面,支持對象與JSON字符串之間的轉換及註解映射;2.Gson更適合Android項目或輕量級需求,使用簡單但處理複雜結構和高性能場景略遜;3.org.json適用於簡單任務或小腳本,不推薦用於大型項目,因其靈活性和類型安全不足。選擇應根據(jù)實際需求決定。

新電子郵件的Outlook快捷方式 新電子郵件的Outlook快捷方式 Jul 11, 2025 am 03:25 AM

在Outlook中快速新建郵件的方法如下:1.桌面版使用快捷鍵Ctrl Shift M,可直接彈出新郵件窗口;2.網(wǎng)頁版可通過創(chuàng)建包含JavaScript的書籤(如javascript:document.querySelector("divrole='button'").click())實現(xiàn)一鍵新建郵件;3.使用瀏覽器插件(如Vimium、CrxMouseGestures)自定義快捷鍵觸發(fā)“新建郵件”按鈕;4.Windows用戶還可通過右鍵任務欄Outlook圖標選擇“新建電

See all articles