이 기사는 Kafka를 통합하는 Spring Boot의 예제 코드를 소개하고 모든 사람과 공유하고 직접 메모를 남깁니다.
시스템 환경
원격 서버에 구축 된 Kafka 서비스를 사용하십시오
통합 프로세스
1. 스프링 부팅 프로젝트를 만들고 관련 종속성을 추가하십시오.
<? xml version = "1.0"encoding = "utf-8"?> <project xmlns = "http://maven.apache.org/pom/4.0.0"xmlns : xsi = "http://www.w3.org/2001/xmlschema-instance" xsi : schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modeversion> 4.0.0 </modelversion> <groupid> com.laravelshao.springboot> <Artifactid> Spring-Boot-Integration-Kafka </artifactid> <snapshot </version> <packaging> jar </packaging> <name> Spring-Boot-Integration-Kafka </name> <description> 스프링 부츠를위한 데모 프로젝트 </description> <부모> groupid> <아티 팩트> 스프링 부트 스타터-팔렌트 </artifactid> <bersion> 2.0.0.release </version </version </version </version> <realiveativePath/> <!-저장소에서 부모 조회 부모-> </부모> <properties> <project.build.sourceencoding> utf-8 </project.build.sourceoding> <project.reporting.outputencoding> utf-8 </project.reporting.outputencoding> <java.version> 1.8 </java.version> </properties> <pectionency> <pectionency> <groupid> org.spramework.spramework.boot </GroupId> <artifactid> spring-boot> <!-Kafka-> <pectionement> <groupid> org.springframework.kafka </groupka </groupid> <atrifactid> spring-kafka </artifactid> </depectency> <prectionency> <groupid> org.springframework.boot </groupid> <arepifactid> spring-boot-sparter-json </artifactid> <groupid> org.springframework.boot </groupid> <artifactid> 스프링-부트-스타터-테스트 </artifactid> <cope> test </scope> </fectionency> </spections> <플러그인> <grugin> <groupid> org.springframework.boot> <Artifactid> Spring-Boot-Maven-Plugin </artifactid> </plugin> </plugins> </build> </project>
2. 구성 정보 추가 여기에서 YML 파일을 사용하십시오
Spring : Kafka : Bootstrap-Servers : XXXX : 9092 프로듀서 : Value-Serializer : org.springframework.kafka.support.serializer.jsonserializer consumer : group-id : test auto-offset-reset : early value-deserializer : org.springeriater .support.serialozer.serializer.serializer.serializer. 속성 : 스프링 : JSON : 신뢰할 수있는 : 패키지 : com.laravelshao.springboot.kafka
3. 메시지 객체를 만듭니다
공개 클래스 메시지 {개인 정수 ID; 개인 문자열 msg; public message () {} public message (정수 ID, 문자열 msg) {this.id = id; this.msg = msg; } public Integer getId () {return id; } public void setId (정수 ID) {this.id = id; } public String getMsg () {return msg; } public void setmsg (문자열 msg) {this.msg = msg; } @override public String toString () {return "message {" + "id =" + id + ", msg = '" + msg +'/'' + '}'; }}4. 프로듀서를 만듭니다
패키지 com.laravelshao.springboot.kafka; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.bean.beans.annotation.autowired; import org.springframework.kafka.core.kafkatemplate; org.springframework.stereotyp.component;/*** Shaoqinghua가 2018/3/23에 작성했습니다. */@componentpublic 클래스 프로듀서 {private static logger log = loggerfactory.getLogger (producer.class); @autowired 개인 kafkatemplate kafkatemplate; public void send (문자열 주제, 메시지 메시지) {kafkatemplate.send (주제, 메시지); log.info ( "producer-> topic : {}, message : {}", topic, message); }}5. 소비자를 만들고 @kafkalistener를 사용하여 주제에 주석을 달다
패키지 com.laravelshao.springboot.kafka; import org.apache.kafka.clients.consumer.consumerrecord; import org.slf4j.logger; import org.slf4j.loggeractory; import org.springframework.kafka.annotation.kafkalistener; org.springframework.stereotyp.component;/*** Shaoqinghua가 2018/3/23에 작성했습니다. */@componentpublic 클래스 소비자 {private static logger log = loggerfactory.getLogger (소비자 .class); @kafkalistener (topics = "test_topic") public void arever (copberecord <string, message> consideRrecord) {log.info ( "소비자-> 주제 : {}, {}", ConsiderRecord.topic (), ConsideRecord.value (); }}6. 소비 테스트를 보내십시오
패키지 com.laravelshao.springboot; import com.laravelshao.springboot.kafka.message; import com.laravelshao.springboot.kafka.producer; import org.springframework.springApplication; import org.springframewort.boot.springoconoconocon org.springframework.context.applicationcontext; @SpringBootApplicationPublic Class IntegrationKafkaApplication {public static void main (String [] args)은 InterruptedException {ApplicationContext Context = sprashApplication.run (integrationKafkaApplication.class, classs); 생산자 프로듀서 = Context.getBean (Producer.class); for (int i = 1; i <10; i ++) {producer.send ( "test_topic", 새 메시지 (i, "test topic message"+i)); Thread.sleep (2000); }}}메시지 보내기를보고 교차로 메시지를 소비 할 수 있습니다.
예외 문제
사제화 예외 (사용자 정의 메시지 객체는 Kafka가 신뢰하는 패키지 경로 아래에 있지 않습니까?)?
[org.springframework.kafka.kafka.kafkalistenerendpointcontainer#0-0-c-1] 오류 org.springframework.kafka.listener.kafkamessagelistenercontainer $ listenerconsumer.719 컨테이너 예외
org.apache.kafka.common.errors.serializationException : 오프셋 9의 파티션 test_topic-0에 대한 오류 손실화 키/값. 필요한 경우 소비를 계속하려면 레코드를 찾아보십시오.
원인 : java.lang.illegalargumentexception : 클래스 'com.laravelshao.springboot.kafka.message'는 신뢰할 수있는 패키지에 없습니다 : [java.util, java.lang]. 이 수업이 사막화하기에 안전하다고 생각되면 그 이름을 제공하십시오. 직렬화가 신뢰할 수있는 소스에 의해서만 수행되는 경우, 모든 AL (*)을 신뢰할 수도 있습니다.
at org.springframework.kafka.support.converter.defaultjackson2javatypemapper.getClassIdtype (defaultjackson2javatypemapper.java:139)
at org.springframework.kafka.support.converter.defaultjackson2javatypemapper.tojavatype (defaultjackson2javatypemapper.java:113)
at org.springframework.kafka.support.serializer.jsondeserializer.deserialize (jsondeserializer.java:191)
at org.apache.kafka.clients.consumer.internals.fetcher.parserecord (fetcher.java:923)
at org.apache.kafka.clients.consumer.internals.fetcher.access $ 2600 (fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.fetcher $ partitionrecords.fetchrecords (fetcher.java:1100)
at org.apache.kafka.clients.consumer.internals.fetcher $ partitionrecords.access $ 1200 (fetcher.java:949)
at org.apache.kafka.clients.consumer.internals.fetcher.fetchrecords (fetcher.java:570)
at org.apache.kafka.clients.consumer.internals.fetcher.fetchedrecords (fetcher.java:531)
at org.apache.kafka.clients.consumer.kafkaconsumer.pollonce (kafkaconsumer.java:1146)
at org.apache.kafka.clients.consumer.kafkaconsumer.poll (kafkaconsumer.java:1103)
at org.springframework.kafka.listener.kafkamessagelistenercontainer $ listenerconsumer.run (kafkamessagelistenercontainer.java:667)
at java.util.concurrent.executors $ runnableadapter.call (executors.java:511)
at java.util.concurrent.futuretask.run (futuretask.java:266)
at java.lang.thread.run (Thread.java:745)
해결 방법 : Kafka가 신뢰하는 패키지 경로에 현재 패키지 추가
봄 : Kafka : 소비자 : 속성 : Spring : JSON : 신뢰할 수있는 : 패키지 : com.laravelshao.springboot.kafka
위는이 기사의 모든 내용입니다. 모든 사람의 학습에 도움이되기를 바랍니다. 모든 사람이 wulin.com을 더 지원하기를 바랍니다.