亚洲国产日韩欧美一区二区三区,精品亚洲国产成人av在线,国产99视频精品免视看7,99国产精品久久久久久久成人热,欧美日韩亚洲国产综合乱

ホームページ Java &#&チュートリアル Spring EmbeddedKafka プロデューサがコンシューマの確認(rèn)を待つための実裝計(jì)畫(huà)

Spring EmbeddedKafka プロデューサがコンシューマの確認(rèn)を待つための実裝計(jì)畫(huà)

Oct 15, 2025 am 11:36 AM

Spring EmbeddedKafka プロデューサがコンシューマの確認(rèn)を待つための実裝計(jì)畫(huà)

この記事は、Spring EmbeddedKafka テスト シナリオで、プロデューサーがコンシューマーによるメッセージの確認(rèn)をどのように待つかという問(wèn)題を解決することを目的としています。 Kafka のプロデューサーとコンシューマーは獨(dú)立しているため、「acks」はブローカーがメッセージを受信して??保持することを保証するだけであり、コンシューマーとは何の関係もありません。したがって、コンシューマの確認(rèn)を待つプロデューサの機(jī)能を?qū)g裝するには、カスタム ロジックが必要です。この記事では、この機(jī)能を?qū)g裝するためのアイデアと方法を紹介します。

Spring EmbeddedKafka 環(huán)境では、特に統(tǒng)合テストにおいて、プロデューサーによって送信されたメッセージが正しく処理され、コンシューマーによって確認(rèn)されることを保証することが一般的な要件です。ただし、Kafka のデザイン自體は、プロデューサーと消費(fèi)者を切り離しています。プロデューサー側(cè)の acks 設(shè)定は、ブローカー側(cè)の確認(rèn)メカニズムのみを制御し、コンシューマーの確認(rèn)を待つプロデューサーの機(jī)能を直接実裝することはできません。したがって、この目標(biāo)を達(dá)成するには追加のメカニズムを?qū)毪工氡匾ⅳ辘蓼埂?/p>

中心的なアイデア: 中間狀態(tài)の同期メカニズムを?qū)毪工?/strong>

プロデューサーとコンシューマーは獨(dú)立しているため、メッセージの処理後にコンシューマーがプロデューサーに通知する方法が必要です。一般的なアプローチは、次のような共有狀態(tài)ストアを?qū)毪工毪长趣扦埂?/p>

  • 共有 ConcurrentHashMap:単一の JVM テスト環(huán)境に適しており、シンプルで効率的です。
  • Redis またはその他の外部ストレージ:分散テスト環(huán)境に適しており、よりスケーラブルです。

実裝手順:

  1. プロデューサー側(cè):

    • メッセージを送信する前に、一意の ID (UUID など) を生成します。
    • ID をメッセージ ヘッダーまたはメッセージ本文の一部としてコンシューマに送信します。
    • 確認(rèn)を待つマップに ID を保存し、タイムアウトを設(shè)定します。
    • ID がマップから削除されているかどうかを定期的に確認(rèn)してください。タイムアウト後に削除されない場(chǎng)合、メッセージ処理は失敗したとみなされます。
  2. 消費(fèi)者側(cè):

    • メッセージを受信したら、メッセージ內(nèi)の ID を抽出します。
    • メッセージが処理された後、ID は共有狀態(tài)ストアから削除されます。
    • Acknowledgement.acknowledge() を呼び出して、メッセージが消費(fèi)されたことを確認(rèn)します。

サンプルコード (ConcurrentHashMap を使用):

 org.springframework.kafka.core.KafkaTemplate をインポートします。
org.springframework.kafka.support.Acknowledgment をインポートします。
org.springframework.stereotype.Component をインポートします。

java.util.UUIDをインポートします。
インポートjava.util.concurrent.ConcurrentHashMap;
java.util.concurrent.TimeUnitをインポートします。

@成分
パブリック クラス MessageHandler {

    private Final KafkaTemplate<string string> kafkaTemplate;
    privatefinalConcurrentHashMap<string boolean>processedMessages = new ConcurrentHashMap();

    public MessageHandler(KafkaTemplate<string string> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) throws Exception {
        文字列メッセージId = UUID.randomUUID().toString();
        処理済みメッセージ.put(メッセージId、false); // マップに追加して確認(rèn)を待機(jī)しています

        kafkaTemplate.send(トピック、メッセージID、メッセージ).get(); // メッセージが正常に送信されたことを確認(rèn)します // コンシューマの確認(rèn)を待機(jī)し、タイムアウトを設(shè)定します waitForconfirmation(messageId, 5, TimeUnit.SECONDS);
    }

    public void ConsumerMessage(String messageId, String message, Acknowledgment acknowledgment) {
        試す {
            // メッセージを処理します...
            System.out.println("消費(fèi)されたメッセージ: " message);
            処理済みメッセージ.削除(メッセージId); //確認(rèn)されたことを示す ID を削除しますacknowledgment.acknowledge();
        } catch (例外 e) {
            // 例外を処理します...
        }
    }

    private void waitForconfirmation(String messageId、長(zhǎng)いタイムアウト、TimeUnit 単位) throws Exception {
        長(zhǎng)い startTime = System.currentTimeMillis();
        while (processedMessages.containsKey(messageId)) {
            if (System.currentTimeMillis() - startTime >unit.toMillis(timeout)) {
                throw new Exception("メッセージ確認(rèn)待機(jī)中のタイムアウト: " messageId);
            }
            Thread.sleep(100); //過(guò)剰な CPU 使用率を避けるために短時(shí)間スリープします}
    }
}</string></string></string>

KafkaListener を構(gòu)成します。

 org.springframework.kafka.annotation.KafkaListener をインポートします。
org.springframework.kafka.support.Acknowledgment をインポートします。
org.springframework.stereotype.Component をインポートします。

@成分
パブリック クラス KafkaConsumer {

    プライベート最終 MessageHandler メッセージハンドラー;

    public KafkaConsumer(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @KafkaListener(トピック = "your_topic"、groupId = "your_group_id")
    public void listen(String messageId, String message, 確認(rèn)応答) {
        messageHandler.consumeMessage(messageId, メッセージ, 確認(rèn)応答);
    }
}

注意すべき點(diǎn):

  • タイムアウト メカニズム:無(wú)制限に待機(jī)することを避けるために、適切なタイムアウトを設(shè)定する必要があります。
  • 例外処理:コンシューマ側(cè)では、メッセージ処理の失敗は、キューに再登録したり、エラー ログを記録したりするなど、適切に処理する必要があります。
  • メッセージ ID の一意性:メッセージ ID がシステム全體で一意であることを確認(rèn)します。
  • 同時(shí)実行性の問(wèn)題:複數(shù)のプロデューサーが同時(shí)にメッセージを送信する場(chǎng)合は、スレッドセーフなマップや分散ロックの使用など、同時(shí)実行性の問(wèn)題を考慮する必要があります。

要約:

Kafka 自體は、プロデューサーがコンシューマーの確認(rèn)を待つメカニズムを提供しませんが、中間狀態(tài)同期メカニズムを?qū)毪工毪长趣扦长螜C(jī)能を?qū)g現(xiàn)できます。上記のサンプル コードは、ConcurrentHashMap に基づく実裝ソリューションを提供します。 Redis やその他の外部ストレージの使用など、実際のニーズに基づいて、より適切なソリューションを選択できます。重要なのは、プロデューサーとコンシューマーの間で共有狀態(tài)を確立し、メッセージの処理ステータスを同期させることです。このようにして、統(tǒng)合テストの信頼性を効果的に向上させ、メッセージが正しく処理されることを確認(rèn)できます。

以上がSpring EmbeddedKafka プロデューサがコンシューマの確認(rèn)を待つための実裝計(jì)畫(huà)の詳細(xì)內(nèi)容です。詳細(xì)については、PHP 中國(guó)語(yǔ) Web サイトの他の関連記事を參照してください。

このウェブサイトの聲明
この記事の內(nèi)容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰屬します。このサイトは、それに相當(dāng)する法的責(zé)任を負(fù)いません。盜作または侵害の疑いのあるコンテンツを見(jiàn)つけた場(chǎng)合は、admin@php.cn までご連絡(luò)ください。

ホットAIツール

Undress AI Tool

Undress AI Tool

脫衣畫(huà)像を無(wú)料で

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード寫(xiě)真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

寫(xiě)真から衣服を削除するオンライン AI ツール。

Stock Market GPT

Stock Market GPT

AIを活用した投資調(diào)査により賢明な意思決定を?qū)g現(xiàn)

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無(wú)料のコードエディター

SublimeText3 中國(guó)語(yǔ)版

SublimeText3 中國(guó)語(yǔ)版

中國(guó)語(yǔ)版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強(qiáng)力な PHP 統(tǒng)合開(kāi)発環(huán)境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開(kāi)発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

ホットトピック

JavaのClassPathにJARファイルを追加する方法は? JavaのClassPathにJARファイルを追加する方法は? Sep 21, 2025 am 05:09 AM

-CPパラメーターを使用してJARをClassPathに追加して、JVMがjava-cplibrary.jarcom.example.mainなどの內(nèi)部クラスとリソースをロードできるようにします。

Javaでファイルを作成する方法 Javaでファイルを作成する方法 Sep 21, 2025 am 03:54 AM

usefile.createNewfile()tocreatefileonlyifitdoes notexist、avolididingoverwriting;

Javaサービスプロバイダーインターフェイス(SPI)を使用して拡張可能なアプリケーションを構(gòu)築する Javaサービスプロバイダーインターフェイス(SPI)を使用して拡張可能なアプリケーションを構(gòu)築する Sep 21, 2025 am 03:50 AM

Javaspiは、JDKに組み込みのサービス発見(jiàn)メカニズムであり、Serviceloaderを介してインターフェイス指向の動(dòng)的拡張を?qū)g裝しています。 1.サービスインターフェイスを定義し、Meta-INF/Services/の下のインターフェイスのフルネームに命名されたファイルを作成し、実裝クラスの完全に適格な名前を記述します。 2。serviceloader.load()を使用して実裝クラスをロードすると、JVMは自動(dòng)的に構(gòu)成を読み取り、インスタンス化します。 3.インターフェイス契約は、設(shè)計(jì)中に明確にし、優(yōu)先順位と條件付き負(fù)荷をサポートし、デフォルトの実裝を提供する必要があります。 4。アプリケーションシナリオには、マルチペイチャネルアクセスとプラグインの確認(rèn)が含まれます。 5.パフォーマンス、クラスパス、例外分離、スレッドの安全性、バージョンの互換性に注意してください。 6。Java9では、Moduleシステムと組み合わせて提供できます。

Javaにインターフェイスを?qū)g裝する方法は? Javaにインターフェイスを?qū)g裝する方法は? Sep 18, 2025 am 05:31 AM

実裝キーワードを使用して、インターフェイスを?qū)g裝します。クラスは、インターフェイス內(nèi)のすべてのメソッドの特定の実裝を提供する必要があります。複數(shù)のインターフェイスをサポートし、メソッドが公開(kāi)されるようにコンマで區(qū)切られています。 Java 8の後のデフォルトおよび靜的メソッドは、書(shū)き直す必要はありません。

Java GenericsとWildcardsの理解 Java GenericsとWildcardsの理解 Sep 20, 2025 am 01:58 AM

javagenericsprovideCompile-timeTypeSafeTypeTypeTypeTypeTypeTypeTypeTypeTypeTypeTypeTypeTypeTypeTypeTyParaMetersonClasses、interfaces、and methods; wildcards(?、extendStype、?supertype)HeandnwondTypeswithFexibility.1.1.UnunboundCardCardCardCardCardCardCardCardCardCardCardの裝備

HTTPの永続的な接続の深い理解:同じソケットに複數(shù)のリクエストを送信するためのポリシーとプラクティス HTTPの永続的な接続の深い理解:同じソケットに複數(shù)のリクエストを送信するためのポリシーとプラクティス Sep 21, 2025 pm 01:51 PM

この記事では、同じTCPソケットで複數(shù)のHTTP要求を送信するメカニズム、つまりHTTP Persistent Connection(Keep-Alive)を詳細(xì)に説明します。この記事では、HTTP/1.xとHTTP/2プロトコルの違いを明確にし、永続的な接続に対するサーバー側(cè)のサポートの重要性と、接続を正しく処理する方法:応答ヘッダーを閉じる方法を強(qiáng)調(diào)しています。一般的なエラーを分析し、ベストプラクティスを提供することにより、開(kāi)発者が効率的で堅(jiān)牢なHTTPクライアントを構(gòu)築できるようにすることを目指しています。

Javaチュートリアル:ネストされたアレイリストを平らにし、その要素を配列に埋める方法 Javaチュートリアル:ネストされたアレイリストを平らにし、その要素を配列に埋める方法 Sep 18, 2025 am 07:24 AM

このチュートリアルでは、Javaに他のアレイリストを含むネストされたアレイリストを効率的に処理し、そのすべての內(nèi)部要素を単一の配列にマージする方法を詳しく説明しています。この記事では、Java 8ストリームAPIのフラットマップ操作を通じて2つのコアソリューションを提供します。最初にリストにフラット化してから配列を埋め、さまざまなシナリオのニーズを満たすために新しい配列を直接作成します。

Javaで呼び出しメソッドの名前を取得する方法は? Javaで呼び出しメソッドの名前を取得する方法は? Sep 24, 2025 am 06:41 AM

答えは、thread.currentthread()。getStackTrace()を使用してコールメソッド名を取得し、インデックス2から別のコールのソモメトッド名を取得することです。インデックス0はgetStackTraceであるため、1は現(xiàn)在のメソッドであり、2は発信者であり、2は例です。 インパクト。

See all articles