Ringkasan
Konsep Dasar
Makelar
Entitas server antrian pesan digunakan untuk memproses data
Vhost
Host pesan virtual yang dibuat oleh server RabbitMQ memiliki mekanisme izinnya sendiri. Beberapa vhost dapat dibuka dalam broker untuk isolasi izin untuk pengguna yang berbeda, dan VHost juga sepenuhnya terisolasi.
Produkor
Menghasilkan data untuk komunikasi pesan
saluran
Saluran pesan, beberapa saluran dapat dibuat di AMQP, setiap saluran mewakili tugas sesi.
menukarkan
langsung
Meneruskan pesan ke antrian yang ditentukan oleh routing-key
fanout
fanout
Meneruskan pesan ke semua antrian terikat mirip dengan cara penyiaran.
topik
topik
Meneruskan pesan sesuai dengan aturan. Aturan ini sebagian besar pencocokan pola, dan juga tampak lebih fleksibel.
antre
antre
mengikat
Ini mewakili hubungan antara sakelar dan antrian. Saat mengikat, ia hadir dengan parameter tambahan untuk mencocokkan kunci perutean.
Konsumen
Dengarkan antrian pesan untuk membaca data pesan
Tiga mode pertukaran (fanout, langsung, topik) Implementasi di bawah Springboot
Referensi Spring-Boot-Starter-AMQP di pom.xml
<dependency> <GroupId> org.springframework.boot </groupId> <ArTifactId> Spring-boot-starter-AMQP </RiTtifacTID> </dependency>
Tambahkan konfigurasi RabbitMQ
Spring: Rabbitmq: Host: Localhost Port: 5672 Nama Pengguna: Kata Sandi Tamu: Tamu
langsung
Dalam mode langsung, hanya antrian yang diperlukan untuk mendefinisikannya secara umum. Gunakan sakelar bawaan (DefaultExchange) tanpa mengikat sakelar.
@ConfigurationPublic kelas Rabbitp2pConfigure {public static final string queue_name = "p2p-queue"; @Bean Public Queue Queue () {return baru antrian (queue_name, true); }} @Runwith (springrunner.class) @springboottest (class = bootcoretestApplication.class)@slf4jpublic kelas rabbittest {@autowired private amqptemplate amqptemplate; / *** Kirim*/ @test public void sendLazy () melempar interruptedException {City City = New City (23455666l, "Direct_name", "Direct_Code"); amqptemplate.convertandsend (rabbitLazyConfigure.queue_name, kota); } / *** menerima* / @test public void menerima () melempar interruptedException {objek obj = amqptemplate.receiveandconvert (rabbitLazyConfigure.queue_name); Assert.notnull (obj, ""); log.debug (obj.tostring ()); }}Skenario yang berlaku: point-to-point
fanout
Mode fanout membutuhkan mengikat beberapa antrian ke sakelar yang sama
@ConfigurationPublic kelas RabbitFanoutConfigure {public static final string Exchange_name = "fanout-exchange"; public static final string fanout_a = "fanout.a"; public static final string fanout_b = "fanout.b"; public static final string fanout_c = "fanout.c"; @Bean public antreue amessage () {return baru antrian (fanout_a); } @Bean Public Queue BMessage () {return baru antrian (fanout_b); } @Bean Public Queue CMessage () {return baru antrian (fanout_c); } @Bean fanoutexchange fanoutexchange () {return fanoutexchange baru (Exchange_name); } @Bean Public Binding BindingExchangea (Antrian Amessage, FanOutExchange FanOutExchange) {return bindingBuilder.bind (amessage) .to (fanoutexchange); } @Bean Public Binding BindingExChangeB (Queue BMessage, FanOutExchange FanOutExchange) {return bindingBuilder.bind (bmessage) .to (fanoutexchange); } @Bean Public Binding BindingExChangec (Queue CMessage, FanOutExchange FanOutExchange) {return bindingBuilder.bind (cMessage) .to (fanoutexchange); }}Pengirim
@Slf4jpublic class pengirim {@Autowired private amqptemplate rabbittemplate; public void sendFanout (pesan objek) {log.debug ("Mulai kirim pesan fanout <" + pesan + ">"); rabbittemplate.convertandsend (rabbitfanoutconfigure.exchange_name, "", pesan); }}Kita dapat menggunakan @rabbitlistener untuk mendengarkan beberapa antrian untuk dikonsumsi
@Slf4j @rabbitlistener (queuees = {rabbitfanoutconfigure.fanout_a, rabbitfanoutconfigure.fanout_b, rabbitfanoutconfigure.fanout_c}); "log }} Skenario yang berlaku
-Game multi-pengguna online skala besar (MMO) dapat menggunakannya untuk menangani acara global seperti pembaruan peringkat
- Situs web berita olahraga dapat menggunakannya untuk mendistribusikan pembaruan skor ke klien seluler di dekat waktu nyata
- Sistem distribusi menggunakannya untuk menyiarkan berbagai negara bagian dan pembaruan konfigurasi
- Selama obrolan grup, ini digunakan untuk mendistribusikan pesan kepada pengguna yang berpartisipasi dalam obrolan grup.
topik
Pola ini relatif kompleks. Sederhananya, setiap antrian memiliki topik keprihatinannya sendiri. Semua pesan memiliki "judul". Exchange akan meneruskan pesan ke antrian yang topiknya menyangkut kecocokan fuzzy routeKey.
Saat mengikat, berikan topik yang dikhawatirkan antrian, seperti "topik.# ("# "Berarti 0 atau beberapa kata kunci, dan"*"berarti kata kunci.)
@ConfigurationPublic kelas RabbittopicConfigure {public static final string Exchange_name = "Topic-Exchange"; Topik String Akhir Statis Publik = "Topik"; string final statis public topic_a = "topic.a"; string final statis public topic_b = "topic.b"; @Bean Public Queue queuetopic () {return baru antrian (rabbittopicConfigure.topic); } @Bean antrian publik queuetopica () {return baru antrian (rabbittopicconfigure.topic_a); } @Bean antrian publik queuetopicb () {return baru antrian (rabbittopicconfigure.topic_b); } @Bean Public TopicexChange Exchange () {TopicexChange TopicexChange = TopicexChange baru (Exchange_Name); topicexchange.setDelayed (true); mengembalikan TopicexChange baru (Exchange_Name); } @Bean Public Binding BindingExchangetopic (Queue Queuetopic, Topicexchange Exchange) {return bindingBuilder.bind (queuetopic) .to (Exchange) .with (rabbittopicconfigure.topic); } @Bean Public Binding BindingExchangetopics (Queue Queuetopica, TopicexChange Exchange) {return bindingBuilder.bind (queuetopica) .to (Exchange) .with ("Topic.#"); }}Pada saat yang sama, dengarkan tiga antrian
@Slf4j @rabbitListener (queues = {rabbittopicconfigure.topic, rabbittopicconfigure.topic_a, rabbittopicconfigure.topic_b}) penerima kelas public {@rabbithandler <"batal public nequid (" Log. " }}Melalui pengujian yang dapat kita temukan
@Runwith (springrunner.class) @springboottest (class = bootcoretestApplication.class) kelas publik rabbittest {@autowired private amqptemplate rabbittemplate; @Test public void sendAll () {rabbittemplate.convertandsend (rabbittopicconfigure.exchange_name, "topic.test", "kirim semua"); } @Test public void sendtopic () {rabbittemplate.convertandsend (rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic, "kirim topik"); } @Test public void sendtopica () {rabbittemplate.convertandsend (rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic_a, "kirim topica"); }} Skenario yang berlaku
- Mendistribusikan data tentang lokasi geografis tertentu, seperti titik penjualan
- Tugas di belakang panggung diselesaikan oleh banyak pekerja, setiap pekerja yang bertanggung jawab untuk menangani tugas -tugas tertentu tertentu
- Pembaruan Harga Saham (dan jenis pembaruan data keuangan lainnya)
- Pembaruan berita yang melibatkan kategori atau tag (misalnya, untuk olahraga atau tim tertentu)
- Koordinasi berbagai jenis layanan di cloud
- Paket Perangkat Lunak Arsitektur/Sistem Terdistribusi, di mana setiap pembangun hanya dapat menangani satu arsitektur atau sistem tertentu.
Menunda antrian
Konsumsi Tertunda:
RETRY TERLAWAT:
Atur properti sakelar tunda ke true
@ConfigurationPublic kelas RabbitLazyConfigure {public static final string queue_name = "malas-queue-t"; public static final string Exchange_name = "Lazy-Exchange-t"; @Bean Public Queue Queue () {return baru antrian (queue_name, true); } @Bean Public DirectExchange DefaultExchange () {DirectExchange DirectExchange = New DirectExchange (Exchange_Name, true, false); DirectExchange.setDelayed (true); mengembalikan DirectExchange; } @Bean Binding Binding Binding () {return bindingBuilder.bind (queue ()). To (defaultExchange ()). Dengan (queue_name); }}Atur waktu tunda saat mengirim
@Slf4jpublic class pengirim {@Autowired private amqptemplate rabbittemplate; public void sendLazy (objek msg) {log.debug ("mulai kirim pesan malas <" + msg + ">"); rabbittemplate.convertandsend (rabbitLazyConfigure.exchange_name, rabbitLazyConfigure.queue_name, msg, pesan -> {message.getMessageProPerties (). setHeader ("X -DELAY", 10000); pesan kembali;}); }}Menyelesaikan
Silakan periksa dokumen resmi secara langsung untuk berbagai kasus penggunaan
Di atas adalah semua konten artikel ini. Saya berharap ini akan membantu untuk pembelajaran semua orang dan saya harap semua orang akan lebih mendukung wulin.com.