Este artigo apresenta o Código de Exemplo de Botagem da Primavera Integrando Kafka, compartilhe -o com todos e deixe uma nota para você
Ambiente do sistema
Use serviços Kafka construídos em servidores remotos
Processo de integração
1. Crie um projeto de inicialização de primavera e adicione dependências relevantes:
<? xml versão = "1.0" coding = "utf-8"?> <Projeto xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.ww3.org/2001/xmlschaMance xsi: schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/Maven-4.0.0.xsd"> <modelVersion> 4.0.0 </modelversion> <proupId> com.larshshshshshshshshshsha.schao.0.0.0 </modelversion> <vroupid> com.larshshshshshshshshsshshsha.laravel.sclavel.clelavel.0.0.0 </modelversion> <broupId> com.larshshsha </ArtifactId> <Versão> 0.0.1-SNAPSHOT </Version> <batyaging> jar </catching> <name> spring-boot-integração-kafka </name> <Descrição Projeto Demo para Spring Boot </Descrição <roux> <puridid> org.sringfringfringfringfringfringfringfringfringfringfringfringfringfringfringfringfringfringfringfringfringfringfringFringFringFringFringFringfring </name> <Descrição> Projeto para a primavera </descrição <TRAFACTID> Spring-boot-Starter-Parent </ArtifactId> <versão> 2.0.0.reLease </siers> <RelityPath/> <!-Lookup Parent from Repository-> </parent> </projecties> <projeto.build.sourcencoding> utf-8 </projecties> <Project.Reporting.OutputEncoding> utf-8 </project.reporting.outputencoding> <nava.version> 1.8 </java.version> </sperties> </dependências> <spendency> <voundid> org.springframework.BOOT </GroupId> <stringer Spring- Spring- Spring- <!-kafka-> <pendency> <puperiD> org.springframework.kafka </groupid> <stifactId> spring-kafka </stutifactId> </dependency> <pendence> <purbring> org.springframework.boot </groupid> <stifactId> <pring-bringtern <GrupId> org.springframework.boot </proupId> <TROTIFACTID> Spring-boot-starter-test </ArtifactId> <cope> teste </scope> </dependency> </dependências> <bilt> <clugins> <lugin> <rugnid> org.springFramework.Boot </Gruperid> <TarifactId> Spring-boot-maven-plugin </stifactId> </flugin> </plugins> </fruct> </project>
2. Adicione informações de configuração, use o arquivo YML aqui
Spring: Kafka: Bootstrap-Servers: XXXX: 9092 Produtor: Value-Serializer: org.springframework.kafka.support.serializer.jSonserializer Consumer: Group-Id: Auto-offset-RESET: Anteriorizer.Perializer: ORG.SPRINGFRAFRAFRAFTWORKSOFROFTWORK.KETRAFT.KRAFTWORK.KRAFTWORK.KRAFTWORK.KRAFTWORK.KRAFTWORK. Propriedades: Spring: JSON: Confiável: Pacotes: com.laravelshao.springboot.kafka
3. Crie um objeto de mensagem
Public class Mensagem {ID inteiro privado; msg de sequência privada; public message () {} public mensagem (ID inteira, string msg) {this.id = id; this.msg = msg; } public integer getId () {return id; } public void SetId (ID inteiro) {this.id = id; } public string getMsg () {return msg; } public void setmsg (string msg) {this.msg = msg; } @Override public string tostring () {return "message {" + "id =" + id + ", msg = '" + msg +'/'' ' +'} '; }}4. Crie um produtor
pacote com.laravelshao.springboot.kafka; importar org.slf4j.logger; importar org.slf4j.loggerFactory; importar org.springframework.beans.factory.annotation.autowired; importar org.springframework.kafka.core.kafkkkkkkat.autowired; importar; org.springframework.tereotype.component;/*** Criado por Shaoqinghua em 2018/3/23. */@ComponentPublic Class Producer {private Static Logger Log = LoggerFactory.getLogger (Producer.class); @Autowired Private Kafkatemplate kafkatemplate; public void send (tópico da string, mensagem de mensagem) {kafkatemplate.send (tópico, mensagem); log.info ("Produtor-> tópico: {}, mensagem: {}", tópico, mensagem); }}5. Crie um consumidor e use @kafkalistener para anotar o tópico
pacote com.laravelshao.springboot.kafka; importar org.apache.kafka.clients.consumer.consumerRecord; importar org.slf4j.logger; importação org.slf4j.loggerFactory; importar org.springframework.kafka.annotation.kkalist; org.springframework.tereotype.component;/*** Criado por Shaoqinghua em 2018/3/23. */@ComponentPublic Classe Consumer {Log de logger estático privado = LoggerFactory.getLogger (consumer.class); @Kafkalistener (tópicos = "test_topic") public void recebe (ConsumerRecord <string, message> consumerCord) {log.info ("consumer-> tópico: {}, valor: {}", consumerCord.topic (), consumerCord.value ()); }}6. Enviar testes de consumo
pacote com.laravelshao.springboot; importar com.laravelshao.springboot.kafka.message; importação com.laravelshao.springboot.kafka.produces; importingring.springframework.springApplication; importação; org.springframework.context.applicationContext; @springbootapplicationpublic Classe integrationkafkaApplication {public static void main (string [] args) lança interruptedException {ApplicationContext context = springApplication.run (integrationkapplication.class, args); Produtor produtor = context.getBean (produtor.class); for (int i = 1; i <10; i ++) {produtor.send ("test_topic", nova mensagem (i, "mensagem de tópico de teste"+i)); Thread.sleep (2000); }}}Você pode ver enviar mensagens e consumir mensagens por sua vez
Problema de exceção
Exceção de deserialização (o objeto de mensagem personalizado não está no caminho do pacote confiável por Kafka)?
[org.springframework.kafka.kafkalistenerendpointContainer#0-0-C-1] Erro org.springframework.kafka.listener.kafkamessagelisterContainer $ ouviceConsumer.719 Exceção de contêineres
org.apache.kafka.common.errors.SerializationException: Erro Desserializando a chave/valor para o Partition test_topic-0 no deslocamento 9. Se necessário, procure o registro para continuar o consumo.
Causado por: java.lang.illegalargumentException: a classe 'com.laravelshao.springboot.kafka.message' não está nos pacotes confiáveis: [java.util, java.lang]. Se você acredita que esta classe é segura para deseralizar, forneça o nome. Se a serialização for feita apenas por uma fonte confiável, você também poderá permitir a confiança de todos (*).
em org.springframework.kafka.support.converter.defaultjackson2javatypeMapper.getclassidtype (defaultjackson2javatypeMapper.java:139)
em org.springframework.kafka.support.converter.defaultjackson2javatypeMapper.tojavatype (defaultjackson2JavatyPeMapper.java:113)
em org.springframework.kafka.support.serializer.jSondSerializer.deserialize (JSondeseserializer.java:191)
em org.apache.kafka.clients.consumer.internals.fetcher.parserecord (fetcher.java:923)
em org.apache.kafka.clients.consumer.internals.fetcher.access $ 2600 (fetcher.java:93)
em org.apache.kafka.clients.consumer.internals.fetcher $ partitionrecords.fetchrecords (fetcher.java:1100)
em org.apache.kafka.clients.consumer.internals.fetcher $ partitionrecords.access $ 1200 (fetcher.java:949)
em org.apache.kafka.clients.consumer.internals.fetcher.fetchrecords (fetcher.java:570)
em org.apache.kafka.clients.consumer.internals.fetcher.fetchedRecords (fetcher.java:531)
em org.apache.kafka.clients.consumer.kafkaconsumer.pollonce (kafkaconsumer.java:1146)
em org.apache.kafka.clients.consumer.kafkaconsumer.poll (kafkaconsumer.java:1103)
em org.springframework.kafka.listenener.kafkamessagelisterContainer $ ouvidos
em java.util.concurrent.executores $ runnableAdapter.call (executores.java:511)
em java.util.concurrent.futureTask.run (futureTask.java:266)
em java.lang.thread.run (thread.java:745)
Solução alternativa: adicione o pacote atual ao caminho do pacote confiável por Kafka
Primavera: Kafka: Consumidor: Propriedades: Primavera: JSON: Confiável: Pacotes: com.laravelshao.springboot.kafka
O exposto acima é todo o conteúdo deste artigo. Espero que seja útil para o aprendizado de todos e espero que todos apoiem mais o wulin.com.