国产av日韩一区二区三区精品,成人性爱视频在线观看,国产,欧美,日韩,一区,www.成色av久久成人,2222eeee成人天堂

首頁 web前端 js教程 Kafka 基礎(chǔ)知識與實(shí)際範(fàn)例

Kafka 基礎(chǔ)知識與實(shí)際範(fàn)例

Dec 28, 2024 am 09:26 AM

在過去的幾周里,我一直在深入研究Kafka 並一路做筆記,我決定組織和構(gòu)建一篇博客文章,在上面,除了概念和技巧之外,還有一個使用構(gòu)建的實(shí)際示例NestJS和KafkaJs。

卡夫卡是什麼?

Apache Kafka 是一個分散式事件流平臺,旨在處理即時事件。它能夠儲存、處理和檢索大規(guī)模、高吞吐量、低延遲的資料流,使其適合建立即時資料管道和事件驅(qū)動的應(yīng)用程式。

主要特點(diǎn):

  • 事件流: Kafka 將資料組織成主題,它們是事件的有序日誌。
  • 分散式架構(gòu):Kafka 是為了可擴(kuò)展性和容錯能力而建構(gòu)的。它作為稱為代理的節(jié)點(diǎn)叢集運(yùn)行,可以跨多個伺服器分發(fā)資料。
  • 發(fā)布-訂閱模型:生產(chǎn)者將訊息寫入主題,消費(fèi)者從中讀取訊息。 Kafka支援多個消費(fèi)者,讓不同的應(yīng)用程式獨(dú)立處理同一個資料流。
  • 高效能: Kafka 針對高吞吐量進(jìn)行了最佳化,每秒處理數(shù)百萬則訊息,延遲較低。
  • 持久儲存: Kafka 將訊息儲存在磁碟上,保留期限可配置,確保資料持久性和可靠性。
  • 分區(qū)與複製:主題分為分區(qū)以實(shí)現(xiàn)可擴(kuò)展性,並跨代理進(jìn)行複製以實(shí)現(xiàn)容錯。
  • 可重玩性:消費(fèi)者可以透過重置其偏移量來重新讀取訊息,從而啟用資料重新處理或復(fù)原。
  • 集成和生態(tài)系統(tǒng): Kafka 與各種系統(tǒng)集成,並擁有 Kafka Connect(用於數(shù)據(jù)集成)和 Kafka Streams(用於流處理)等工具。

優(yōu)點(diǎn)

  • 可靠性:透過資料分發(fā)、複製和分區(qū)確保容錯。
  • 可擴(kuò)充性:Kafka 可以處理大量資料並水平擴(kuò)展而無需停機(jī)。
  • 持久性:訊息被及時存儲,確保彈性和資料持久性。
  • 效能:Kafka 在極端資料負(fù)載下保持高效能,處理大量資料而不會造成停機(jī)或資料遺失。

缺點(diǎn)

這些權(quán)衡是有意的設(shè)計選擇,旨在最大限度地提高 Kafka 的性能,但可能會給需要更大靈活性的用例帶來挑戰(zhàn):

  • 靈活性有限: Kafka 缺乏對擴(kuò)展查詢的支持,例如過濾報告中的特定資料。消費(fèi)者必須處理這些任務(wù),因為 Kafka 按訊息接收順序的偏移量檢索訊息。
  • 不適合長期儲存:Kafka 擅長串流數(shù)據(jù),但不適合長期儲存歷史資料。資料重複會使大型資料集的儲存成本高昂。
  • 無通配符主題支援: Kafka 不允許使用通配符模式(例如 log-2024-*)從多個主題進(jìn)行消費(fèi)。

使用案例

  • 即時分析:在資料流發(fā)生時進(jìn)行處理與分析。
  • 事件溯源: 將應(yīng)用程式狀態(tài)的所有變更記錄為事件序列。
  • 日誌聚合:從分散式系統(tǒng)收集和管理日誌。
  • 資料管道:可靠且有效率地在系統(tǒng)之間傳輸資料。
  • 物聯(lián)網(wǎng)應(yīng)用:即時處理來自物聯(lián)網(wǎng)設(shè)備的感測器資料。

卡夫卡如何運(yùn)作?

Kafka 整合了隊列和發(fā)布-訂閱訊息傳遞模型的功能,為消費(fèi)者提供每種方法的優(yōu)勢。

  • 佇列 透過在多個消費(fèi)者實(shí)例之間分配任務(wù)來實(shí)現(xiàn)可擴(kuò)展的資料處理,但傳統(tǒng)佇列不支援多個訂閱者。
  • 發(fā)布-訂閱模型支援多個訂閱者,但無法在多個工作進(jìn)程之間分配任務(wù),因為每個訊息都會發(fā)送給所有訂閱者。

Kafka 採用分區(qū)日誌系統(tǒng)來結(jié)合佇列和發(fā)布-訂閱模型的優(yōu)點(diǎn)。日誌是有序的記錄序列,被分成多個分區(qū),每個分區(qū)分配給不同的訂閱者(消費(fèi)者)。此設(shè)定使多個訂閱者能夠共享一個主題,同時保持可擴(kuò)展性。

Kafka fundamentals with a practical example

活動、主題和分區(qū)

我們已經(jīng)看到 Kafka 是一個旨在處理即時事件的平臺,在討論如何處理這些事件之前,我們需要對它們進(jìn)行定義:

事件是記錄應(yīng)用程式的操作、事件或更改,例如付款、網(wǎng)站點(diǎn)擊或溫度讀數(shù)。

Kafka 中的

事件被建模為鍵/值對,其中鍵和值都被序列化為位元組序列。

  • 通常表示序列化的域物件或原始輸入,例如感測器輸出或其他應(yīng)用程式資料。它們封裝了 Kafka 事件中傳輸?shù)暮诵挠嵪ⅰ?
  • 可以是複雜的域?qū)ο螅ǔJ呛唵蔚念愋?,如字串或整?shù)。鍵通常標(biāo)識系統(tǒng)內(nèi)的實(shí)體,例如特定使用者、訂單或連接的設(shè)備,而不是唯一標(biāo)識單一事件(如關(guān)聯(lián)式資料庫中的主鍵)。

Kafka 將事件組織成有序日誌,稱為主題。當(dāng)外部系統(tǒng)將事件寫入 Kafka 時,它會被附加到主題的末端。即使在閱讀後,訊息也會在主題中保留可配置的持續(xù)時間。與佇列不同,主題具有持久性、可複製性和容錯性,可以有效地儲存事件記錄。但日誌只能順序掃描,不能查詢。

主題作為日誌檔案儲存在磁碟上,但是磁碟具有有限的大小和 I/O 等限制。為了克服這個問題,Kafka 允許主題分為分區(qū),將單一日誌分解為多個可以分佈在不同伺服器上的日誌。這種分區(qū)使 Kafka 能夠水平擴(kuò)展,增強(qiáng)其處理大量事件和高吞吐量的能力。

Kafka 根據(jù)分割區(qū)是否有 key:

將訊息分配給分割區(qū)
  • 無鍵:訊息在所有分割區(qū)之間循環(huán)分發(fā),確保資料均勻分佈,但不保留訊息順序。
  • With Key: 分區(qū)是透過對 key 進(jìn)行哈希處理來確定的,確保具有相同 key 的訊息始終進(jìn)入相同的分區(qū)並保持其順序。

經(jīng)紀(jì)人

Kafka 使用名為 brokers 的節(jié)點(diǎn)作為分散式資料基礎(chǔ)設(shè)施運(yùn)行,這些節(jié)點(diǎn)共同形成 Kafka 叢集。代理程式可以在裸機(jī)硬體、雲(yún)端執(zhí)行個體、Kubernetes 管理的容器中、筆記型電腦上的 Docker 或任何可以執(zhí)行 JVM 程序的地方運(yùn)作。

經(jīng)紀(jì)商關(guān)注:

  • 將新事件寫入分割區(qū)。
  • 從分割區(qū)讀取服務(wù)。
  • 跨代理複製分區(qū)。

它們不執(zhí)行訊息計算或主題到主題的路由,從而保持設(shè)計簡單且有效率。

複製

Kafka 透過跨多個代理複製分區(qū)資料來確保資料的持久性和容錯性。分區(qū)的主要副本是領(lǐng)導(dǎo)副本,而其他副本是跟隨副本。資料被寫入領(lǐng)導(dǎo)者,領(lǐng)導(dǎo)者自動將更新複製到追隨者。

此複製過程可確保:

  • 資料安全,即使在代理程式或儲存發(fā)生故障的情況下也是如此。
  • 自動故障轉(zhuǎn)移,如果當(dāng)前領(lǐng)導(dǎo)者失敗,另一個副本將接管作為領(lǐng)導(dǎo)者。

開發(fā)人員可以從這些保證中受益,而無需直接管理複製,因為 Kafka 會透明地處理它。

製片人

Kafka 生產(chǎn)者 是一個客戶端應(yīng)用程序,它將資料發(fā)送(發(fā)布)到 Kafka 主題。它負(fù)責(zé)建立訊息(記錄)並將其傳送到 Kafka 叢集。生產(chǎn)者根據(jù)其配置和訊息金鑰的存在來決定儲存訊息的主題分區(qū)。生產(chǎn)者負(fù)責(zé)但不限於:

  • 訊息撰寫:
    • 每個訊息由一個鍵(可選)、一個值(實(shí)際資料)和元資料組成。
    • key決定訊息的分區(qū),確保具有相同key的訊息的順序。
  • 分區(qū)分配:
    • 如果提供了金鑰,生產(chǎn)者將使用雜湊演算法來決定分區(qū)。
    • 沒有金鑰,訊息以循環(huán)方式跨分區(qū)分發(fā)以實(shí)現(xiàn)負(fù)載平衡。
  • 壓縮:

    生產(chǎn)者可以壓縮訊息以減少網(wǎng)路頻寬和儲存使用。

消費(fèi)者

Kafka 消費(fèi)者 是一個客戶端應(yīng)用程序,它從Kafka 主題讀取訊息, 它按照自己的節(jié)奏從Kafka 分區(qū)檢索訊息,允許即時或批量處理數(shù)據(jù)。請注意,Kafka 不會將訊息推送給消費(fèi)者,而是透過請求資料從 Kafka 分區(qū)中拉取訊息。

消費(fèi)者也可以追蹤他們已處理的抵銷額。偏移量可以自動手動提交,確保消費(fèi)者失敗時資料不會遺失。這允許靈活的消費(fèi),包括透過重置偏移量來重播訊息。

消費(fèi)群

消費(fèi)者組是一組消費(fèi)者,它們合作消費(fèi)來自某些主題的數(shù)據(jù),從而允許分散式處理主題的訊息。

主題的分區(qū)在群組內(nèi)的消費(fèi)者之間劃分,確保每個訊息僅由群組內(nèi)的一個消費(fèi)者處理。多個消費(fèi)組可以獨(dú)立消費(fèi)同一個主題,互不干擾。

當(dāng)新的消費(fèi)者加入群組或現(xiàn)有消費(fèi)者失敗時,Kafka 會在群組中的消費(fèi)者之間重新分配分區(qū),以確保覆蓋所有分區(qū)。

序列化和反序列化

Kafka中的序列化和反序列化是將資料在其原始格式和位元組數(shù)組之間進(jìn)行轉(zhuǎn)換以進(jìn)行傳輸和存儲,從而使生產(chǎn)者和消費(fèi)者能夠高效地進(jìn)行通訊。

序列化

是將物件或資料結(jié)構(gòu)轉(zhuǎn)換為位元組流以便傳輸或儲存的過程。在生產(chǎn)者將資料傳送到 Kafka 主題之前,它將資料(鍵和值)序列化為位元組數(shù)組。

常見序列化格式:

  • JSON:人類可讀,廣泛相容。
  • Avro:緊湊高效,基於模式。
  • Protobuf:緊湊、基於模式且與語言無關(guān)。
  • 字串:簡單的基於文字的序列化。
  • 自訂序列化:滿足特定於應(yīng)用程式的需求。

反序列化

是相反的過程,其中位元組流被轉(zhuǎn)換回其原始物件或資料結(jié)構(gòu)。當(dāng)消費(fèi)者從 Kafka 主題讀取資料時,它將位元組數(shù)組反序列化回可用的格式進(jìn)行處理。

壓縮

壓縮是指在儲存或傳輸訊息之前減少訊息的大小。它透過在生產(chǎn)者、代理商和消費(fèi)者之間發(fā)送較小的有效負(fù)載來優(yōu)化儲存使用、減少網(wǎng)路頻寬消耗並提高整體效能。

當(dāng)生產(chǎn)者向 Kafka 主題發(fā)送訊息時,它可以在傳輸之前對訊息進(jìn)行壓縮。壓縮的訊息原樣儲存在代理程式上,並由消費(fèi)者在讀取訊息時解壓縮。

優(yōu)點(diǎn)

  • 減少網(wǎng)路頻寬:較小的有效負(fù)載意味著透過網(wǎng)路傳輸?shù)馁Y料較少。
  • 較低的儲存需求:壓縮訊息佔(zhàn)用較少的磁碟空間。
  • 提高吞吐量:較小的訊息可以實(shí)現(xiàn)更快的資料傳輸和處理。

什麼時候使用?

  • 訊息大小較大的用例:壓縮大大減少了資料大小。
  • 高吞吐量系統(tǒng):減少網(wǎng)路和儲存資源的壓力。
  • 批次:當(dāng)生產(chǎn)者將多個訊息一起批次時,壓縮效果最佳。

雖然壓縮可以節(jié)省資源,但必須平衡 CPU 使用率和壓縮優(yōu)勢之間的權(quán)衡,選擇適合您的用例的編解碼器。

支援的壓縮類型

  • 無: 無壓縮(預(yù)設(shè))。
  • Gzip:壓縮比高,但CPU佔(zhàn)用率較高。
  • Snappy:平衡的壓縮速度和CPU使用率,適合即時使用案例。
  • LZ4:更快的壓縮和解壓縮,針對低延遲系統(tǒng)進(jìn)行了最佳化。
  • Zstd: 高壓縮比,效能比 Gzip 更好,較新的 Kafka 版本支援。

調(diào)音

最佳化 Apache Kafka 的效能涉及微調(diào)各個元件以有效平衡吞吐量和延遲。本文僅觸及該主題的表面,以下是調(diào)優(yōu) Kafka 時需要考慮的一些方面:

  • 分區(qū)管理:

    • 分區(qū)計數(shù):增加分區(qū)數(shù)量以增強(qiáng)並行性和吞吐量。但是,請避免過多的分區(qū)以防止管理開銷。根據(jù)您的消費(fèi)者能力和所需的消費(fèi)率調(diào)整分區(qū)數(shù)量。
  • 生產(chǎn)者配置:

    • 批次:配置batch.size和linger.ms以實(shí)現(xiàn)高效的訊息批次,減少請求數(shù)量並提高吞吐量。
    • 壓縮: 實(shí)作壓縮(例如,compression.type=snappy)以減少訊息大小,從而減少網(wǎng)路和儲存使用。請注意壓縮帶來的額外 CPU 開銷。
  • 消費(fèi)者配置:

    • 取得設(shè)定:調(diào)整 fetch.min.bytes 和 fetch.max.wait.ms 以控制消費(fèi)者檢索訊息的方式,根據(jù)應(yīng)用程式的需求平衡延遲和吞吐量。

實(shí)際例子

想像一個應(yīng)用程式記錄房間內(nèi)的溫度並使用 Kafka 傳輸該數(shù)據(jù),然後另一個應(yīng)用程式處理該數(shù)據(jù)。為簡單起見,我們將僅關(guān)注 Kafka 方面,生產(chǎn)者和消費(fèi)者都在同一應(yīng)用程式中實(shí)現(xiàn)。在這種情況下,特定時刻記錄的每個溫度都代表一個事件:

{
  temperature: 42,
  timeStamp: new Date(),
};

所有程式碼都將在此儲存庫中。

首先,我們需要一個 Kafka 代理,但我們不需要在我們的機(jī)器中安裝 Kafka,只需使用這個 Docker Kafka 映像即可。

先拉取該影像:

docker 拉 apache/kafka

然後運(yùn)行它,映射 Kafka 在我們機(jī)器上的同一連接埠上偵聽的連接埠:

docker run -d -p 9092:9092 --name Broker apache/kafka:latest

就是這樣,我們有一個正在運(yùn)行的 Kafka 代理,在繼續(xù)之前,您可能想通過創(chuàng)建主題、發(fā)送和使用訊息來嘗試它,只需按照該圖像頁面上的說明進(jìn)行操作即可。

為了建立我們的應(yīng)用程序,我們將結(jié)合 NestJS 和 KafkaJS,首先使用 Nest CLI 建立應(yīng)用程式

嵌套新的我的巢項目

在專案資料夾內(nèi)安裝kafkajs

npm 我卡夫卡

並產(chǎn)生以下模組

巢g莫卡夫卡

nest g mo 製作人

巢 g mo 消費(fèi)者

巢穴溫度

Kafka 模組 將處理所有 Kafka 特定的操作,包括管理用於連接、斷開連接、發(fā)送和接收訊息的消費(fèi)者和生產(chǎn)者類別。這將是唯一直接與 kafkajs 套件互動的模組。

生產(chǎn)者和消費(fèi)者模組將充當(dāng)發(fā)布-訂閱平臺(在本例中為 Kafka)與應(yīng)用程式其餘部分之間的接口,抽象平臺特定的詳細(xì)資訊。

溫度模組將管理事件。它不需要知道正在使用哪個發(fā)布-訂閱平臺,只需要消費(fèi)者和生產(chǎn)者即可運(yùn)作。

建立模組後,我們也會建立一個資料夾 src/interface 並在其中新增以下介面:

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

在 src/kafka/ 資料夾中新增實(shí)作這些介面的生產(chǎn)者和消費(fèi)者類別:

// src/interfaces/consumer.interface.ts

export type ConsumerMessage = {
  key?: string;
  value: any;
};

export type OnMessage = (message: ConsumerMessage) => Promise<void>;

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (onMessage?: OnMessage) => Promise<void>;
  isConnected: () => boolean;
}
// src/kafka/kafka.producer.ts

export class KafkaProducer implements IProducer {
  private readonly logger = new Logger(KafkaProducer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly producer: Producer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
  ) {
    // The client must be configured with at least one broker
    this.kafka = new Kafka({
      brokers: [this.broker],
    });
    this.producer = this.kafka.producer();
  }

  async produce(
    message: Message,
    compression?: CompressionTypes,
    acks?: number,
    timeout?: number,
  ) {
    // To produce, at least a topic and a list of messages must be provided
    await this.producer.send({
      topic: this.topic,
      messages: [message],
      compression,
      timeout,
      acks,
    });
  }

  // To produce a message, the producer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the producer events
      // And storing the connection status
      this.producer.on('producer.connect', () => {
        this.logger.log(
          `producer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.producer.on('producer.disconnect', () => {
        this.logger.log(
          `producer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      // Connect to Kafka
      await this.producer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.producer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

不要忘記在 kafka.module.ts 中匯出這些類別

// src/kafka/kafka.cosumer.ts

export class KafkaConsumer implements IConsumer {
  private readonly logger = new Logger(KafkaConsumer.name, { timestamp: true });
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private connected: boolean = false;

  constructor(
    private readonly broker: string,
    private readonly topic: string,
    private readonly groupId: string,
  ) {
    if (this.broker && this.topic && this.groupId) {
      // The client must be configured with at least one broker
      this.kafka = new Kafka({
        brokers: [this.broker],
      });
      this.consumer = this.kafka.consumer({ groupId: this.groupId });
    } else {
      this.logger.warn('Broker, topic and groupId must be provided');
    }
  }

  // The onMessage function will be called when a message is received
  async consume(onMessage: OnMessage) {
    // Here we subscribe to the topic ...
    await this.consumer.subscribe({ topic: this.topic });

    // ... and handle the messages
    await this.consumer.run({
      eachMessage: async (payload) => {
        try {
          this.logger.log(
            `message: ${payload.message.value.toString()} (topic: ${payload.topic}, partition: ${payload.partition})`,
          );

          await onMessage({
            key: payload.message.key?.toString(),
            value: payload.message.value.toString(),
          });
        } catch (err) {
          this.logger.error('error on consuming message', err);
        }
      },
    });
  }

  // To consume, the consumer must be connected
  async connect() {
    try {
      // Just hooking up some logs in the consumer events
      // And storing the connection status
      this.consumer.on('consumer.connect', () => {
        this.logger.log(
          `consumer connected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = true;
      });

      this.consumer.on('consumer.disconnect', () => {
        this.logger.log(
          `consumer disconnected. broker: ${this.broker} topic: ${this.topic}`,
        );
        this.connected = false;
      });

      await this.consumer.connect();
    } catch (err) {
      this.logger.error(
        `failed to connect to kafka. broker: ${this.broker} topic: ${this.topic}`,
        err,
      );
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
  }

  isConnected(): boolean {
    return this.connected;
  }
}

現(xiàn)在我們可以轉(zhuǎn)到溫度模組並實(shí)例化這些 Kafka 類別並開始使用它們。然而,如果溫度模組不必?fù)?dān)心它正在使用哪個 pub-sub 平臺,那就更好了。相反,它應(yīng)該簡單地與注入的生產(chǎn)者和/或消費(fèi)者一起工作,只專注於發(fā)送和接收訊息,而不管底層平臺如何。這樣,如果我們決定將來切換到不同的 pub-sub 平臺,我們不需要對溫度模組進(jìn)行任何更改。

為了實(shí)現(xiàn)這種抽象,我們可以創(chuàng)建 Producer 和 Consumer 類別來處理 Kafka Producer 和 Consumer 實(shí)作的細(xì)節(jié)。讓我們從製作人開始:

// src/kafka/kafka.module.ts

@Module({
  imports: [],
  providers: [KafkaProducer, KafkaConsumer],
  exports: [KafkaProducer, KafkaConsumer],
})
export class KafkaModule {}
// src/producer/producer.service.ts

@Injectable()
export class ProducerService implements OnApplicationShutdown {
  // Expects any producer that implements the IProducer interface
  private readonly producer: IProducer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
  ) {
    this.producer = new KafkaProducer(broker, topic);
  }

  /** The produce() and message can receive more parameters,
   * refer to produce method in src/kafka/kafka.producer.ts
   */
  async produce(message: { key?: string; value: string }) {
    if (!this.producer.isConnected()) {
      await this.producer.connect();
    }
    await this.producer.produce(message);
  }

  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}

現(xiàn)在,消費(fèi)者:

// src/producer/producer.module.ts

@Module({
  imports: [KafkaModule],
  providers: [
    ProducerService,
    {
      provide: 'broker',
      useValue: 'default-broker-value',
    },
    {
      provide: 'topic',
      useValue: 'default-topic-value',
    },
  ],
  exports: [ProducerService],
})
export class ProducerModule {}
// src/consumer/consumer.service.ts

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  // Expects any consumer that implements the IConsumer interface
  private readonly consumer: IConsumer;

  constructor(
    @Inject('broker') broker: string,
    @Inject('topic') topic: string,
    @Inject('groupId') groupId: string,
  ) {
    this.consumer = new KafkaConsumer(broker, topic, groupId);
  }

  async consume(onMessage: OnMessage) {
    if (!this.consumer.isConnected()) {
      await this.consumer.connect();
    }
    await this.consumer.consume(onMessage);
  }

  async onApplicationShutdown() {
    await this.consumer.disconnect();
  }
}

現(xiàn)在,我們可以專注於建立溫度模組。在Temperature.service.ts 檔案中,我們將建立一個方法來註冊溫度,在本例中,該方法將簡單地使用生產(chǎn)者將溫度資料傳送到代理程式。此外,我們將實(shí)作一種方法來處理傳入訊息以用於演示目的。

這些方法可以由另一個服務(wù)或控制器呼叫。但是,為了簡單起見,在本範(fàn)例中,我們將在應(yīng)用程式啟動時利用 onModuleInit 方法直接呼叫它們。

{
  temperature: 42,
  timeStamp: new Date(),
};
// src/interfaces/producer.interface.ts

export interface IProducer {
  produce: (message: any) => Promise<void>;
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  isConnected: () => boolean;
}

就是這樣!透過在 Docker 容器中執(zhí)行代理,您可以啟動應(yīng)用程式來傳送和接收訊息。此外,您可以使用以下命令在代理容器內(nèi)開啟 shell:

docker exec --workdir /opt/kafka/bin/ -it Broker sh

從那裡,您可以直接與代理互動並向應(yīng)用程式發(fā)送訊息、從中接收訊息、創(chuàng)建新主題等。

這是包含本範(fàn)例程式碼的儲存庫。

以上是Kafka 基礎(chǔ)知識與實(shí)際範(fàn)例的詳細(xì)內(nèi)容。更多資訊請關(guān)注PHP中文網(wǎng)其他相關(guān)文章!

本網(wǎng)站聲明
本文內(nèi)容由網(wǎng)友自願投稿,版權(quán)歸原作者所有。本站不承擔(dān)相應(yīng)的法律責(zé)任。如發(fā)現(xiàn)涉嫌抄襲或侵權(quán)的內(nèi)容,請聯(lián)絡(luò)admin@php.cn

熱AI工具

Undress AI Tool

Undress AI Tool

免費(fèi)脫衣圖片

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅(qū)動的應(yīng)用程序,用於創(chuàng)建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費(fèi)的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費(fèi)的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強(qiáng)大的PHP整合開發(fā)環(huán)境

Dreamweaver CS6

Dreamweaver CS6

視覺化網(wǎng)頁開發(fā)工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

Java vs. JavaScript:清除混亂 Java vs. JavaScript:清除混亂 Jun 20, 2025 am 12:27 AM

Java和JavaScript是不同的編程語言,各自適用於不同的應(yīng)用場景。 Java用於大型企業(yè)和移動應(yīng)用開發(fā),而JavaScript主要用於網(wǎng)頁開發(fā)。

掌握J(rèn)avaScript評論:綜合指南 掌握J(rèn)avaScript評論:綜合指南 Jun 14, 2025 am 12:11 AM

評論arecrucialinjavascriptformaintainingclarityclarityandfosteringCollaboration.1)heelpindebugging,登機(jī),andOnderStandingCodeeVolution.2)使用林格forquickexexplanations andmentmentsmmentsmmentsmments andmmentsfordeffordEffordEffordEffordEffordEffordEffordEffordEddeScriptions.3)bestcractices.3)bestcracticesincracticesinclud

JavaScript評論:簡短說明 JavaScript評論:簡短說明 Jun 19, 2025 am 12:40 AM

JavascriptconcommentsenceenceEncorenceEnterential gransimenting,reading and guidingCodeeXecution.1)單inecommentsareusedforquickexplanations.2)多l(xiāng)inecommentsexplaincomplexlogicorprovideDocumentation.3)

如何在JS中與日期和時間合作? 如何在JS中與日期和時間合作? Jul 01, 2025 am 01:27 AM

JavaScript中的日期和時間處理需注意以下幾點(diǎn):1.創(chuàng)建Date對像有多種方式,推薦使用ISO格式字符串以保證兼容性;2.獲取和設(shè)置時間信息可用get和set方法,注意月份從0開始;3.手動格式化日期需拼接字符串,也可使用第三方庫;4.處理時區(qū)問題建議使用支持時區(qū)的庫,如Luxon。掌握這些要點(diǎn)能有效避免常見錯誤。

JavaScript與Java:開發(fā)人員的全面比較 JavaScript與Java:開發(fā)人員的全面比較 Jun 20, 2025 am 12:21 AM

JavaScriptIspreferredforredforwebdevelverment,而Javaisbetterforlarge-ScalebackendsystystemsandSandAndRoidApps.1)JavascriptexcelcelsincreatingInteractiveWebexperienceswebexperienceswithitswithitsdynamicnnamicnnamicnnamicnnamicnemicnemicnemicnemicnemicnemicnemicnemicnddommanipulation.2)

JavaScript:探索用於高效編碼的數(shù)據(jù)類型 JavaScript:探索用於高效編碼的數(shù)據(jù)類型 Jun 20, 2025 am 12:46 AM

javascripthassevenfundaMentalDatatypes:數(shù)字,弦,布爾值,未定義,null,object和symbol.1)numberSeadUble-eaduble-ecisionFormat,forwidevaluerangesbutbecautious.2)

為什麼要將標(biāo)籤放在的底部? 為什麼要將標(biāo)籤放在的底部? Jul 02, 2025 am 01:22 AM

PlacingtagsatthebottomofablogpostorwebpageservespracticalpurposesforSEO,userexperience,anddesign.1.IthelpswithSEObyallowingsearchenginestoaccesskeyword-relevanttagswithoutclutteringthemaincontent.2.Itimprovesuserexperiencebykeepingthefocusonthearticl

什麼是在DOM中冒泡和捕獲的事件? 什麼是在DOM中冒泡和捕獲的事件? Jul 02, 2025 am 01:19 AM

事件捕獲和冒泡是DOM中事件傳播的兩個階段,捕獲是從頂層向下到目標(biāo)元素,冒泡是從目標(biāo)元素向上傳播到頂層。 1.事件捕獲通過addEventListener的useCapture參數(shù)設(shè)為true實(shí)現(xiàn);2.事件冒泡是默認(rèn)行為,useCapture設(shè)為false或省略;3.可使用event.stopPropagation()阻止事件傳播;4.冒泡支持事件委託,提高動態(tài)內(nèi)容處理效率;5.捕獲可用於提前攔截事件,如日誌記錄或錯誤處理。了解這兩個階段有助於精確控制JavaScript響應(yīng)用戶操作的時機(jī)和方式。

See all articles