


Pelan pelaksanaan untuk pengeluar Spring Embeddedkafka untuk menunggu pengesahan pengguna
Oct 15, 2025 am 11:36 AMArtikel ini bertujuan untuk menyelesaikan masalah bagaimana pengeluar menunggu pengguna untuk mengesahkan mesej di Spring EmbeddedKafka Test Scenario. Oleh kerana pengeluar dan pengguna Kafka adalah bebas, `Acks` hanya memastikan bahawa broker menerima dan berterusan mesej, dan tidak ada kaitan dengan pengguna. Oleh itu, logik tersuai diperlukan untuk melaksanakan fungsi pengeluar yang menunggu pengesahan pengguna. Artikel ini akan memperkenalkan idea dan kaedah untuk melaksanakan fungsi ini.
Dalam persekitaran EmbeddedKafka musim bunga, memastikan mesej yang dihantar oleh pengeluar diproses dengan betul dan diakui oleh pengguna adalah keperluan umum, terutama dalam ujian integrasi. Walau bagaimanapun, reka bentuk Kafka itu sendiri memusnahkan pengeluar dan pengguna. Konfigurasi ACKS pada sisi pengeluar hanya mengawal mekanisme pengesahan di sisi broker dan tidak dapat secara langsung melaksanakan fungsi pengeluar yang menunggu pengesahan pengguna. Oleh itu, kita perlu memperkenalkan mekanisme tambahan untuk mencapai matlamat ini.
Idea Teras: Memperkenalkan Mekanisme Penyegerakan Negeri Pertengahan
Oleh kerana pengeluar dan pengguna bebas, kami memerlukan cara bagi pengguna untuk memberitahu pengeluar selepas memproses mesej. Pendekatan yang sama adalah memperkenalkan kedai negeri bersama, sebagai contoh:
- Dikongsi bersama ConcurrentHashMap: Sesuai untuk persekitaran ujian JVM tunggal, mudah dan cekap.
- Redis atau storan luaran yang lain: Sesuai untuk persekitaran ujian yang diedarkan dan lebih berskala.
Langkah Pelaksanaan:
-
Bahagian Pengeluar:
- Sebelum menghantar mesej, menghasilkan ID unik (seperti UUID).
- Hantar ID kepada pengguna sebagai sebahagian daripada tajuk mesej atau badan mesej.
- Simpan ID dalam peta yang menunggu pengesahan dan tetapkan masa tamat.
- Periksa secara berkala sama ada ID dikeluarkan dari peta. Jika ia tidak dikeluarkan selepas tamat tempoh, pemprosesan mesej dianggap telah gagal.
-
Sisi Pengguna:
- Setelah menerima mesej itu, ekstrak ID dalam mesej.
- Selepas mesej diproses, ID dikeluarkan dari kedai negeri yang dikongsi.
- Call AccidenedGement.AckNowledge () untuk mengesahkan bahawa mesej telah dimakan.
Kod Contoh (menggunakan ConcurrentHashMap):
import org.springframework.kafka.core.kafkatemplate; import org.springframework.kafka.support.acknowledgment; import org.springframework.stereotype.component; import java.util.uuid; import java.util.concurrent.concurrenthashmap; import java.util.concurrent.timeunit; @Component Kelas Awam MessageHandler { Kafkatemplate akhir swasta <string string> kafkatemplate; Final ConcurrentHashMap Final <string boolean> ProcessedMessages = new ConcurrentHashMap (); Public MessageHandler (Kafkatemplate <string string> KafKatemplate) { this.kafkatemplate = kafKatemplate; } public void SendMessage (String Topic, String Message) melemparkan Pengecualian { String messageId = uuid.randomuuid (). ToString (); ProcessedMessages.put (MessageId, false); // tambahkan ke peta menunggu pengesahan kafKatemplate.send (topik, mesej, mesej) .get (); // pastikan mesej itu dihantar dengan jayanya // tunggu pengesahan pengguna dan tetapkan Timeout WaitforFirmation (MessageId, 5, TimeUnit.Seconds); } public void consumeMessage (string messageId, mesej rentetan, pengakuan pengakuan) { Cuba { // memproses mesej ... System.out.println ("Mesej yang dimakan:" Mesej); ProcessedMessages.Remove (MessageId); // Keluarkan id, menunjukkan bahawa ia telah disahkan pengakuan.AckNowledge (); } menangkap (pengecualian e) { // Mengendalikan pengecualian ... } } swasta void waitfonfirmation (string messageId, lama tamat, unit timeunit) melemparkan pengecualian { starttime panjang = System.CurrentTimemillis (); manakala (ProcessedMessages.ContainsKey (MessageId)) { jika (System.CurrentTimemillis () - startTime> unit.tomillis (timeout)) { Buang pengecualian baru ("Tamat masa menunggu pengesahan mesej:" MesejID); } Thread.sleep (100); // Tidur sebentar untuk mengelakkan penggunaan CPU yang berlebihan} } }</string></string></string>
Konfigurasikan Kafkalistener:
import org.springframework.kafka.annotation.kafkalistener; import org.springframework.kafka.support.acknowledgment; import org.springframework.stereotype.component; @Component kelas awam Kafkaconsumer { MessageHandler finalHandler swasta; public kafkaconsumer (MessageHandler MessageHandler) { this.messageHandler = MessageHandler; } @KafKalistener (topik = "your_topic", groupId = "your_group_id") public void dengar (rentetan mesej, mesej rentetan, pengakuan pengakuan) { MessageHandler.ConsumeMessage (MessageId, mesej, pengakuan); } }
Perkara yang perlu diperhatikan:
- Mekanisme masa tamat: Masa tamat yang munasabah mesti ditetapkan untuk mengelakkan menunggu selama -lamanya.
- Pengendalian Pengecualian: Di sisi pengguna, kegagalan pemprosesan mesej perlu dikendalikan dengan betul, seperti log semula atau rakaman log ralat.
- Keistimewaan ID Mesej: Pastikan ID Mesej adalah unik di seluruh sistem.
- Isu Konvensyen: Jika beberapa pengeluar menghantar mesej pada masa yang sama, isu-isu konkurensi perlu dipertimbangkan, seperti menggunakan peta selamat thread atau kunci yang diedarkan.
Meringkaskan:
Walaupun Kafka sendiri tidak menyediakan mekanisme untuk pengeluar untuk menunggu pengesahan pengguna, kita dapat mencapai fungsi ini dengan memperkenalkan mekanisme penyegerakan negara pertengahan. Kod sampel di atas menyediakan penyelesaian pelaksanaan berdasarkan concurrenthashmap. Anda boleh memilih penyelesaian yang lebih sesuai berdasarkan keperluan sebenar, seperti menggunakan Redis atau storan luaran yang lain. Kuncinya adalah untuk mewujudkan keadaan bersama antara pengeluar dan pengguna untuk menyegerakkan status pemprosesan mesej. Dengan cara ini, kebolehpercayaan ujian integrasi dapat diperbaiki dengan berkesan untuk memastikan mesej diproses dengan betul.
Atas ialah kandungan terperinci Pelan pelaksanaan untuk pengeluar Spring Embeddedkafka untuk menunggu pengesahan pengguna. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Alat AI Hot

Undress AI Tool
Gambar buka pakaian secara percuma

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Stock Market GPT
Penyelidikan pelaburan dikuasakan AI untuk keputusan yang lebih bijak

Artikel Panas

Alat panas

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina
Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Gunakan parameter -CP untuk menambah balang ke ClassPath, supaya JVM dapat memuatkan kelas dan sumber dalamannya, seperti Java -Cplibrary.Jarcom.Example.Main, yang menyokong pelbagai balang yang dipisahkan oleh titik -titik komisul atau kolon, dan juga boleh dikonfigurasikan melalui pembolehubah persekitaran kelas atau manifest.mf.

UseFile.createenewfile () tocreateafileonlyifitdoesneteTexist, mengelakkan penulisan; 2.preferfiles.createFile () darinio.2formodern, safeFileCreationThatFailSiffileexists;

Javaspi adalah mekanisme penemuan perkhidmatan terbina dalam JDK, dan melaksanakan pengembangan dinamik berorientasikan antara muka melalui ServicEloader. 1. Tentukan antara muka perkhidmatan dan buat fail yang dinamakan dengan nama penuh antara muka di bawah Meta-INF/Services/, dan tulis nama yang berkelayakan sepenuhnya kelas pelaksanaan; 2. Gunakan serviceLoader.load () untuk memuatkan kelas pelaksanaan, dan JVM secara automatik akan membaca konfigurasi dan instantiate; 3. Kontrak antara muka harus dijelaskan semasa reka bentuk, keutamaan sokongan dan pemuatan bersyarat, dan menyediakan pelaksanaan lalai; 4. Senario aplikasi termasuk akses saluran pelbagai pembayaran dan pengesahan pemalam; 5. Perhatikan prestasi, classpath, pengasingan pengecualian, keselamatan benang dan keserasian versi; 6. Dalam Java9, menyediakan boleh digunakan dalam kombinasi dengan sistem modul.

JavagenericsprovideCompile-timetypesafetyandeliminatecastingingbyallowingtypeparametersonclass, antara muka, andmethods; wildcards (?,? Extendstype ,? supertype) handleunknowntypeswithflexxibility.usoSoRdoRderWildwildwhoRderWildwildwildwildwilddwherwherwilderwilderwilderwilderwildloundwilder .1

Gunakan kata kunci untuk melaksanakan antara muka. Kelas perlu menyediakan pelaksanaan khusus semua kaedah dalam antara muka. Ia menyokong pelbagai antara muka dan dipisahkan oleh koma untuk memastikan kaedahnya adalah umum. Kaedah lalai dan statik selepas Java 8 tidak perlu ditulis semula.

Artikel ini meneroka mekanisme menghantar banyak permintaan HTTP pada soket TCP yang sama, iaitu, sambungan berterusan HTTP (Keep-Alive). Artikel ini menjelaskan perbezaan antara protokol HTTP/1.X dan HTTP/2, menekankan pentingnya sokongan pelayan untuk sambungan yang berterusan, dan bagaimana untuk mengendalikan sambungan dengan betul: Tepung Response Headers. Dengan menganalisis kesilapan biasa dan menyediakan amalan terbaik, kami berhasrat untuk membantu pemaju membina pelanggan HTTP yang cekap dan mantap.

Tutorial ini terperinci bagaimana untuk memproses arraylists bersarang dengan cekap yang mengandungi arraylists lain di Java dan menggabungkan semua elemen dalamannya ke dalam satu array. Artikel ini akan menyediakan dua penyelesaian teras melalui operasi flatmap API Java 8 Stream: pertama meratakan ke dalam senarai dan kemudian mengisi array, dan secara langsung mewujudkan array baru untuk memenuhi keperluan senario yang berbeza.

Gunakan kelas Properties untuk membaca fail konfigurasi Java dengan mudah. 1. Masukkan config.properties ke dalam direktori sumber, muatkannya melalui getClassLoader (). 2. Jika fail berada di laluan luaran, gunakan FileInputStream untuk memuatkannya. 3. Gunakan GetProperty (Key, DefaultValue) untuk mengendalikan kekunci yang hilang dan memberikan nilai lalai untuk memastikan pengendalian pengecualian dan pengesahan input.
