国产av日韩一区二区三区精品,成人性爱视频在线观看,国产,欧美,日韩,一区,www.成色av久久成人,2222eeee成人天堂

首頁(yè) web前端 js教程 Kafka 基礎(chǔ)知識(shí)和實(shí)際示例

Kafka 基礎(chǔ)知識(shí)和實(shí)際示例

Dec 28, 2024 am 09:26 AM

在過(guò)去的幾周里,我一直在深入研究 Kafka 并一路做筆記,我決定組織和構(gòu)建一篇博客文章,在上面,除了概念和技巧之外,還有一個(gè)使用構(gòu)建的實(shí)際示例NestJS 和 KafkaJs。

卡夫卡是什么?

Apache Kafka 是一個(gè)分布式事件流平臺(tái),旨在處理實(shí)時(shí)事件。它能夠存儲(chǔ)、處理和檢索大規(guī)模、高吞吐量、低延遲的數(shù)據(jù)流,使其適合構(gòu)建實(shí)時(shí)數(shù)據(jù)管道和事件驅(qū)動(dòng)的應(yīng)用程序。

主要特點(diǎn):

  • 事件流: Kafka 將數(shù)據(jù)組織成主題,它們是事件的有序日志。
  • 分布式架構(gòu):Kafka 是為了可擴(kuò)展性和容錯(cuò)能力而構(gòu)建的。它作為稱(chēng)為代理的節(jié)點(diǎn)集群運(yùn)行,可以跨多個(gè)服務(wù)器分發(fā)數(shù)據(jù)。
  • 發(fā)布-訂閱模型:生產(chǎn)者將消息寫(xiě)入主題,消費(fèi)者從中讀取消息。 Kafka支持多個(gè)消費(fèi)者,允許不同的應(yīng)用程序獨(dú)立處理同一個(gè)數(shù)據(jù)流。
  • 高性能: Kafka 針對(duì)高吞吐量進(jìn)行了優(yōu)化,每秒處理數(shù)百萬(wàn)條消息,延遲較低。
  • 持久存儲(chǔ): Kafka 將消息存儲(chǔ)在磁盤(pán)上,保留期限可配置,確保數(shù)據(jù)持久性和可靠性。
  • 分區(qū)和復(fù)制:主題分為分區(qū)以實(shí)現(xiàn)可擴(kuò)展性,并跨代理進(jìn)行復(fù)制以實(shí)現(xiàn)容錯(cuò)。
  • 可重玩性:消費(fèi)者可以通過(guò)重置其偏移量來(lái)重新讀取消息,從而啟用數(shù)據(jù)重新處理或恢復(fù)。
  • 集成和生態(tài)系統(tǒng): Kafka 與各種系統(tǒng)集成,并擁有 Kafka Connect(用于數(shù)據(jù)集成)和 Kafka Streams(用于流處理)等工具。

優(yōu)點(diǎn)

  • 可靠性:通過(guò)數(shù)據(jù)分發(fā)、復(fù)制和分區(qū)確保容錯(cuò)。
  • 可擴(kuò)展性:Kafka 可以處理海量數(shù)據(jù)并水平擴(kuò)展而無(wú)需停機(jī)。
  • 持久性:消息被及時(shí)存儲(chǔ),確保彈性和數(shù)據(jù)持久性。
  • 性能:Kafka 在極端數(shù)據(jù)負(fù)載下保持高性能,處理大量數(shù)據(jù)而不會(huì)造成停機(jī)或數(shù)據(jù)丟失。

缺點(diǎn)

這些權(quán)衡是有意的設(shè)計(jì)選擇,旨在最大限度地提高 Kafka 的性能,但可能會(huì)給需要更大靈活性的用例帶來(lái)挑戰(zhàn):

  • 靈活性有限: Kafka 缺乏對(duì)擴(kuò)展查詢(xún)的支持,例如過(guò)濾報(bào)告中的特定數(shù)據(jù)。消費(fèi)者必須處理這些任務(wù),因?yàn)?Kafka 按消息接收順序的偏移量檢索消息。
  • 不適合長(zhǎng)期存儲(chǔ):Kafka 擅長(zhǎng)流數(shù)據(jù),但不適合長(zhǎng)期存儲(chǔ)歷史數(shù)據(jù)。數(shù)據(jù)重復(fù)會(huì)使大型數(shù)據(jù)集的存儲(chǔ)成本高昂。
  • 無(wú)通配符主題支持: Kafka 不允許使用通配符模式(例如 log-2024-*)從多個(gè)主題進(jìn)行消費(fèi)。

使用案例

  • 實(shí)時(shí)分析:在數(shù)據(jù)流發(fā)生時(shí)對(duì)其進(jìn)行處理和分析。
  • 事件溯源: 將應(yīng)用程序狀態(tài)的所有更改記錄為事件序列。
  • 日志聚合:從分布式系統(tǒng)收集和管理日志。
  • 數(shù)據(jù)管道:可靠且高效地在系統(tǒng)之間傳輸數(shù)據(jù)。
  • 物聯(lián)網(wǎng)應(yīng)用:實(shí)時(shí)處理來(lái)自物聯(lián)網(wǎng)設(shè)備的傳感器數(shù)據(jù)。

卡夫卡如何工作?

Kafka 集成了隊(duì)列和發(fā)布-訂閱消息傳遞模型的功能,為消費(fèi)者提供每種方法的優(yōu)勢(shì)。

  • 隊(duì)列 通過(guò)在多個(gè)消費(fèi)者實(shí)例之間分配任務(wù)來(lái)實(shí)現(xiàn)可擴(kuò)展的數(shù)據(jù)處理,但傳統(tǒng)隊(duì)列不支持多個(gè)訂閱者。
  • 發(fā)布-訂閱模型支持多個(gè)訂閱者,但無(wú)法在多個(gè)工作進(jìn)程之間分配任務(wù),因?yàn)槊織l消息都會(huì)發(fā)送給所有訂閱者。

Kafka 采用分區(qū)日志系統(tǒng)來(lái)結(jié)合隊(duì)列和發(fā)布-訂閱模型的優(yōu)點(diǎn)。日志是有序的記錄序列,被分為多個(gè)分區(qū),每個(gè)分區(qū)分配給不同的訂閱者(消費(fèi)者)。此設(shè)置使多個(gè)訂閱者能夠共享一個(gè)主題,同時(shí)保持可擴(kuò)展性。

Kafka fundamentals with a practical example

事件、主題和分區(qū)

我們已經(jīng)看到 Kafka 是一個(gè)旨在處理實(shí)時(shí)事件的平臺(tái),在討論如何處理這些事件之前,我們需要對(duì)它們進(jìn)行定義:

事件是記錄應(yīng)用程序的操作、事件或更改,例如付款、網(wǎng)站點(diǎn)擊或溫度讀數(shù)。

Kafka 中的

事件被建模為鍵/值對(duì),其中鍵和值都被序列化為字節(jié)序列。

  • 通常表示序列化的域?qū)ο蠡蛟驾斎耄鐐鞲衅鬏敵龌蚱渌麘?yīng)用程序數(shù)據(jù)。它們封裝了 Kafka 事件中傳輸?shù)暮诵男畔ⅰ?/li>
  • 可以是復(fù)雜的域?qū)ο?,但通常是?jiǎn)單的類(lèi)型,如字符串或整數(shù)。鍵通常標(biāo)識(shí)系統(tǒng)內(nèi)的實(shí)體,例如特定用戶(hù)、訂單或連接的設(shè)備,而不是唯一標(biāo)識(shí)單個(gè)事件(如關(guān)系數(shù)據(jù)庫(kù)中的主鍵)。

Kafka 將事件組織成有序日志,稱(chēng)為主題。當(dāng)外部系統(tǒng)將事件寫(xiě)入 Kafka 時(shí),它會(huì)被附加到主題的末尾。即使在閱讀后,消息也會(huì)在主題中保留可配置的持續(xù)時(shí)間。與隊(duì)列不同,主題具有持久性、可復(fù)制性和容錯(cuò)性,可以有效地存儲(chǔ)事件記錄。但日志只能順序掃描,不能查詢(xún)。

主題作為日志文件存儲(chǔ)在磁盤(pán)上,但是磁盤(pán)具有有限的大小和 I/O 等限制。為了克服這個(gè)問(wèn)題,Kafka 允許主題分為分區(qū),將單個(gè)日志分解為多個(gè)可以分布在不同服務(wù)器上的日志。這種分區(qū)使 Kafka 能夠水平擴(kuò)展,增強(qiáng)其處理大量事件和高吞吐量的能力。

Kafka 根據(jù)分區(qū)是否有 key:

將消息分配給分區(qū)
  • 無(wú)鍵:消息在所有分區(qū)之間循環(huán)分發(fā),確保數(shù)據(jù)均勻分布,但不保留消息順序。
  • With Key: 分區(qū)是通過(guò)對(duì) key 進(jìn)行哈希處理來(lái)確定的,確保具有相同 key 的消息始終進(jìn)入相同的分區(qū)并保持其順序。

經(jīng)紀(jì)人

Kafka 使用名為 brokers 的節(jié)點(diǎn)作為分布式數(shù)據(jù)基礎(chǔ)設(shè)施運(yùn)行,這些節(jié)點(diǎn)共同形成 Kafka 集群。代理可以在裸機(jī)硬件、云實(shí)例、Kubernetes 管理的容器中、筆記本電腦上的 Docker 中或任何可以運(yùn)行 JVM 進(jìn)程的地方運(yùn)行。

經(jīng)紀(jì)商關(guān)注:

  • 將新事件寫(xiě)入分區(qū)。
  • 從分區(qū)讀取服務(wù)。
  • 跨代理復(fù)制分區(qū)。

它們不執(zhí)行消息計(jì)算或主題到主題的路由,從而保持設(shè)計(jì)簡(jiǎn)單高效。

復(fù)制

Kafka 通過(guò)跨多個(gè)代理復(fù)制分區(qū)數(shù)據(jù)來(lái)確保數(shù)據(jù)的持久性和容錯(cuò)性。分區(qū)的主要副本是領(lǐng)導(dǎo)副本,而其他副本是跟隨副本。數(shù)據(jù)被寫(xiě)入領(lǐng)導(dǎo)者,領(lǐng)導(dǎo)者自動(dòng)將更新復(fù)制到追隨者。

此復(fù)制過(guò)程可確保:

  • 數(shù)據(jù)安全,即使在代理或存儲(chǔ)發(fā)生故障的情況下也是如此。
  • 自動(dòng)故障轉(zhuǎn)移,如果當(dāng)前領(lǐng)導(dǎo)者失敗,另一個(gè)副本將接管作為領(lǐng)導(dǎo)者。

開(kāi)發(fā)人員可以從這些保證中受益,而無(wú)需直接管理復(fù)制,因?yàn)?Kafka 會(huì)透明地處理它。

制片人

Kafka 生產(chǎn)者 是一個(gè)客戶(hù)端應(yīng)用程序,它將數(shù)據(jù)發(fā)送(發(fā)布)到 Kafka 主題。它負(fù)責(zé)創(chuàng)建消息(記錄)并將其傳輸?shù)?Kafka 集群。生產(chǎn)者根據(jù)其配置和消息密鑰的存在來(lái)確定存儲(chǔ)消息的主題分區(qū)。生產(chǎn)者負(fù)責(zé)但不限于:

  • 消息撰寫(xiě):
    • 每條消息由一個(gè)鍵(可選)、一個(gè)值(實(shí)際數(shù)據(jù))和元數(shù)據(jù)組成。
    • key決定消息的分區(qū),確保具有相同key的消息的順序。
  • 分區(qū)分配:
    • 如果提供了密鑰,生產(chǎn)者將使用哈希算法來(lái)確定分區(qū)。
    • 沒(méi)有密鑰,消息以循環(huán)方式跨分區(qū)分發(fā)以實(shí)現(xiàn)負(fù)載平衡。
  • 壓縮:

    生產(chǎn)者可以壓縮消息以減少網(wǎng)絡(luò)帶寬和存儲(chǔ)使用。

消費(fèi)者

Kafka 消費(fèi)者 是一個(gè)客戶(hù)端應(yīng)用程序,它從 Kafka 主題讀取消息, 它按照自己的節(jié)奏從 Kafka 分區(qū)檢索消息,允許實(shí)時(shí)或批量處理數(shù)據(jù)。請(qǐng)注意,Kafka 不會(huì)將消息推送給消費(fèi)者,而是通過(guò)請(qǐng)求數(shù)據(jù)從 Kafka 分區(qū)中拉取消息。

消費(fèi)者還可以跟蹤他們已處理的抵消額。偏移量可以自動(dòng)手動(dòng)提交,確保消費(fèi)者失敗時(shí)數(shù)據(jù)不會(huì)丟失。這允許靈活的消費(fèi),包括通過(guò)重置偏移量來(lái)重放消息。

消費(fèi)群體

消費(fèi)者組是一組消費(fèi)者,它們合作消費(fèi)來(lái)自某些主題的數(shù)據(jù),從而允許分布式處理主題的消息。

主題的分區(qū)在組內(nèi)的消費(fèi)者之間劃分,確保每條消息僅由組內(nèi)的一個(gè)消費(fèi)者處理。多個(gè)消費(fèi)組可以獨(dú)立消費(fèi)同一個(gè)主題,互不干擾。

當(dāng)新的消費(fèi)者加入組或現(xiàn)有消費(fèi)者失敗時(shí),Kafka 會(huì)在組中的消費(fèi)者之間重新分配分區(qū),以確保覆蓋所有分區(qū)。

序列化和反序列化

Kafka中的序列化和反序列化是將數(shù)據(jù)在其原始格式和字節(jié)數(shù)組之間進(jìn)行轉(zhuǎn)換以進(jìn)行傳輸和存儲(chǔ),從而使生產(chǎn)者和消費(fèi)者能夠高效地進(jìn)行通信。

序列化

是將對(duì)象或數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為字節(jié)流以便傳輸或存儲(chǔ)的過(guò)程。在生產(chǎn)者將數(shù)據(jù)發(fā)送到 Kafka 主題之前,它將數(shù)據(jù)(鍵和值)序列化為字節(jié)數(shù)組。

常見(jiàn)序列化格式:

  • JSON:人類(lèi)可讀,廣泛兼容。
  • Avro:緊湊高效,基于模式。
  • Protobuf:緊湊、基于模式且與語(yǔ)言無(wú)關(guān)。
  • 字符串:簡(jiǎn)單的基于文本的序列化。
  • 自定義序列化:滿(mǎn)足特定于應(yīng)用程序的需求。

反序列化

是相反的過(guò)程,其中字節(jié)流被轉(zhuǎn)換回其原始對(duì)象或數(shù)據(jù)結(jié)構(gòu)。當(dāng)消費(fèi)者從 Kafka 主題讀取數(shù)據(jù)時(shí),它將字節(jié)數(shù)組反序列化回可用的格式進(jìn)行處理。

壓縮

壓縮是指在存儲(chǔ)或傳輸消息之前減小消息的大小。它通過(guò)在生產(chǎn)者、代理和消費(fèi)者之間發(fā)送較小的有效負(fù)載來(lái)優(yōu)化存儲(chǔ)使用、減少網(wǎng)絡(luò)帶寬消耗并提高整體性能。

當(dāng)生產(chǎn)者向 Kafka 主題發(fā)送消息時(shí),它可以在傳輸之前對(duì)消息進(jìn)行壓縮。壓縮的消息按原樣存儲(chǔ)在代理上,并由消費(fèi)者在讀取消息時(shí)解壓縮。

優(yōu)點(diǎn)

  • 減少網(wǎng)絡(luò)帶寬:較小的有效負(fù)載意味著通過(guò)網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)較少。
  • 較低的存儲(chǔ)要求:壓縮消息占用更少的磁盤(pán)空間。
  • 提高吞吐量:較小的消息可以實(shí)現(xiàn)更快的數(shù)據(jù)傳輸和處理。

什么時(shí)候使用?

  • 消息大小較大的用例:壓縮大大減少了數(shù)據(jù)大小。
  • 高吞吐量系統(tǒng):減少網(wǎng)絡(luò)和存儲(chǔ)資源的壓力。
  • 批處理:當(dāng)生產(chǎn)者將多個(gè)消息一起批處理時(shí),壓縮效果最佳。

雖然壓縮可以節(jié)省資源,但必須平衡 CPU 使用率和壓縮優(yōu)勢(shì)之間的權(quán)衡,選擇適合您的用例的編解碼器。

支持的壓縮類(lèi)型

  • 無(wú): 無(wú)壓縮(默認(rèn))。
  • Gzip:壓縮比高,但CPU占用率較高。
  • Snappy:平衡的壓縮速度和CPU使用率,適合實(shí)時(shí)用例。
  • LZ4:更快的壓縮和解壓縮,針對(duì)低延遲系統(tǒng)進(jìn)行了優(yōu)化。
  • Zstd: 高壓縮比,性能比 Gzip 更好,較新的 Kafka 版本支持。

調(diào)音

優(yōu)化 Apache Kafka 的性能涉及微調(diào)各個(gè)組件以有效平衡吞吐量和延遲。本文僅觸及該主題的表面,以下是調(diào)優(yōu) Kafka 時(shí)需要考慮的一些方面:

  • 分區(qū)管理:

    • 分區(qū)計(jì)數(shù):增加分區(qū)數(shù)量以增強(qiáng)并行性和吞吐量。但是,請(qǐng)避免過(guò)多的分區(qū)以防止管理開(kāi)銷(xiāo)。根據(jù)您的消費(fèi)者能力和所需的消費(fèi)率調(diào)整分區(qū)數(shù)量。
  • 生產(chǎn)者配置:

    • 批處理:配置batch.size和linger.ms以實(shí)現(xiàn)高效的消息批處理,減少請(qǐng)求數(shù)量并提高吞吐量。
    • 壓縮: 實(shí)施壓縮(例如,compression.type=snappy)以減小消息大小,從而減少網(wǎng)絡(luò)和存儲(chǔ)使用。請(qǐng)注意壓縮帶來(lái)的額外 CPU 開(kāi)銷(xiāo)。
  • 消費(fèi)者配置:

    • 獲取設(shè)置:調(diào)整 fetch.min.bytes 和 fetch.max.wait.ms 以控制消費(fèi)者檢索消息的方式,根據(jù)應(yīng)用程序的需求平衡延遲和吞吐量。

實(shí)際例子

想象一個(gè)應(yīng)用程序記錄房間內(nèi)的溫度并使用 Kafka 傳輸該數(shù)據(jù),然后另一個(gè)應(yīng)用程序處理該數(shù)據(jù)。為簡(jiǎn)單起見(jiàn),我們將僅關(guān)注 Kafka 方面,生產(chǎn)者和消費(fèi)者都在同一應(yīng)用程序中實(shí)現(xiàn)。在這種情況下,特定時(shí)刻記錄的每個(gè)溫度都代表一個(gè)事件:

{
  temperature: 42,
  timeStamp: new Date(),
};

所有代碼都將在此存儲(chǔ)庫(kù)中。

首先,我們需要一個(gè) Kafka 代理,但我們不需要在我們的機(jī)器中安裝 Kafka,只需使用這個(gè) Docker Kafka 鏡像即可。

首先拉取該圖像:

docker 拉 apache/kafka

然后運(yùn)行它,映射 Kafka 在我們機(jī)器上的同一端口上偵聽(tīng)的端口:

docker run -d -p 9092:9092 --name Broker apache/kafka:latest

就是這樣,我們有一個(gè)正在運(yùn)行的 Kafka 代理,在繼續(xù)之前,您可能想通過(guò)創(chuàng)建主題、發(fā)送和使用消息來(lái)嘗試一下它,只需按照該圖像頁(yè)面上的說(shuō)明進(jìn)行操作即可。

為了構(gòu)建我們的應(yīng)用程序,我們將結(jié)合使用 NestJS 和 KafkaJS,首先使用 Nest CLI 創(chuàng)建應(yīng)用程序

嵌套新的我的巢項(xiàng)目

在項(xiàng)目文件夾內(nèi)安裝kafkajs

npm 我卡夫卡

并生成以下模塊

巢g莫卡夫卡

nest g mo 制作人

巢 g mo 消費(fèi)者

巢穴溫度

Kafka 模塊 將處理所有 Kafka 特定的操作,包括管理用于連接、斷開(kāi)連接、發(fā)送和接收消息的消費(fèi)者和生產(chǎn)者類(lèi)。這將是唯一直接與 kafkajs 包交互的模塊。

生產(chǎn)者和消費(fèi)者模塊將充當(dāng)發(fā)布-訂閱平臺(tái)(在本例中為 Kafka)與應(yīng)用程序其余部分之間的接口,抽象平臺(tái)特定的詳細(xì)信息。

溫度模塊將管理事件。它不需要知道正在使用哪個(gè)發(fā)布-訂閱平臺(tái),只需要消費(fèi)者和生產(chǎn)者即可運(yùn)行。

創(chuàng)建模塊后,我們還創(chuàng)建一個(gè)文件夾 src/interface 并在其中添加以下接口:

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

在 src/kafka/ 文件夾中添加實(shí)現(xiàn)這些接口的生產(chǎn)者和消費(fèi)者類(lèi):

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

不要忘記在 kafka.module.ts 中導(dǎo)出這些類(lèi)

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

現(xiàn)在我們可以轉(zhuǎn)到溫度模塊并實(shí)例化這些 Kafka 類(lèi)并開(kāi)始使用它們。然而,如果溫度模塊不必?fù)?dān)心它正在使用哪個(gè) pub-sub 平臺(tái),那就更好了。相反,它應(yīng)該簡(jiǎn)單地與注入的生產(chǎn)者和/或消費(fèi)者一起工作,只專(zhuān)注于發(fā)送和接收消息,而不管底層平臺(tái)如何。這樣,如果我們決定將來(lái)切換到不同的 pub-sub 平臺(tái),我們不需要對(duì)溫度模塊進(jìn)行任何更改。

為了實(shí)現(xiàn)這種抽象,我們可以創(chuàng)建 Producer 和 Consumer 類(lèi)來(lái)處理 Kafka Producer 和 Consumer 實(shí)現(xiàn)的細(xì)節(jié)。讓我們從制作人開(kāi)始:

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}

現(xiàn)在,消費(fèi)者:

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}

現(xiàn)在,我們可以專(zhuān)注于構(gòu)建溫度模塊。在Temperature.service.ts 文件中,我們將創(chuàng)建一個(gè)方法來(lái)注冊(cè)溫度,在本例中,該方法將簡(jiǎn)單地使用生產(chǎn)者將溫度數(shù)據(jù)發(fā)送到代理。此外,我們將實(shí)現(xiàn)一種方法來(lái)處理傳入消息以用于演示目的。

這些方法可以由另一個(gè)服務(wù)或控制器調(diào)用。但是,為了簡(jiǎn)單起見(jiàn),在本示例中,我們將在應(yīng)用程序啟動(dòng)時(shí)利用 onModuleInit 方法直接調(diào)用它們。

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

就是這樣!通過(guò)在 Docker 容器中運(yùn)行代理,您可以啟動(dòng)應(yīng)用程序來(lái)發(fā)送和接收消息。此外,您可以使用以下命令在代理容器內(nèi)打開(kāi) shell:

docker exec --workdir /opt/kafka/bin/ -it Broker sh

從那里,您可以直接與代理交互并向應(yīng)用程序發(fā)送消息、從中接收消息、創(chuàng)建新主題等。

這是包含本示例代碼的存儲(chǔ)庫(kù)。

以上是Kafka 基礎(chǔ)知識(shí)和實(shí)際示例的詳細(xì)內(nèi)容。更多信息請(qǐng)關(guān)注PHP中文網(wǎng)其他相關(guān)文章!

本站聲明
本文內(nèi)容由網(wǎng)友自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,本站不承擔(dān)相應(yīng)法律責(zé)任。如您發(fā)現(xiàn)有涉嫌抄襲侵權(quán)的內(nèi)容,請(qǐng)聯(lián)系admin@php.cn

熱AI工具

Undress AI Tool

Undress AI Tool

免費(fèi)脫衣服圖片

Undresser.AI Undress

Undresser.AI Undress

人工智能驅(qū)動(dòng)的應(yīng)用程序,用于創(chuàng)建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用于從照片中去除衣服的在線(xiàn)人工智能工具。

Clothoff.io

Clothoff.io

AI脫衣機(jī)

Video Face Swap

Video Face Swap

使用我們完全免費(fèi)的人工智能換臉工具輕松在任何視頻中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費(fèi)的代碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

功能強(qiáng)大的PHP集成開(kāi)發(fā)環(huán)境

Dreamweaver CS6

Dreamweaver CS6

視覺(jué)化網(wǎng)頁(yè)開(kāi)發(fā)工具

SublimeText3 Mac版

SublimeText3 Mac版

神級(jí)代碼編輯軟件(SublimeText3)

熱門(mén)話(huà)題

Java vs. JavaScript:清除混亂 Java vs. JavaScript:清除混亂 Jun 20, 2025 am 12:27 AM

Java和JavaScript是不同的編程語(yǔ)言,各自適用于不同的應(yīng)用場(chǎng)景。Java用于大型企業(yè)和移動(dòng)應(yīng)用開(kāi)發(fā),而JavaScript主要用于網(wǎng)頁(yè)開(kāi)發(fā)。

JavaScript評(píng)論:簡(jiǎn)短說(shuō)明 JavaScript評(píng)論:簡(jiǎn)短說(shuō)明 Jun 19, 2025 am 12:40 AM

JavascriptconcommentsenceenceEncorenceEnterential gransimenting,reading and guidingCodeeXecution.1)單inecommentsareusedforquickexplanations.2)多l(xiāng)inecommentsexplaincomplexlogicorprovideDocumentation.3)

如何在JS中與日期和時(shí)間合作? 如何在JS中與日期和時(shí)間合作? Jul 01, 2025 am 01:27 AM

JavaScript中的日期和時(shí)間處理需注意以下幾點(diǎn):1.創(chuàng)建Date對(duì)象有多種方式,推薦使用ISO格式字符串以保證兼容性;2.獲取和設(shè)置時(shí)間信息可用get和set方法,注意月份從0開(kāi)始;3.手動(dòng)格式化日期需拼接字符串,也可使用第三方庫(kù);4.處理時(shí)區(qū)問(wèn)題建議使用支持時(shí)區(qū)的庫(kù),如Luxon。掌握這些要點(diǎn)能有效避免常見(jiàn)錯(cuò)誤。

JavaScript與Java:開(kāi)發(fā)人員的全面比較 JavaScript與Java:開(kāi)發(fā)人員的全面比較 Jun 20, 2025 am 12:21 AM

JavaScriptIspreferredforredforwebdevelverment,而Javaisbetterforlarge-ScalebackendsystystemsandSandAndRoidApps.1)JavascriptexcelcelsincreatingInteractiveWebexperienceswebexperienceswithitswithitsdynamicnnamicnnamicnnamicnnamicnemicnemicnemicnemicnemicnemicnemicnemicnddommanipulation.2)

為什么要將標(biāo)簽放在的底部? 為什么要將標(biāo)簽放在的底部? Jul 02, 2025 am 01:22 AM

PlacingtagsatthebottomofablogpostorwebpageservespracticalpurposesforSEO,userexperience,anddesign.1.IthelpswithSEObyallowingsearchenginestoaccesskeyword-relevanttagswithoutclutteringthemaincontent.2.Itimprovesuserexperiencebykeepingthefocusonthearticl

JavaScript:探索用于高效編碼的數(shù)據(jù)類(lèi)型 JavaScript:探索用于高效編碼的數(shù)據(jù)類(lèi)型 Jun 20, 2025 am 12:46 AM

javascripthassevenfundaMentalDatatypes:數(shù)字,弦,布爾值,未定義,null,object和symbol.1)numberSeadUble-eaduble-ecisionFormat,forwidevaluerangesbutbecautious.2)

什么是在DOM中冒泡和捕獲的事件? 什么是在DOM中冒泡和捕獲的事件? Jul 02, 2025 am 01:19 AM

事件捕獲和冒泡是DOM中事件傳播的兩個(gè)階段,捕獲是從頂層向下到目標(biāo)元素,冒泡是從目標(biāo)元素向上傳播到頂層。1.事件捕獲通過(guò)addEventListener的useCapture參數(shù)設(shè)為true實(shí)現(xiàn);2.事件冒泡是默認(rèn)行為,useCapture設(shè)為false或省略;3.可使用event.stopPropagation()阻止事件傳播;4.冒泡支持事件委托,提高動(dòng)態(tài)內(nèi)容處理效率;5.捕獲可用于提前攔截事件,如日志記錄或錯(cuò)誤處理。了解這兩個(gè)階段有助于精確控制JavaScript響應(yīng)用戶(hù)操作的時(shí)機(jī)和方式。

Java和JavaScript有什么區(qū)別? Java和JavaScript有什么區(qū)別? Jun 17, 2025 am 09:17 AM

Java和JavaScript是不同的編程語(yǔ)言。1.Java是靜態(tài)類(lèi)型、編譯型語(yǔ)言,適用于企業(yè)應(yīng)用和大型系統(tǒng)。2.JavaScript是動(dòng)態(tài)類(lèi)型、解釋型語(yǔ)言,主要用于網(wǎng)頁(yè)交互和前端開(kāi)發(fā)。

See all articles