В этой статье представлен пример кода Spring Boot, интегрирующей Kafka, поделитесь ею со всеми и оставьте для себя записку
Системная среда
Используйте услуги kafka, построенные на удаленных серверах
Процесс интеграции
1. Создайте проект Spring Boot и добавьте соответствующие зависимости:
<? xml version = "1.0" Encoding = "utf-8"?> <project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.w3.org/2001/xmlschema-instance" xsi: schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <doliderversion> 4.0.0 </modelversion> <groupid> com.laravelshao.springboot </modelversion> <groupid> com.laravelshao.springboot </modelversion> <groupid> com.laravelshao. <artifactId>spring-boot-integration-kafka</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>spring-boot-integration-kafka</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <Artifactid> Spring-Boot-Starter-parent </artifactid> <sersive> 2.0.0.release </version> <venterativePath/> <!-Поиск родителя из репозитория-> </parent> <properties> <project.build.sourceencoding> utf-8 </project.build.sourceending> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!--kafka--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-json</artifactId> </dependency> <dependency> <groupId> org.springframework.boot </GroupId> <ArtifactId> Spring-boot-starter-test </artifactid> <cracpe> test </scope> </dependency> </depertive> <buld> <blicins> <blicin> <groupid> org.spramework.boot </GroupId> <ratifactid> spring-boot-maven> promaven> plugvin-pramebrame.bout </GroupId> <pracin> promaven> plug. </plugin> </plugins> </build> </project>
2. Добавить информацию о конфигурации, используйте файл YML здесь
Весна: KAFKA: Bootstrap-Servers: XXXX: 9092 Производитель: Serializer: org.springframework.kafka.support.serializer.jsonerializ org.springframework.kafka.support.serializer.jsondeserializer.
3. Создайте объект сообщения
Общедоступное сообщение {Private Integer ID; частная строка MSG; public message () {} public message (integer id, string msg) {this.id = id; this.msg = msg; } public integer getId () {return id; } public void setId (Integer id) {this.id = id; } public String getMsg () {return msg; } public void setMsg (String msg) {this.msg = msg; } @Override public String toString () {return "сообщение {" + "id =" + id + ", msg = '" + msg +'/'' + '}'; }}4. Создать продюсера
пакет com.laravelshao.springboot.kafka; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.autowired; импорт org.springformwork.kafka.core.kafkateplate; org.springframework.stereotype.component;/*** Создано Shaoqinghua 2018/3/23. */@ComponentPublic Class Produceer {private Static Logger log = loggerFactory.getLogger (производительность.class); @Autowired частный kafkatemplate kafkatemplate; public void Send (строка тема, сообщение сообщения) {kafkatemplate.send (тема, сообщение); log.info ("Производитель-> Тема: {}, сообщение: {}", тема, сообщение); }}5. Создайте потребителя и используйте @kafkalistener, чтобы аннотировать тему
пакет com.laravelshao.springboot.kafka; import org.apache.kafka.clients.consumer.consumercord; import org.slf4j.logger; import org.slf4j.loggerfactory; импорт org.spramework.kafka.annotation.kafkalistener; org.springframework.stereotype.component;/*** Создано Shaoqinghua 2018/3/23. */@ComponentPublic Class Consumer {Private Static Logger log = loggerFactory.getLogger (consumer.class); @Kafkalistener (themics = "test_topic") public void recee (conmerceercord <string, message> consumerRecord) {log.info ("Consumer-> Тема: {}, значение: {}", ConsumerRecord.topic (), ConsumerRecord.Value ()); }}6. Отправить тесты потребления
Пакет com.laravelshao.springboot; import com.laravelshao.springboot.kafka.message; импорт com.laravelshao.springboot.kafka.producer; импорт org.springframework.boot.springapplication; импорт; org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.context.applicationcontext; @springbootapplicationpublic class integrationkafkaapplication {public void main (string [] args) trrow SpringApplication.run (IntegrationKafkaApplication.class, args); Производитель продюсер = context.getbean (Produce.class); for (int i = 1; i <10; i ++) {produce.send ("test_topic", новое сообщение (i, "Тематическое сообщение"+i)); Thread.sleep (2000); }}}Вы можете увидеть отправку сообщений и потреблять сообщения по очереди
Проблемы исключения
Исключение Deserialization (пользовательский объект сообщения не находится под пути пакета, доверяя Kafka)?
[org.springframework.kafka.kafkalistenerendpointcontainer#0-0-0-C-1] Ошибка org.springframework.kafka.listener.kafkamessagelistenercontainer $ listererconsumer.719 Исключение контейнера
org.apache.kafka.common.errors.serializationException: Ошибка Deserialization Key/значение для разделения TEST_TOPIC-0 В Offset 9. При необходимости обращайтесь к прошлой записи, чтобы продолжить потребление.
Вызвано: java.lang.illegalargumentException: класс 'com.laravelshao.springboot.kafka.message' не в доверенных пакетах: [java.util, java.lang]. Если вы считаете, что этот класс безопасен для десериализации, пожалуйста, укажите его название. Если сериализация выполняется только доверенным источником, вы также можете позволить доверять All (*).
на org.springframework.kafka.support.converter.defaultjackson2javatypemapper.getClassidtype (defaultjackson2javatypemapper.java:139)
на org.springframework.kafka.support.converter.defaultjackson2javatypemapper.tojavatype (defaultjackson2javatypemapper.java:113)
at org.springframework.kafka.support.serializer.jsondeserializer.deserialize (jsondeserializer.java:191)
at org.apache.kafka.clients.consumer.internals.fetcher.parserecord (fetcher.java:923)
at org.apache.kafka.clients.consumer.internals.fetcher.access $ 2600 (fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.fetcher $ partitionRecords.fetchRecords (fetcher.java:1100)
at org.apache.kafka.clients.consumer.internals.fetcher $ partitionRecords.access $ 1200 (fetcher.java:949)
at org.apache.kafka.clients.consumer.internals.fetcher.fetchRecords (fetcher.java:570)
at org.apache.kafka.clients.consumer.internals.fetcher.fetchedRecords (fetcher.java:531)
at org.apache.kafka.clients.consumer.kafkaconsumer.pollonce (kafkaconsumer.java:1146)
at org.apache.kafka.clients.consumer.kafkaconsumer.poll (kafkaconsumer.java:1103)
на org.springframework.kafka.listener.kafkamessagelistenercontainer $ allusterconsumer.run (kafkamessagelistenercontainer.java:667)
на java.util.concurrent.executors $ runnableadapter.call (experators.java:511)
на java.util.concurrent.futuretask.run (FutureTask.java:266)
на java.lang.thread.run (Thread.java:745)
Обходной путь: добавьте текущий пакет в путь пакета, которым доверяет Кафка
Весна: Кафка: Потребитель: Свойства: Весна: JSON: Доверенный: Пакеты: com.laravelshao.springboot.kafka
Выше всего содержание этой статьи. Я надеюсь, что это будет полезно для каждого обучения, и я надеюсь, что все будут поддерживать Wulin.com больше.