Aperçu
Concepts de base
Courtier
Entité du serveur de file d'attente de messages utilisé pour traiter les données
vhost
L'hôte de message virtuel créé par le serveur RabbitMQ a son propre mécanisme d'autorisation. Plusieurs Vhosts peuvent être ouverts dans un courtier pour l'isolement de la permission pour différents utilisateurs, et les VHosts sont également complètement isolés.
Produit
Générer des données pour la communication de messages
canal
Canal de message, plusieurs canaux peuvent être établis dans AMQP, chaque canal représente une tâche de session.
échange
direct
Transmettre les messages à la file d'attente spécifiée par la clé de routage
fanout
fanout
Transférer les messages à toutes les files d'attente liés est similaire à un moyen de diffusion.
sujet
sujet
Transférer les messages selon les règles. Cette règle est principalement correspondante de motifs, et elle semble également plus flexible.
file d'attente
file d'attente
obligatoire
Il représente la relation entre le commutateur et la file d'attente. Lors de la liaison, il est livré avec une clé de liaison de paramètres supplémentaire pour correspondre à la touche de routage.
Consommateur
Écoutez la file d'attente du message pour lire les données du message
Trois modes d'échange (Fanout, Direct, Topic) Implémentation sous Springboot
Référence Spring-boot-starter-amqp dans pom.xml
<dependency> <proupId> org.springframework.boot </rombasid> <ArtifactId> printemp-boot-starter-amqp </etefactId> </Dependency>
Ajouter la configuration de Rabbitmq
Spring: Rabbitmq: Hôte: Port localhost: 5672 Nom d'utilisateur: Mot de passe invité: invité
direct
En mode direct, seule la file d'attente est nécessaire pour la définir en général. Utilisez le commutateur intégré (DefaultExchange) sans lier le commutateur.
@Configurationpublic class rabbitp2pconfigure {public static final string queue_name = "p2p-queue"; @Bean Public Queue Queue () {return new Queue (queue_name, true); }} @Runwith (springrunner.class) @springboottest (classes = bootCoreTestApplication.class) @ slf4jpublic class rabbitTest {@autowired private amqptemplate amqptemplate; / ** * Send * / @Test public void Sendlazy () lance InterruptedException {City City = New City (234556666l, "Direct_name", "Direct_code"); Amqptemplate.ConvertandSend (RabbitlazyConfigure.Queue_name, ville); } / ** * recevoir * / @Test public void reçoile () lance InterruptedException {objet obj = amqptemplate.receivendConvert (RabbitlazyConfigure.queue_name); Assert.notnull (obj, ""); log.debug (obj.toString ()); }}Scénarios applicables: point à point
fanout
Le mode Fanout nécessite de lier plusieurs files d'attente au même commutateur
@ConfigurationPublic class rabbitfanoutConfigure {public static final string Exchange_name = "fanout-Exchange"; String final statique publique fanout_a = "fanout.a"; String final statique publique fanout_b = "fanout.b"; chaîne finale statique publique fanout_c = "fanout.c"; @Bean Public Queue AMESSAGE () {return new Queue (fanout_a); } @Bean Public Queue BMessage () {return new Queue (fanout_b); } @Bean Public Queue CMessage () {return new Queue (fanout_c); } @Bean public fanoutexchange fanoutexchange () {return new Fanoutexchange (Exchange_name); } @Bean Public Binding BindingExChangea (file d'attente AMESSAGE, FANOUTEXCHANGE FANOUTEXCHANGE) {return binkingBuilder.bind (amessage) .to (fanoutexchange); } @Bean Public Binding BindingExchangeB (file d'attente BMessage, fanoutexchange fanoutexchange) {return bindingBuilder.bind (bMessage) .to (fanoutexchange); } @Bean Public Binding BindingExchangec (file d'attente CMessage, fanoutexchange fanoutexchange) {return bindingBuilder.bind (cMessage) .to (fanoutexchange); }}Expéditeur
@ SLF4JPublic Class Sender {@autowired private amqptemplate labbitTemplate; public void SendFanout (message d'objet) {log.debug ("commencez le message fanout <" + message + ">"); RabbitTemplate.ConvertandSend (RabbitFanoutConfigure.exchange_name, "", message); }}Nous pouvons utiliser @rabbitListener pour écouter plusieurs files d'attente à consommer
@ SLF4J @ RabbitListener (files d'attente = {RabbitFanoutConfigure.fanout_a, RabbitfanoutConfigure.fanout_b, RabbitFanoutConfigure.fanout_c}) Récepteur public {@rabbithandler public Void ReceiveMessage (STRING MESSAGE) {Log.Debug ("Recoïd +"); }} Scénarios applicables
- Les jeux multi-utilisateurs en ligne (MMO) à grande échelle peuvent l'utiliser pour gérer les événements mondiaux tels que les mises à jour de classement
- Les sites Web d'actualités sportifs peuvent l'utiliser pour distribuer des mises à jour de score aux clients mobiles en temps quasi réel
- Le système de distribution l'utilise pour diffuser divers états et mises à jour de configuration
- Pendant le chat de groupe, il est utilisé pour distribuer des messages aux utilisateurs participant au chat de groupe.
sujet
Ce modèle est relativement complexe. Autrement dit, chaque file d'attente a son propre sujet de préoccupation. Tous les messages ont un "titre". Exchange transmettra les messages aux files d'attente dont les sujets concernent les matchs floues Routekey.
Lors de la liaison, fournissez une rubrique qui concerne la file d'attente, telle que "Topic. # (" # "Signifie 0 ou plusieurs mots clés, et" * "signifie un mot-clé.)
@ConfigurationPublic classe RabbitTopicConfigure {public static final String Exchange_name = "topic-Exchange"; public static final String topic = "topic"; Public Static Final String topic_a = "topic.a"; public static final String topic_b = "topic.b"; @Bean Public Queue QueueTopic () {return new Queue (RabbittopicConfigure.topic); } @Bean Public Queue QueuetOpica () {return new Queue (RabbitTopicConfigure.topic_a); } @Bean Public Queue QueueTopicB () {return new Queue (RabbitTopicConfigure.topic_b); } @Bean public topicexchange échange () {topicexchange topicexchange = new topicexchange (exchange_name); topicexchange.setdelayed (true); return new topicexchange (exchange_name); } @Bean Public Binding BindingExChangetopic (queue queuetopic, topicexchange échange) {return binkingBuilder.bind (queuetopic) .to (échange) .with (labbittopicconfigure.topic); } @Bean Public Binding BindingExChangetopics (queue queuetopica, topicexchange échange) {return bindingBuilder.bind (queuetopica) .to (échange) .with ("topic. #"); }}En même temps, écoutez trois files d'attente
@ Slf4j @ RabbitListener (files d'attente = {RabbittopicConfigure.topic, RabbittopicConfigure.topic_a, RabbittopicConfigure.topic_b}) Récepteur de classe publique {@rabbithandler public Void ReceiveMessage (Message à cordes) {Log.Debug ("a reçu <" + "); }}Grâce à des tests, nous pouvons trouver
@Runwith (springrunner.class) @springboottest (classes = bootCoreTestApplication.class) public class rabbitTest {@autowired private amqptemplate labbitTemplate; @Test public void Sendall () {RabbitTemplate.ConvertandSend (RabbitTopicConfigure.Exchange_name, "topic.test", "Send all"); } @Test public void SendTopic () {RabbitTemplate.ConvertandSend (RabbittopicConfigure.Exchange_name, RabbittopicConfigure.topic, "Envoyer un sujet"); } @Test public void SendTopica () {RabbitTemplate.ConvertandSend (RabbittopicConfigure.Exchange_name, RabbittopicConfigure.topic_a, "Envoyer topica"); }} Scénarios applicables
- Distribuer des données sur des emplacements géographiques spécifiques, tels que le point de vente
- Les tâches dans les coulisses effectuées par plusieurs travailleurs, chaque travailleur responsable de la gestion de certaines tâches spécifiques
- Mises à jour du cours des actions (et autres types de mises à jour de données financières)
- Mises à jour des nouvelles impliquant des catégories ou des étiquettes (par exemple, pour des sports ou des équipes spécifiques)
- Coordination de différents types de services dans le cloud
- Package logiciel basé sur l'architecture / système distribué, où chaque constructeur ne peut gérer qu'une seule architecture ou système spécifique.
File d'attente de retard
Consommation retardée:
Réglé retardé:
Définissez la propriété de retard de commutateur sur True
@Configurationpublic class rabbitlazyconfigure {public static final string queue_name = "lazy-queue-t"; public static final String Exchange_name = "Lazy-Exchange-T"; @Bean Public Queue Queue () {return new Queue (queue_name, true); } @Bean public DirectExChange DefaultExchange () {DirectExchange DirectExchange = new DirectExchange (Exchange_name, true, false); DirectExChange.setDelayed (true); return DirectExchange; } @Bean Public Binding Binding () {return bindingBuilder.bind (queue ()). To (defaultExchange ()). Avec (queue_name); }}Définissez le temps de retard lors de l'envoi
@ SLF4JPublic Class Sender {@autowired private amqptemplate labbitTemplate; public void Sendlazy (objet msg) {log.debug ("commencez à envoyer le message paresseux <" + msg + ">"); RabbitTemplate.ConvertandSend (RabbitlazyConfigure.Exchange_name, RabbitlazyConfigure.Queue_name, Msg, Message -> {message.getMessageProperties (). }}Finition
Veuillez vérifier directement les documents officiels pour divers cas d'utilisation
Ce qui précède est tout le contenu de cet article. J'espère que cela sera utile à l'apprentissage de tous et j'espère que tout le monde soutiendra davantage Wulin.com.