概要
基本概念
ブローカ
データの処理に使用されるメッセージキューサーバーエンティティ
vhost
RabbitMQサーバーによって作成された仮想メッセージホストには、独自の許可メカニズムがあります。ブローカーで複数のVHOSTを開き、さまざまなユーザーの許可を隔離し、VHOSTも完全に分離されます。
Productor
メッセージ通信のためのデータを生成します
チャネル
メッセージチャネル、AMQPで複数のチャネルを確立できます。各チャネルはセッションタスクを表します。
交換
直接
ルーティングキーによって指定されたキューへの転送メッセージ
ファンアウト
ファンアウト
すべてのバインドされたキューにメッセージを転送することは、ブロードキャストの方法に似ています。
トピック
トピック
ルールに従ってメッセージを転送します。このルールは主にパターンマッチングであり、柔軟性も柔軟に見えるように見えます。
列
列
バインディング
これは、スイッチとキューの関係を表します。バインディングの場合、ルーティングキーに合わせて追加のパラメーターバインディングキーが付属しています。
消費者
メッセージキューを聞いてメッセージデータを読み取る
3つのExchangeモード(ファンアウト、ダイレクト、トピック)の実装がスプリングブート下にあります
POM.xmlの参照Spring-boot-starter-amqp
<Dependency> groupId> org.springframework.boot </groupid> <artifactid> spring-boot-starter-amqp </artifactid> </dependency>
rabbitmq構成を追加します
春:rabbitmq:host:localhostポート:5672ユーザー名:ゲストパスワード:ゲスト
直接
ダイレクトモードでは、一般的に定義するにはキューのみが必要です。スイッチをバインドせずに、内蔵スイッチ(DefaultExchange)を使用します。
@configurationPublic class rabbitp2pconfigure {public static final string queue_name = "p2p-queue"; @bean public queue queue(){return new queue(queue_name、true); }} @runwith(springrunner.class)@springboottest(classes = bootcoretestapplication.class)@slf4jpublic class rabbittest {@autowired private amqptemplate amqptemplate; / *** send*/ @test public void sendlazy()throws arturnedexception {city city = new City(2345566666l、 "direct_name"、 "direct_code"); amqptemplate.convertandsend(rabbitlazyconfigure.que_name、city); } / ***受信* / @test public void receive()throws arturnedexception {object obj = amqptemplate.receiveandConvert(rabbitlazyconfigure.queue_name); assert.notnull(obj、 ""); log.debug(obj.tostring()); }}適用可能なシナリオ:ポイントツーポイント
ファンアウト
ファンアウトモードでは、複数のキューを同じスイッチにバインドする必要があります
@configurationPublic class rabbitfanoutconfigure {public static final string exchange_name = "fanout-exchange"; public static final string fanout_a = "fanout.a"; public static final string fanout_b = "fanout.b"; public static final string fanout_c = "fanout.c"; @bean public queue amessage(){return new queue(fanout_a); } @bean public queue bmessage(){return new queue(fanout_b); } @bean public queue cmessage(){return new queue(fanout_c); } @bean public fanoutexchange fanoutexchange(){return new fanoutexchange(exchange_name); } @bean public binding binding bindingexchangea(queue amessage、fanoutexchange fanoutexchange){return bindingbuilder.bind(amessage).o(fanoutexchange); } @bean public binding binding bindingexchangeb(queue bmessage、fanoutexchange fanoutexchange){return bindingbuilder.bind(bmessage).to(fanoutexchange); } @bean public binding binding bindingexchangec(queue cmessage、fanoutexchange fanoutexchange){return bindingbuilder.bind(cmessage).to(fanoutexchange); }}送信者
@slf4jpublic class sender {@autowired private amqptemplate rabbittemplate; public void sendfanout(object message){log.debug( "Fanoutメッセージを送信<" +メッセージ + ">"); rabbittemplate.convertandsend(rabbitfanoutconfigure.exchange_name、 ""、message); }}@rabbitlistenerを使用して、複数のキューを聞いて消費できます
@slf4j @rabbitlistener(queues = {rabbitfanoutconfigure.fanout_a、rabbitfanoutconfigure.fanout_b、rabbitfanoutconfigure.fanout_c})public class receiver {@rabbithandler public void bug( " + demessage(Stringメッセージ) ">"); }}適用可能なシナリオ
- 大規模なマルチユーザーオンライン(MMO)ゲームを使用して、ランキングアップデートなどのグローバルイベントを処理できます
- スポーツニュースWebサイトは、それを使用してスコアアップデートをほぼリアルタイムでモバイルクライアントに配布できます
- 流通システムはそれを使用してさまざまな状態と構成の更新をブロードキャストします
- グループチャット中に、グループチャットに参加するユーザーにメッセージを配布するために使用されます。
トピック
このパターンは比較的複雑です。簡単に言えば、各キューには独自の懸念事項があります。すべてのメッセージには「タイトル」があります。 Exchangeは、TopicsがFuzzy Matter RouthKeyに関係するキューにメッセージを転送します。
バインディングする場合、「トピック」など、キューが懸念しているトピックを提供します。( "#"は0または複数のキーワードを意味し、「*」はキーワードを意味します。
@configurationPublic class rabbittopicconfigure {public static final string exchange_name = "topic-exchange"; public static final string topic = "topic"; public static final string topic_a = "topic.a"; public static final string topic_b = "topic.b"; @bean public queue queetopic(){return new queue(rabbittopicconfigure.topic); } @bean public queue queetopica(){return new queue(rabbittopicconfigure.topic_a); } @bean public queue queetopicb(){return new queue(rabbittopicconfigure.topic_b); } @bean public topicexchange endged(){topicexchange topicexchange = new topicexchange(exchange_name); topicexchange.setdelayed(true);新しいtopicexchange(exchange_name)を返します。 } @bean public binding binding bindingexchangetopic(queue Queetopic、topicexchange Exchange){return bindingbuilder.bind(Queuetopic).to(Exchange).with(rabbittopicconfigure.topic); } @bean public binding binding bindingexchangetopics(queue queetopica、topicexchange Exchange){return bindingbuilder.bind(queuetopica).to(exchange).with( "topic。#"); }}同時に、3つのキューを聞いてください
@slf4j @rabbitlistener(queues = {rabbittopicconfigure.topic、rabbittopicconfigure.topic_a、rabbittopicconfigure.topic_b})パブリッククラス受信者{@rabbithandler public void receivessage(skise string messag) }}テストを通じて見つけることができます
@runwith(springrunner.class)@springboottest(classes = bootcoretestapplication.class)public class rabbittest {@autowired private amqptemplate rabbittemplate; @test public void sendall(){rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name、 "topic.test"、 "all"); } @test public void sendtopic(){rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name、rabbittopicconfigure.topic、 "send topic"); } @test public void sendtopica(){rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name、rabbittopicconfigure.topic_a、 "send topica"); }}適用可能なシナリオ
- 販売ポイントなどの特定の地理的場所に関するデータを配布する
- 複数の労働者によって完了した舞台裏のタスク、特定の特定のタスクの処理を担当する各労働者
- 株価の更新(およびその他の種類の財務データの更新)
- カテゴリやタグを含むニュースアップデート(たとえば、特定のスポーツやチームなど)
- クラウド内のさまざまな種類のサービスの調整
- 各ビルダーが特定のアーキテクチャまたはシステムのみを処理できる分散アーキテクチャ/システムベースのソフトウェアパッケージ。
遅延キュー
消費の遅れ:
遅延再試行:
スイッチ遅延プロパティをtrueに設定します
@configurationpublic class rabbitlazyconfigure {public static final string queue_name = "lazy-queue-t"; public static final string exchange_name = "lazy-exchange-t"; @bean public queue queue(){return new queue(queue_name、true); } @bean public directExchange defaultExchange(){directExchange directExchange = new DirectExchange(Exchange_Name、true、false); DirectExchange.setDelayed(true); DirectExchangeを返します。 } @bean public binding binding(){return bindingbuilder.bind(queue())。to(defaultexchange())。with(queue_name); }}送信時に遅延時間を設定します
@slf4jpublic class sender {@autowired private amqptemplate rabbittemplate; public void sendlazy(object msg){log.debug( "lazyメッセージを送信<" + msg + ">"); rabbittemplate.convertandsend(rabbitlazyconfigure.exchange_name、rabbitlazyconfigure.queue_name、msg、message-> {message.getMessageProperties()。 }}仕上げる
さまざまなユースケースについては、公式文書を直接確認してください
上記はこの記事のすべての内容です。みんなの学習に役立つことを願っています。誰もがwulin.comをもっとサポートすることを願っています。