Dalam kerja, kami sering menghadapi senario perniagaan yang memerlukan pemprosesan mesej tak segerak Terdapat kaedah pemprosesan yang berbeza bergantung pada sifat mesej.
1. Mesej tidak bebas
Mesej yang tidak bebas biasanya mempunyai kebergantungan berjujukan Pada masa ini, mekanisme pemprosesan mesej akan merosot kepada mod pemprosesan baris gilir, dan hanya boleh tunggal -benang oleh seorang pengguna Memproses mesej.
2. Mesej adalah bebas sepenuhnya
Mesej bebas sepenuhnya boleh diproses secara serentak oleh berbilang pengguna (benang) pada masa yang sama, mencapai keupayaan pemprosesan serentak maksimum.
3. Mesej tidak sepenuhnya bebas
Biasanya ini adalah kes di mana mesej homolog (dari pengeluar yang sama) diperlukan untuk dipesan, dan susunan mesej heterogen adalah tidak relevan.
Pemprosesan mesej dalam senario ini akan menjadi agak rumit Untuk memastikan susunan mesej daripada sumber yang sama, adalah mudah untuk memikirkan mengikat urutan pengguna tetap kepada mesej daripada sumber yang sama tetapi mempunyai masalah besar.
Jika bilangan pengeluar adalah besar, bilangan utas terikat mungkin tidak mencukupi, sudah tentu, sumber utas boleh digunakan semula dan utas yang sama boleh diikat kepada berbilang sumber mesej untuk diproses masalah: interaksi sumber mesej antara.
Pertimbangkan senario berikut:
Pengeluar P1 menjana sejumlah besar mesej dan memasuki baris gilir dan ditugaskan kepada urutan pengguna C1 untuk diproses (C1 mungkin mengambil masa yang lama untuk diproses). kali ini, pengeluar P2 menjana Malangnya, mesej itu juga diberikan kepada urutan pengguna C1 untuk diproses
Kemudian pemprosesan mesej pengeluar P2 akan disekat oleh sejumlah besar mesej daripada P1, menyebabkan saling pengaruh antara P1 dan P2, dan juga Ketidakupayaan untuk menggunakan sepenuhnya rangkaian pengguna lain membawa kepada ketidakseimbangan.
Jadi, kita mesti mempertimbangkan untuk mengelakkan masalah seperti itu. Mencapai ketepatan masa pemprosesan penggunaan (secepat mungkin), pengasingan (mengelakkan campur tangan bersama) dan keseimbangan (memaksimumkan pemprosesan serentak)
Dalam pelaksanaan, akan ada dua mod, yang lebih mudah untuk difikirkan ialah penghantaran benang Model (mod PUSH), kaedah khusus biasanya seperti berikut:
1 Terdapat penghantar mesej global yang meninjau baris gilir untuk mendapatkan semula mesej.
2. Mengikut sumber mesej, hantarkannya ke urutan pengguna yang sesuai untuk diproses.
Mekanisme algoritma pengedaran boleh semudah Hash berdasarkan sumber mesej, atau sekompleks beban semasa setiap urutan pengguna, panjang baris gilir menunggu dan kerumitan mesej, dan boleh dipilih untuk diedarkan berdasarkan analisis yang komprehensif.
Hash Mudah pasti akan menghadapi masalah yang diterangkan dalam senario di atas, tetapi pengiraan pengedaran yang kompleks jelas sangat menyusahkan dan rumit untuk dilaksanakan, dan kecekapan tidak semestinya baik, dan sukar untuk mencapai keseimbangan yang sempurna dalam syarat baki.
Mod kedua menggunakan kaedah PULL, dan benang menarik atas permintaan Kaedah khusus adalah seperti berikut:
1 Sumber mesej terus meletakkan mesej yang dijana ke dalam baris gilir sementara sumber (seperti berikut) Setiap sesi yang ditunjukkan mewakili sumber mesej yang berbeza), dan kemudian meletakkan sesi ke dalam baris gilir menyekat untuk memberitahu urutan untuk diproses
2 tinjauan baris gilir pada masa yang sama dan bersaing untuk mesej (menjamin bahawa hanya satu urutan mengambil Pergi ke
3. Semak sama ada penunjuk baris gilir sedang diproses oleh urutan lain (pelaksanaan memerlukan penyegerakan pengesanan berdasarkan mesej asal yang sama pada peringkat urutan)
4. Jika ia tidak diproses oleh urutan lain, kemudian Tunjukkan status dalam pemprosesan tetapan kawasan penyegerakan, dan proses mesej dalam baris gilir sementara selepas keluar dari kawasan penyegerakan
5 selesai, akhirnya masukkan pemprosesan tetapan kawasan penyegerakan sekali lagi untuk menunjukkan bahawa status melahu
Berikut ialah sekeping kod untuk menerangkan proses pemprosesan benang penggunaan:
public void run() { try { for (AbstractSession s = squeue.take(); s != null; s = squeue.take()) { // first check any worker is processing this session? // if any other worker thread is processing this event with same session, just ignore it. synchronized (s) { if (!s.isEventProcessing()) { s.setEventProcessing(true); } else { continue; } } // fire events with same session fire(s); // last reset processing flag and quit current thread processing s.setEventProcessing(false); // if remaining events, so re-insert to session queue if (s.getEventQueue().size() > 0 && !s.isEventProcessing()) { squeue.offer(s); } } } catch (InterruptedException e) { LOG.warn(e.getMessage(), e); } }
Atas ialah kandungan terperinci Kaedah pemprosesan mesej tak segerak Springboot. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!