Cet article présente comment intégrer les messages d'envoi et de réception de Kafka dans le projet Springboot.
Kafka est un système de messages de publication de publication distribué à haut débit, avec les caractéristiques suivantes: Fournir une persistance de messages via la structure des données du disque d'O (1), qui peut maintenir des performances stables pendant longtemps même si le stockage des messages est des terabytes. Haut débit: même le matériel très ordinaire Kafka peut prendre en charge des millions de messages par seconde. Prend en charge le partitionnement des messages via des serveurs Kafka et des grappes de consommateurs. Prise en charge du chargement des données parallèles de Hadoop.
Installer Kafka
Étant donné que l'installation de Kafka nécessite la prise en charge de ZooKeeper, lors de l'installation de Windows, vous devez d'abord installer ZooKeeper, puis installer Kafka. Ci-dessous, je vous donnerai les étapes d'installation de Mac et les points à laquelle faire attention. La configuration de Windows n'est presque pas différente à l'exception des différents emplacements.
Brew Installer Kafka
Oui, c'est aussi simple. Vous pouvez le gérer avec une commande sur le Mac. Ce processus d'installation peut nécessiter un certain temps et il doit être lié à l'état du réseau. Il peut y avoir un message d'erreur dans le message de l'invite d'installation, tel que "Erreur: ne peut pas lier: / usr / local / share / doc / homebrew". Cela n'a pas d'importance, il sera automatiquement ignoré. Enfin, nous avons réussi lorsque nous avons vu ce qui était en dessous.
==> Résumé ðÿ º / usr / local / celar / kafka / 1.1.0: 157 fichiers, 47,8Mb
L'emplacement du fichier de configuration d'installation est le suivant, modifiez simplement le numéro de port en fonction de vos besoins.
Installé Zoopeeper et Kafka Emplacement / USR / Local / Cellar /
Fichier de configuration /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties
Démarrer Zookeeper
Copiez le code comme suit: ./ bin / zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
Démarrer Kafka
./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
Créez un sujet pour Kafka. Le sujet est nommé test. Vous pouvez le configurer au nom que vous souhaitez. Revenez en arrière et configurez-le correctement dans le code.
Copiez le code comme suit: ./ bin / kafka-topics --create --zookeeper localhost: 2181 --réplication-factor 1 --partitions 1 - Test topic
1. Résoudre les dépendances en premier
Nous ne mentionnerons pas les dépendances liées à Springboot. Les dépendances liées à Kafka ne reposent que sur un package d'intégration Spring-Kafka.
<dependency> <proupId> org.springframework.kafka </rombasid> <Artifactid> printemps-kafka </refactive> <version> 1.1.1.release </DERNIFRATION> </ Dependency>
Ici, nous allons d'abord afficher le fichier de configuration
#================== kafka ======================================== kafka.consumer.servers = 10.93.21.21: 2181kafka.consumer.enable.auto.commit = truekafka.consumer.Session.Timeout = 6000Kafka.Consumer.Auto.Commit.Interval = 100kafka.Consumer.Auto.Offsett .Reset = Dermterkafka.Consumer.Topic = TestKafka.Consumer.Group.id = TestKafka.Consumer.ConCurrency = 10kafka.Producer.Servers = 10.9 3.21.21: 9092kafka.producer.retries = 0kafka.producer.batch.size = 4096kafka.producer.linger = 1kafka.producer.buffer.memory = 40960
2. Configuration: producteur de Kafka
1) Déclarer la configuration et ouvrir la capacité de kafkatemplate via @configuration et @enablekafka.
2) Injecter la configuration de Kafka dans le fichier de configuration Application.Properties via @Value.
3) Générer du haricot, @Bean
package com.kangaroo.sentinel.collect.configuration; import java.util.hashmap; import java.util.map; import org.apache.kafka.clients.produner.producerconfig; import org.apache.kafka.common.serialization.stringerializer; importation; importation. org.springframework.beans.factory.annotation.value; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.kafka.annotation.enablekafka; importation; org.springframework.kafka.core.defaultkafkaproducerfactory; import org.springframework.kafka.core.kafkatemplate; import org.springframework.kafka.core.producerfactory; @ configuration @ enablekafkapublic class kafkaproducerconfig {. @Value ("$ {kafka.producer.servers}") Serveurs de chaîne privées; @Value ("$ {kafka.producer.retries}") private int ratries; @Value ("$ {kafka.producer.batch.size}") private int batchSize; @Value ("$ {kafka.producer.linger}") private int Linger; @Value ("$ {kafka.producer.buffer.memory}") private int buffermory; Map public <String, Object> produConConfigs () {map <string, object> props = new hashmap <> (); props.put (productonfig.bootstrap_servers_config, serveurs); propuls.put (productonfig.retries_config, raffe); propuls.put (productonfig.batch_size_config, batchSize); propuls.put (productonfig.linger_ms_config, linger); propuls.put (productonfig.buffer_memory_config, buffermory); props.put (productonfig.key_serializer_class_config, stringserializer.class); props.put (productonfig.value_serializer_class_config, stringserializer.class); return accessoires; } public productorfactory <string, string> producerFactory () {return new defaultKafkaproducerFactory <> (produCConfigs ()); } @Bean public kafKatemplate <string, string> kafKatemplate () {return new KafKatemplate <String, String> (producerFactory ()); }}Expérimentez notre producteur et écrivez un contrôleur. Want topic = test, key = key, envoyer un message
package com.kangaroo.sentinel.collect.controller; import com.kangaroo.sentinel.common.response.response; import com.kangaroo.sentinel.common.response.resultCode; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.kafka.core.kafkatemplate; import org.springframework.web.bind.annotation. javax.servlet.http.httpservletResponse; @ restController @ requestmapping ("/ kafka") public class CollectController {protected final logger logger = loggerfactory.getLogger (this.getClass ()); @Autowired Private KafKatemplate KafKatemplate; @RequestMapping (value = "/ send", méthode = requestMethod.get) Réponse publique Sendkafka (HttpServLetRequest Request, HttpServletResponse Response) {try {String Message = request.getParAmter ("Message"); logger.info ("kafka message = {}", message); kafkatemplate.send ("test", "key", message); Logger.info ("Envoyer Kafka avec succès."); Renvoie une nouvelle réponse (resultCode.success, "Envoyer Kafka avec succès", null); } catch (exception e) {logger.error ("Envoyer Kafka a échoué", e); return new Response (resultCode.Exception, "Envoyer Kafka a échoué", null); }}}3. Configuration: consommateur kafka
1) Déclarer la configuration et ouvrir la capacité de kafkatemplate via @configuration et @enablekafka.
2) Injecter la configuration de Kafka dans le fichier de configuration Application.Properties via @Value.
3) Générer du haricot, @Bean
package com.kangaroo.sentinel.collect.configuration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.stringdeserializer; import org.springframework.beans.factory.annotation.value; import; org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.kafka.annotation.enablekafka org.springframework.kafka.config.kafkalistenonConainerFactory; import org.springframework.kafka.core.conconsumerfactory; import org.springframework.kafka.core.defaultkafkaconsumerfactory; org.springframework.kafka.listener.concurrentMessageListenerContainer; Importer java.util.hashmap; Importer java.util.map; @ configuration @ enablekafkapublic classe kafkaconsumerConfig {@Value ("$ {kafka.consumer.servers}); @Value ("$ {kafka.consumer.enable.auto.commit}") Boolean Private ENABLEAUTOCOMMIT; @Value ("$ {kafka.consumer.Session.Timeout}") Station privée SessionTimeout; @Value ("$ {kafka.consumer.auto.commit.interval}") String privé AutoCommitInterval; @Value ("$ {kafka.consumer.group.id}") private String GroupID; @Value ("$ {kafka.consumer.auto.offset.reset}") String privé AutoOffSetreset; @Value ("$ {kafka.conconsumer.concurrency}") private int hic demurrency; @Bean public kafkalistenercontainerfactory <concurrentMessageListenerContainer <String, String >> kafkalistenonConainerFactory () {concurrentkafkalistenonnerfactory <> (string> factory = new concurrentkafkalistenonainerfactory <> (); factory.setConsumerFactory (ConsumerFactory ()); factory.setConcurrency (concurrence); factory.getContainerProperties (). SetPollTimeout (1500); Retour Factory; } public ConsumerFactory <String, String> ConsumerFactory () {return new DefaultKafKaconsumerFactory <> (ConsumerConfigs ()); } public map <string, object> ConsumerConfigs () {map <string, object> propsmap = new hashmap <> (); PropSmap.put (ConsumerConfig.bootstrap_servers_config, serveurs); PropSmap.put (ConsumerConfig.enable_Auto_Commit_Config, ENABLAUTOCOMMIT); PropSmap.put (ConsumerConfig.Auto_Commit_Interval_MS_Config, AutoCommitInterval); PropSmap.put (ConsumerConfig.Session_timeout_MS_Config, SessionTimeout); PropSmap.put (ConsumerConfig.key_deserializer_class_config, stringdeserializer.class); PropSmap.put (ConsumerConfig.value_deserializer_class_config, stringdeserializer.class); PropSmap.put (ConsumerConfig.Group_ID_Config, GroupID); PropSmap.put (ConsumerConfig.auto_offset_reset_config, AutoOffSetRereset); return propsmap; } @Bean Public auditeur auditeur () {return nouvel écouteur (); }}Nouveau auditeur () génère un bean pour traiter les données lues à partir de Kafka. La démo de l'implémentation simple de l'auditeur est la suivante: il suffit de lire et d'imprimer les valeurs de clé et de message
@ L'attribut sujets de Kafkalistener est utilisé pour spécifier le nom du sujet Kafka. Le nom de sujet est spécifié par le producteur de messages, c'est-à-dire qu'il est spécifié par KafKatemplate lors de l'envoi d'un message.
Package com.kangaroo.sentinel.collect.configuration; import org.apache.kafka.clients.consumer.consumerCord; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.Kafka Logger Logger.Anlot. = Loggerfactory.getLogger (this.getClass ()); @KafKAListener (sujets = {"test"}) public void écouter (ConsumerCord <?,?> Enregistre) {logger.info ("Kafka's Key:" + Record.key ()); Logger.info ("Valeur de Kafka:" + Record.Value (). ToString ()); }}Conseils:
1) Je n'ai pas présenté comment installer et configurer Kafka. Il est préférable d'utiliser un réseau IP entièrement lié lors de la configuration de Kafka, plutôt que localhost ou 127.0.0.1
2) Il est préférable de ne pas utiliser le propre gardien de Zoo de Kafka pour déployer Kafka, car il peut entraîner l'accès à l'inaccessibilité.
3) Théoriquement, le consommateur devrait lire Kafka via Zookeeper, mais ici nous utilisons l'adresse de Kafkaserver, pourquoi n'y sommes-nous pas entrés en profondeur?
4) Lors de la définition de la configuration du message de surveillance, la valeur de l'élément de configuration group_id_config est utilisée pour spécifier le nom du groupe de consommateurs. S'il y a plusieurs objets d'écoute dans le même groupe, un seul objet d'écouteur peut recevoir le message.
Ce qui précède est tout le contenu de cet article. J'espère que cela sera utile à l'apprentissage de tous et j'espère que tout le monde soutiendra davantage Wulin.com.