この記事では、Kafkaを統合するSpring Bootの例を紹介し、全員と共有し、自分にメモを残してください
システム環境
リモートサーバー上に構築されたKafkaサービスを使用します
統合プロセス
1.スプリングブートプロジェクトを作成し、関連する依存関係を追加します。
<?xml version = "1.0" encoding = "utf-8"?> <project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns:xsi = "http://www.w3.org/2001/xmlschema-instance <http://www.w3.org/2001 xsi:schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.0.sdsd"> <modeleversion> 4.0.0 </modelversion> <グループ</groupidboot> <artifactid> spring-boot-integration-kafka </artifactid> <version> 0.0.1-snapshot </version> <packaging> jar </packaging> <name> spring-boot-integration-kafka </name> <説明<artifactid> spring-boot-starter-parent </artifactid> <version> 2.0.0.release </version> <relativepath/> <! - lookup parent from repository-> </parent> <properties> <build.sourceencoding> utf-8 </project.build.SourceEncoding> <project.reporting.outputencoding> utf-8 </project.reporting.outputencoding> <java.version> 1.8 </java.version> </properties> <dependencies> <shiplency> <mist depthency> <! - kafka-> <dependency> <groupid> org.springframework.kafka </groupid> <artifactid> spring-kafka </artifactid> </dependency> <依存関係> <groupid> <groupid> org.springframework.boot </groupid> <artifactid> spring-boot-starter-test </artifactid> <scope>テスト</scope> </dependency> </dependency> <blubins> <プラグイン> <groupid> org.springframework.boot </groupid> <artifactid> spring-boot-maven-plugin </artifactid> </plugin> </plugins> </build> </project>
2.構成情報を追加して、YMLファイルをここで使用します
春:kafka:bootstrap-servers:xxxx:9092プロデューサー:value-serializer:org.springframework.kafka.support.serializer.jsonserializer Consumer:Group-ID:Test auto-or-set-reset:初期のValue-deserializer: org.springframework.kafka.support.serializer.jsondeserializerプロパティ:spring:json:信頼:パッケージ:com.laravelshao.springboot.kafka
3.メッセージオブジェクトを作成します
パブリッククラスメッセージ{private integer id;プライベート文字列MSG; public message(){} public message(integer id、string msg){this.id = id; this.msg = msg; } public Integer getId(){return id; } public void setid(integer id){this.id = id; } public string getMsg(){return msg; } public void setmsg(string msg){this.msg = msg; } @Override public String toString(){return "message {" + "id =" + id + "、msg = '" + msg +'/'' + '}'; }}4.プロデューサーを作成します
パッケージcom.laravelshao.springboot.kafka; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.Annotation; org.springframework.stereotype.component;/*** Shaoqinghuaが2018/3/23に作成。 */@componentPublicクラスプロデューサー{private static logger log = loggerfactory.getLogger(rosicer.class); @autowiredプライベートKafkatemplate Kafkatemplate; public void send(string topic、message message){kafkatemplate.send(トピック、メッセージ); log.info( "producer-> topic:{}、message:{}"、topic、message); }}5.消費者を作成し、@kafkalistenerを使用してトピックに注釈を付けます
パッケージcom.laravelshao.springboot.kafka; import org.apache.kafka.clients.consumer.consumerrecord; Import org.slf4j.logger; import org.slf4j.loggeractory; Import org.spramework.kafka.annotation.kafkalistener; org.springframework.stereotype.component;/*** Shaoqinghuaが2018/3/23に作成。 */@componentpublic class Consumer {private static logger log = loggerFactory.getLogger(Consumer.Class); @kafkalistener(topics = "test_topic")public void receive(consumerrecord <string、message> consumerrecord){log.info( "consumer-> topic:{}、value:{}"、consumerrecord.topic()、consumerrecord.value()); }}6.消費テストを送信します
パッケージcom.laravelshao.springboot;インポートcom.laravelshao.springboot.kafka.message;インポートcom.laravelshao.springboot.kafka.producer; Import org.springframework.boot.springApplication; Import; org.springframework.boot.autoconfigure.springbootapplication; Import org.springframework.context.applicationcontext; @springbootapplicationクラス統合{public staticid main(string [] args = appecatredexectextectectectectectectectectectectectext SpringApplication.run(IntegrationKafkaApplication.class、args);プロデューサープロデューサー= context.getBean(producer.class); for(int i = 1; i <10; i ++){producer.send( "test_topic"、new Message(i、 "test topic message"+i)); Thread.Sleep(2000); }}}メッセージを送信してメッセージを順番に消費できます
例外の問題
Deserialization Exception(カスタムメッセージオブジェクトは、Kafkaによって信頼されているパッケージパスの下にありません)?
[org.springframework.kafka.kafkalistenerendpointcontainer#0-0-C-1]エラーorg.springframework.kafka.listener.kafkamessageListenercontainer $ sienderconsumer.719コンテナの例外
org.apache.kafka.common.errors.SerializationException:オフセット9でパーティションTest_topic-0のキー/値の脱気を脱ぐことで、必要に応じて、消費を継続するためにレコードを過ぎてください。
原因:java.lang.illegalargumentexception:クラス 'com.laravelshao.springboot.kafka.message'は信頼できるパッケージにありません:[java.util、java.lang]。このクラスが脱必要にされると思われる場合は、その名前を提供してください。シリアル化が信頼できるソースによってのみ行われる場合、すべての信頼を有効にすることもできます(*)。
org.springframework.kafka.support.converter.defaultjackson2javatypemapper.getClassidType(defaultjackson2javatypemapper.java:139)
org.springframework.kafka.support.converter.defaultjackson2javatypemapper.tojavatype(defaultjackson2javatypemapper.java:113)
org.springframework.kafka.support.serializer.jsondeserializer.deserialize(jsondeserializer.java:191)
atorg.apache.kafka.clients.consumer.internals.fetcher.parserecord(fetcher.java:923)
atorg.apache.kafka.clients.consumer.internals.fetcher.access $ 2600(fetcher.java:93)
atorg.apache.kafka.clients.consumer.internals.fetcher $ partitionRecords.fetchRecords(fetcher.java:1100)
atorg.apache.kafka.clients.consumer.internals.fetcher $ partitionRecords.access $ 1200(fetcher.java:949)
atorg.apache.kafka.clients.consumer.internals.fetcher.fetchrecords(fetcher.java:570)
atorg.apache.kafka.clients.consumer.internals.fetcher.fetchedRecords(fetcher.java:531)
atorg.apache.kafka.clients.consumer.kafkaconsumer.pollonce(kafkaconsumer.java:1146)
atorg.apache.kafka.clients.consumer.kafkaconsumer.poll(kafkaconsumer.java:1103)
org.springframework.kafka.listener.kafkamessagelistenercontainer $ shulideconsumer.run(kafkamessagelistenercontainer.java:667)
java.util.concurrent.executors $ runnableadapter.call(executors.java:511)
java.util.concurrent.futuretask.run(futuretask.java:266)
at java.lang.thread.run(thread.java:745)
回避策:カフカが信頼しているパッケージパスに現在のパッケージを追加する
春:カフカ:消費者:プロパティ:春:JSON:信頼:パッケージ:com.laravelshao.springboot.kafka
上記はこの記事のすべての内容です。みんなの学習に役立つことを願っています。誰もがwulin.comをもっとサポートすることを願っています。