تقدم هذه المقالة مثال رمز التمهيد الربيعي الذي يدمج Kafka ، وشاركه مع الجميع ، واترك ملاحظة لنفسك
بيئة النظام
استخدم خدمات كافكا المبنية على الخوادم البعيدة
عملية التكامل
1. إنشاء مشروع التمهيد الربيع وأضف التبعيات ذات الصلة:
<؟ XSI: schemalocation = "http://maven.apache.org/pom/4.0.0 <StifactId> Spring-boot-integration-kafka </stifactid> <الإصدار> 0.0.1-snapshot </version> <cplaging> jar </placking> <name> spring-boot-integration-kafka </same> <StifactId> Spring-boot-starter-parent </stifactid> <soper> 2.0.0.release </version> <real-path/> <!-lookup parent from ropository-> </parent> <project> <project.build.sourceencoding> utf-8 </project.build.build.source <project.reporting.outputencoding> utf-8 </project.reporting.outputencoding> <java.version> 1.8 </java.vential> </properties> <ependencies> <reperency> <nevidency> org.springframework.boot </rougeid> <!-kafka-> <reperency> <roupiD> org.springframework.kafka </rougeid> <Stifactid> spring-kafka </stifactid> </sependency> <redence> <roupiD> org.springframework.boot </groupid> <Groper> org.springframework.boot </groupId> <StifactId> Spring-boot-starter-test </artifactid> <scope> اختبار </scope> </sependency> </reperence> <build> <clupins> <clupin> <StifactId> Spring-Boot-Maven-Plugin </stifactid> </sultwing> </sults> </build> </project>
2. أضف معلومات التكوين ، استخدم ملف YML هنا
الربيع: kafka: bootstrap-servers: xxxx: 9092 المنتج: القيمة serializer: org.springframework.kafka.support.serializer.jsonserializer المستهلك: مجموعة المعرفة: اختبار Auto-Reset-RESET: VALE-DESERIALIZER EPAND: org.springframework.kafka.support.serializer.jsondeserializer الخصائص: الربيع: JSON: موثوق به: حزم: com.laravelshao.springboot.kafka
3. إنشاء كائن رسالة
رسالة الفئة العامة {private integer id ؛ سلسلة خاصة MSG ؛ رسالة عامة () {} رسالة عامة (معرف Integer ، String msg) {this.id = id ؛ this.msg = msg ؛ } integer getId () {return id ؛ } public void setId (integer id) {this.id = id ؛ } السلسلة العامة getMsg () {return msg ؛ } public void setMsg (String msg) {this.msg = msg ؛ } Override public string toString () {return "message {" + "id =" + id + "، msg = '" + msg +'/'' + '}' ؛ }}4. إنشاء منتج
package com.laravelshao.springboot.kafka ؛ استيراد org.slf4j.logger ؛ استيراد org.slf4j.loggerfactory ؛ استيراد org.springframework.beans.factory.annotation.autowired org.springframework.stereotype.component ؛/*** تم إنشاؤه بواسطة Shaoqinghua في 2018/3/23. */@componentpublic class producer {private static logger log = loggerFactory.getLogger (propert.class) ؛ @autowired kafkatemplate kafkatemplate ؛ إرسال الفراغ العام (موضوع السلسلة ، رسالة الرسالة) {kafkatemplate.send (الموضوع ، الرسالة) ؛ log.info ("Propert-> topic: {} ، message: {}" ، topic ، message) ؛ }}5. إنشاء مستهلك واستخدم kafkalistener لتوضيح الموضوع
package com.laravelshao.springboot.kafka ؛ استيراد org.apache.kafka.clients.consumer.consumerrecord ؛ استيراد org.slf4j.logger ؛ استيراد org.slf4j.loggerfactory org.springframework.stereotype.component ؛/*** تم إنشاؤه بواسطة Shaoqinghua في 2018/3/23. */@componentpublic class consumer {private static logger log = loggerFactory.getLogger (consumer.class) ؛ kafkalistener (thispics = "test_topic") استلام الفراغ العام (الاستهلاك <string ، message> consumerRecord) {log.info ("consumer-> topic: {} ، value: {}" ، consumerRecord.topic () ، consumerRecord.value ()) ؛ }}6. إرسال اختبارات الاستهلاك
package com.laravelshao.springboot ؛ استيراد com.laravelshao.springboot.kafka.message ؛ استيراد com.laravelshao.springboot.kafka.producer ؛ استيراد org.springframework.boot.springapplication ؛ استيراد org.springframework.boot.autoconfigure.SpringBootApplication ؛ استيراد org.springframework.context.applicationContext ؛ springbootapplicationpublication integrationkafkaapplication {public static void Main ( springapplication.run (integrationkafkaapplication.class ، args) ؛ منتج المنتج = context.getBean (Proper.Class) ؛ لـ (int i = 1 ؛ i <10 ؛ i ++) {producer.send ("test_topic" ، رسالة جديدة (i ، "test topic message"+i)) ؛ thread.sleep (2000) ؛ }}}يمكنك رؤية إرسال الرسائل واستهلاك الرسائل بدورها
مشاكل الاستثناء
استثناء Deserialization (كائن الرسالة المخصص ليس ضمن مسار الحزمة الموثوق به من قبل كافكا)؟
[org.springframework.kafka.kafkalistenerendpointContainer#0-0-C-1] خطأ org.springframework.kafka.listener.kafkamessagelistenercontainer $ stistererconsumer.719 container container
org.apache.kafka.common.errors.SerializationException: خطأ مفتاح/قيمة خطأ في القسم test_topic-0 في الإزاحة 9. إذا لزم الأمر ، يرجى البحث عن السجل لمواصلة الاستهلاك.
سبب: java.lang.illegalargumentexception: فئة 'com.laravelshao.springboot.kafka.message' ليست في الحزم الموثوق بها: [java.util ، java.lang]. إذا كنت تعتقد أن هذا الفصل آمن للفرار ، فيرجى تقديم اسمه. إذا تم إجراء التسلسل فقط من خلال مصدر موثوق به ، فيمكنك أيضًا تمكين الثقة (*).
في org.springframework.kafka.support.converter.defaultjackson2javatypempper.getClassidType (defaultjackson2javatypempper.java:139)
في org.springframework.kafka.support.converter.defaultjackson2javatypemapper.tojavatype (defaultjackson2javatypempper.java:113)
في org.springframework.kafka.support.serializer.jsondeserializer.deserialize (jsondeserializer.java:191)
at org.apache.kafka.clients.consumer.internals.fetcher.parserecord (fetcher.java:923)
at org.apache.kafka.clients.consumer.internals.fetcher.access $ 2600 (fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.fetcher $ partitionRecords.FetchRecords (fotcher.java:1100)
at org.apache.kafka.clients.consumer.internals.fetcher $ partitionRecords.Access $ 1200 (fetcher.java:949)
at org.apache.kafka.clients.consumer.internals.fetcher.fetchRecords (fotcher.java:570)
at org.apache.kafka.clients.consumer.internals.fetcher.fetchedRecords (fotcher.java:531)
at org.apache.kafka.clients.consumer.kafkaconsumer.pollonce (Kafkaconsumer.java:1146)
at org.apache.kafka.clients.consumer.kafkaconsumer.poll (kafkaconsumer.java:1103)
في org.springframework.kafka.listener.kafkamessageListenerContainer $ listenerconsumer.run (kafkamessagelistenercontainer.java:667)
في java.util.concurrent.executors $ runnableadapter.call (Executors.java:511)
في java.util.concurrent.futureTask.run (FutureTask.java:266)
في java.lang.thread.run (thread.java:745)
الحل البديل: أضف الحزمة الحالية إلى مسار الحزمة الموثوق بها من قبل كافكا
الربيع: كافكا: المستهلك: الخصائص: الربيع: JSON: موثوق به: حزم: com.laravelshao.springboot.kafka
ما سبق هو كل محتوى هذه المقالة. آمل أن يكون ذلك مفيدًا لتعلم الجميع وآمل أن يدعم الجميع wulin.com أكثر.