Este artículo presenta el código de ejemplo de Boot de primavera que integra kafka, compártelo con todos y deja una nota para ti mismo
Entorno del sistema
Utilice los servicios de Kafka construidos en servidores remotos
Proceso de integración
1. Cree un proyecto de arranque de primavera y agregue dependencias relevantes:
<? xml versión = "1.0" encoding = "utf-8"?> <Project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.w3.org/2001/xmlschemainstance" xsi: schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <Modelversion> 4.0.0 </modelversion> <proupid> com.laravelsha.springboot </groupid> <artifactid> spring-boot-integration-kafka </artifactid> <versever> 0.0.1-snapshot </versever> <compafaging> jar </paquete> <name> spring-boot-integration-kafka </name> <scription> Proyecto de demostración para inicio de arranque </solding> <entremid> org.springframework.boot </groupid> <ArifactID> Spring-Boot-Starter-Parent </artifactid> <versión> 2.0.0.release </verversion> <relativePath/> <!-Buscar padre desde repositorio-> </rent> <properties> <ject.Build.sourceEncoding> utf-8 </project.build.sourceencoding <Project.Reporting.outputencoding> UTF-8 </project.rePorting.outputEncoding> <java.version> 1.8 </java.version> </propiences> <pendencies> <pendency> <proupId> org.springFrame.Boot </groupId> <Atifactid> Spring-Boot-Starter </ArtifactId> </artfactiD> </artfactid> </art. < <MoupRoD> org.springframework.boot </groupid> <artifactid> spring-boot-starter-test </arfactid> <cope> test </spope> </pendency> </dependencias> <construcción> <gotins> <glugin> <grupiD> org.springframeWork.Boot </groupid> <artifactID> spring-boot-saven-plugin </artifactid> </glugin> </glugins> </build> </proyecto>
2. Agregue información de configuración, use el archivo YML aquí
Spring: Kafka: Bootstrap-Servers: xxxx: 9092 Productor: Value-Serializer: org.springframework.kafka.support.serializer.jsonserializer consumidor: grupo-id: id de compensación automática:-deseserial de valor anterior: org.springframework.kafka.suppport.serializer.serializer.serializer.Jserializer.jserializer.Jserializer. Propiedades: Spring: JSON: Confianza: PAQUETES: Com.laravelshao.springboot.kafka
3. Cree un objeto de mensaje
Mensaje de clase pública {ID de entero privado; msg de cadena privada; Mensaje público () {} public Message (ID de Integer, String Msg) {this.id = id; this.msg = msg; } public integer getId () {return id; } public void setid (ID de entero) {this.id = id; } public String getMsg () {return msg; } public void setmsg (string msg) {this.msg = msg; } @Override public string toString () {return "Mensaje {" + "id =" + id + ", msg = '" + msg +'/'' + '}'; }}4. Crea un productor
paquete com.laravelshao.springboot.kafka; import org.slf4j.logger; import org.slf4j.loggerFactory; import org.springframework.beans.factory.annotation.aUtoWired; import org.springframework.stereotype.component;/*** creado por Shaoqinghua el 2018/3/23. */@ComponentPublic Class Producer {private static logger log = loggerFactory.getLogger (productor.class); @AUTOWIREDIREDIRD KAFKATEMPLATE KAFKATEMPLATE; public void send (topic de cadena, mensaje de mensaje) {kafkatemplate.send (tema, mensaje); log.info ("Producer-> Topic: {}, Mensaje: {}", tema, mensaje); }}5. Crea un consumidor y usa @kafkalistener para anotar el tema
paquete com.laravelshao.springboot.kafka; import org.apache.kafka.clients.consumer.consumerrecord; import org.slf4j.logger; import og.slf4j.loggerfactory; import og.springfframework.kafka.annotation.kafkalistener; import org.springframework.stereotype.component;/*** creado por Shaoqinghua el 2018/3/23. */@ComponentPublic Class Consumer {private static logger log = loggerFactory.getLogger (consumidor.class); @Kafkalistener (topics = "test_topic") public void recibe (consultorRecord <String, Message> ConsumerRecord) {log.info ("Consumer-> Topic: {}, valor: {}", consumeRecord.topic (), consumecord.value ()); }}6. Enviar pruebas de consumo
paquete com.laravelshao.springboot; import com.laravelshao.springboot.kafka.message; import com.laravelshao.springboot.kafka.producer; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigonfonfonfonfonfonfigure.springapatation; org.springframework.context.applicationContext; @springbootapplicationPublic Class IntegrationKafkaApplication {public static void main (string [] args) lanza interruptedException {ApplicationContext context = SpringApplication.run (IntegrationKafkaApplication.class, args); Productor productor = context.getBean (productor.class); para (int i = 1; i <10; i ++) {producer.send ("test_topic", nuevo mensaje (i, "Mensaje de tema de prueba"+i)); Thread.sleep (2000); }}}Puede ver enviar mensajes y consumir mensajes a su vez
Problemas de excepción
Excepción de deserialización (¿El objeto de mensaje personalizado no está en la ruta del paquete confiable por Kafka)?
[org.springframework.kafka.kafkalistenerendpointcontainer#0-0-c-1] Error org.springframework.kafka.listener.kafkamessageListenerContainer $ oyearconsumer.719 Contenedor Excepción del contenedor
org.apache.kafka.common.errors.serializationException: Error de deserialización de la clave/valor para la partición test_topic-0 en la compensación 9. Si es necesario, busque más allá del registro para continuar el consumo.
Causado por: java.lang.iLLEGALARGUNMEXCEENT: La clase 'com.laravelshao.springboot.kafka.message' no está en los paquetes de confianza: [java.util, java.lang]. Si cree que esta clase es segura para deserializar, proporcione su nombre. Si la serialización solo es realizada por una fuente confiable, también puede habilitar la confianza de todos (*).
en org.springframework.kafka.support.converter.defaultjackson2javatypemapper.getClassidType (defaultjackson2javatypemapper.java:139)
en org.springframework.kafka.support.converter.defaultjackson2javatypemapper.tojavatype (defaultjackson2javatypemapper.java:113)
en org.springframework.kafka.support.serializer.jsondeserializer.deserialize (jsondeserializer.java:191)
en org.apache.kafka.clients.consumer.internals.fetcher.parserecord (fetcher.java:923)
en org.apache.kafka.clients.consumer.internals.fetcher.access $ 2600 (fetcher.java:93)
en org.apache.kafka.clients.consumer.internals.fetcher $ partitionRecords.fetchRecords (fetcher.java:1100)
en org.apache.kafka.clients.consumer.internals.fetcher $ partitionRecords.access $ 1200 (fetcher.java:949)
en org.apache.kafka.clients.consumer.internals.fetcher.fetchRecords (fetcher.java:570)
en org.apache.kafka.clients.consumer.internals.fetcher.fetchedRecords (fetcher.java:531)
en org.apache.kafka.clients.consumer.kafkaconsumer.pollonce (kafkaconsumer.java:1146)
en org.apache.kafka.clients.consumer.kafkaconsumer.poll (kafkaconsumer.java:1103)
en org.springframework.kafka.listener.kafkamessageListenerContainer $ KurcherConsumer.run (kafkamessageListenerContainer.java:667)
en java.util.concurrent.executors $ runnableadapter.call (ejecutors.java:511)
en java.util.concurrent.futuretask.run (futuretask.java:266)
en java.lang.thread.run (Thread.java:745)
Solución alternativa: Agregue el paquete actual a la ruta del paquete confiable por Kafka
Primavera: Kafka: Consumidor: Propiedades: Primavera: JSON: Confianza: PACELES: com.laravelshao.springboot.kafka
Lo anterior es todo el contenido de este artículo. Espero que sea útil para el aprendizaje de todos y espero que todos apoyen más a Wulin.com.