Cet article présente l'exemple de code de Spring Boot en intégrant Kafka, partagez-le avec tout le monde et laissez une note par vous-même
Environnement système
Utilisez des services Kafka construits sur des serveurs distants
Processus d'intégration
1. Créez un projet Spring Boot et ajoutez des dépendances pertinentes:
<? xml version = "1.0" Encoding = "utf-8"?> <project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.w3.org/2001/xmlschema-instance" XSI: ScheMalation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion> 4.0.0 </déraphid> <proupId> com.laravelshao.springboot </ grouped> <ArtefactId> Spring-Boot-Integration-Kafka </ ArfactId> <Dersion> 0.0.1-Snapshot </DERNIERSE> <MAPPADING> JAR </ Packaging> <Name> Spring-Boot-Integration-Kafka </ Name> <DR, Description> Projet Demi pour Spring Boot </ GroupID> <parepred> ORG.SPRINGFRAMEWORGE <ArtefactId> Spring-Boot-Starter-Parent </ ArfactId> <Dersion> 2.0.0.release </DERNIERS> <RelativePath /> <! - Recherche Parent du référentiel -> </parent> </ Properties> <Project.build.sourceencoding> utf-8 </project.build.sourceencoding> <project.reporting.outputencoding> utf-8 </project.reportting.outputencoding> <java.version> 1.8 </java.version> </properties> <dependces> <dependency> <proupId> org.springframework.boot </proupId> <artifactive> spring-boot-starter-starter </tifactID> <!--kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-json</artifactId> </dependency> <dependency> <GroupId> org.springframework.boot </prôdId> <ArtifactId> Spring-Boot-starter-test </ artifactid> <ccope> test </cope> </dependency> </dpendance> <uild> </grouns> <Glugin> <proupId> org.springframework.boot </prounid> <ArtefactId> Spring-Boot-Maven-Plugin </ ArfactId> </Glugin> </Glugins> </Duild> </randing>
2. Ajouter des informations de configuration, utilisez le fichier yml ici
Spring: Kafka: Bootstrap-Servers: XXXX: 9092 Producteur: Value-Serializer: org.SpringFramework.Kafka.Support.Serializer.jSonSerializer Consumer: Group-ID: Test Auto-Offset-Reset: Value-Deserializer antérieure: Org.SpringFramework.Kafka.Support.Seririacer.jSiriaSizer Propriétés: Spring: JSON: Trust: Packages: com.Laravelshao.springboot.kafka
3. Créez un objet de message
Message de classe publique {ID entier privé; String privé msg; Message public () {} Message public (ID INTER, String msg) {this.id = id; this.msg = msg; } public Integer getID () {return id; } public void setid (INGER ID) {this.id = id; } public String getmsg () {return msg; } public void setmsg (String msg) {this.msg = msg; } @Override public String toString () {return "message {" + "id =" + id + ", msg = '" + msg +' / '' + '}'; }}4. Créer un producteur
package com.laravelshao.springboot.kafka; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.kafkired; import org.springframework.sterreotype.Component; / ** * créé par Shaoqinghua le 2018/3/23. * / @ ComponentPublic Class producteur {private static logger log = loggerfactory.getLogger (producer.class); @Autowired Private KafKatemplate KafKatemplate; public void Send (thème de chaîne, message de message) {kafkatemplate.send (thème, message); Log.info ("producteur-> sujet: {}, message: {}", thème, message); }}5. Créez un consommateur et utilisez @kafkalistener pour annoter le sujet
package com.laravelshao.springboot.kafka; import org.apache.kafka.clients.consumer.consumerCord; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.kafka.annotation.kafkalistener; import org.springframework.sterreotype.Component; / ** * créé par Shaoqinghua le 2018/3/23. * / @ ComponentPublic Class Consumer {private static logger log = loggerfactory.getLogger (Consumer.Class); @KafKAListener (sujets = "test_topic") public void reçoit (ConsumerCord <String, Message> ConsumerCord) {Log.info ("Consumer-> Topic: {}, valeur: {}", ConsumerCord.Topic (), ConsumerCord.Value ()); }}6. Envoyer des tests de consommation
Package com.Laravelshao.springboot; import com.laravelshao.springboot.kafka.message; import com.laravelshao.springboot.kafka.producer; import org.springframework.boot.springapplication; import org.springframework. org.springframework.context.applicationContext; @SpringBootApplicationPublic Class IntegrationKafkaApplication {public static void main (String [] args) lance InterruptedException {ApplicationContext Context = SpringApplication.Run (IntegrationKafkaapplication.Class, Args); Producteur producteur = context.getBean (producer.class); pour (int i = 1; i <10; i ++) {producer.send ("test_topic", nouveau message (i, "message de test" + i)); Thread.Sleep (2000); }}}Vous pouvez voir envoyer des messages et consommer des messages à tour à tour
Problèmes d'exception
Exception de désérialisation (l'objet de message personnalisé n'est pas sous le chemin du package fidèle à Kafka)?
[org.springframework.kafka.kafkalisterendPointContainer # 0-0-C-1] Erreur org.springframework.kafka.Listener.KafkaSageListenerContainer $ écoutner.719 Exception conteneur de contener
org.apache.kafka.common.errors.serializationException: Erreur désérialisant la clé / valeur pour la partition test_topic-0 au décalage 9. Si nécessaire, veuillez consulter le dossier pour continuer la consommation.
Causée par: java.lang.illegalargumentException: la classe 'com.laravelshao.springboot.kafka.mesage' n'est pas dans les packages de confiance: [java.util, java.lang]. Si vous pensez que cette classe est sûre à désérialiser, veuillez fournir son nom. Si la sérialisation n'est effectuée que par une source de confiance, vous pouvez également activer la confiance tout (*).
sur org.springframework.kafka.support.converter.defaultjackson2javatypeMapper.getClassidType (defaultjackson2javatypeMapper.java:139)
sur org.springframework.kafka.support.converter.defaultjackson2javatypeMapper.tojavatype (defaultjackson2javatypemapper.java:113)
sur org.springframework.kafka.support.serializer.jSondeerializer.deserialize (jSondeerializer.java:191)
sur org.apache.kafka.clients.consumer.internals.fetcher.parserecord (fetcher.java:923)
sur org.apache.kafka.clients.consumer.internals.fetcher.access 2600 $ (fetcher.java:93)
sur org.apache.kafka.clients.consumer.internals.fetcher $ partitionRecords.fetchRecords (fetcher.java:1100)
sur org.apache.kafka.clients.consumer.internals.fetcher $ partitionrecords.access 1200 $ (fetcher.java:949)
sur org.apache.kafka.clients.consumer.internals.fetcher
sur org.apache.kafka.clients.consumer.internals.fetcher.fettedRecords (fetcher.java:531)
sur org.apache.kafka.clients.consumer.kafkaconsumer.pollonce (kafkaconsumer.java:1146)
à org.apache.kafka.clients.consumer.kafkaconsumer.poll (kafkaconsumer.java:1103)
sur org.springframework.kafka.Listener.KafkaSageListenerContainer $ evenerConsumer.run (KafkaSageListenerContainer.java:667)
sur java.util.concurrent.executors $ runnableadapter.call (exécuteurs.java:511)
sur java.util.concurrent.futuretask.run (futurask.java:266)
sur java.lang.thread.run (thread.java:745)
Solution: ajoutez le package actuel au chemin du package fidèle à Kafka
Spring: Kafka: Consumer: Propriétés: Spring: JSON: Trust: Packages: Com.Laravelshao.springboot.kafka
Ce qui précède est tout le contenu de cet article. J'espère que cela sera utile à l'apprentissage de tous et j'espère que tout le monde soutiendra davantage Wulin.com.