บทความนี้แนะนำรหัสตัวอย่างของ Spring Boot ที่รวม Kafka แบ่งปันกับทุกคนและฝากบันทึกไว้ด้วยตัวคุณเอง
สภาพแวดล้อมระบบ
ใช้บริการ Kafka ที่สร้างขึ้นบนเซิร์ฟเวอร์ระยะไกล
กระบวนการบูรณาการ
1. สร้างโครงการสปริงบูตและเพิ่มการพึ่งพาที่เกี่ยวข้อง:
<? xml version = "1.0" การเข้ารหัส = "utf-8"?> <project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.w3.org/2001/ XSI: schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" <ArtIfactId> ฤดูใบไม้ผลิ-บูท-intlegration-kafka </artifactid> <cersion> 0.0.1-snapshot </เวอร์ชัน> <packaging> jar </packaging> <name> Spring-Boot-intlegration-kafka </name> <ArtIfactId> Spring-Boot-Starter-Parent </artifactid> <sersion> 2.0.0.release </เวอร์ชัน> <inelypath/> <!-ผู้ปกครองค้นหาจากที่เก็บ-> </parent> <properties> <project.build.sourceencoding> <Project.Reporting.OutputUntEning> UTF-8 </project.Reporting.UutputPutenCoding> <Java.Version> 1.8 </java.version> </premerties> <การพึ่งพา> <derctiTcter> <!-kafka-> <การพึ่งพา> <roupId> org.springframework.kafka </groupid> <ratifactid> Spring-Kafka </artifactid> </การพึ่งพาอาศัย> <การพึ่งพา> <roupid> org.springframework.boot <Effercy> <merctId> org.springframework.boot </groupId> <ratifactid> การทดสอบสปริง-สตาร์สตาร์เทสต์ </artifactid> <pope> ทดสอบ </cope> <ArtIfactId> Spring-Boot-Maven-Plugin </artifactid> </plugin> </plugins> </uffer
2. เพิ่มข้อมูลการกำหนดค่าใช้ไฟล์ YML ที่นี่
ฤดูใบไม้ผลิ: Kafka: Bootstrap-Servers: XXXX: 9092 ผู้ผลิต: Value-Serializer: org.springframework.kafka.support.serializer.jsonserializer ผู้บริโภค: กลุ่ม -ID: ทดสอบอัตโนมัติ org.springframework.kafka.support.serializer.jsondeserializer: ฤดูใบไม้ผลิ: JSON: เชื่อถือได้: แพ็คเกจ: com.laravelshao.springboot.kafka
3. สร้างวัตถุข้อความ
ข้อความระดับสาธารณะ {ID จำนวนเต็มส่วนตัว; ผงชูรสส่วนตัว ข้อความสาธารณะ () {} ข้อความสาธารณะ (ID จำนวนเต็ม, String msg) {this.id = id; this.msg = msg; } จำนวนเต็มสาธารณะ getId () {return id; } โมฆะสาธารณะ setId (ID จำนวนเต็ม) {this.id = id; } สตริงสาธารณะ getMSG () {return msg; } โมฆะสาธารณะ setMSG (สตริงผงชูรส) {this.msg = msg; } @Override สตริงสาธารณะ toString () {return "ข้อความ {" + "id =" + id + ", msg = '" + msg +'/'' + '}'; -4. สร้างโปรดิวเซอร์
แพ็คเกจ com.laravelshao.springboot.kafka; นำเข้า org.slf4j.logger; นำเข้า org.slf4j.loggerfactory; นำเข้า org.springframework.beans.factory.annotation.autowired; org.springframework.stereotype.component;/*** สร้างโดย Shaoqinghua เมื่อปี 2018/3/23 */@ComponentPublic Class Producer {logger แบบคงที่ส่วนตัว = loggerFactory.getLogger (producer.class); @autowired ส่วนตัว kafkatemplate kafkatemplate; โมฆะสาธารณะส่ง (หัวข้อสตริงข้อความข้อความ) {kafkatemplate.send (หัวข้อข้อความ); log.info ("ผู้ผลิต-> หัวข้อ: {}, ข้อความ: {}", หัวข้อ, ข้อความ); -5. สร้างผู้บริโภคและใช้ @kafkalistener เพื่อใส่คำอธิบายประกอบหัวข้อ
แพ็คเกจ com.laravelshao.springboot.kafka; นำเข้า org.apache.kafka.clients.consumer.consumerrecord; นำเข้า org.slf4j.logger; นำเข้า org.slf4j.loggerfactory; org.springframework.stereotype.component;/*** สร้างโดย Shaoqinghua เมื่อปี 2018/3/23 */@ComponentPublic คลาสผู้บริโภค {logger แบบคงที่ส่วนตัว = loggerFactory.getLogger (consumer.class); @kafkalistener (หัวข้อ = "test_topic") โมฆะสาธารณะได้รับ (cumserrecord <string, message> comverrecord) {log.info ("ผู้บริโภค-> หัวข้อ: {}, ค่า: {}", cumserrecord.topic (), consumerrecord.value ();); -6. ส่งการทดสอบการบริโภค
แพ็คเกจ com.laravelshao.springboot; นำเข้า com.laravelshao.springboot.kafka.message; นำเข้า com.laravelshao.springboot.kafka.producer; นำเข้า org.springframework.boot.springapplication; org.springframework.boot.autoconfigure.springbootapplication; นำเข้า org.springframework.context.applicationContext; @springbootaplicplicationPublic Integrationkafkaapplication args); ผู้ผลิตผู้ผลิต = context.getBean (producer.class); สำหรับ (int i = 1; i <10; i ++) {producer.send ("test_topic", ข้อความใหม่ (i, "ข้อความทดสอบข้อความ"+i)); Thread.sleep (2000); -คุณสามารถดูส่งข้อความและการบริโภคข้อความได้ในทางกลับกัน
ปัญหาข้อยกเว้น
ข้อยกเว้น Deserialization (วัตถุข้อความที่กำหนดเองไม่ได้อยู่ภายใต้เส้นทางแพ็คเกจที่เชื่อถือได้โดย Kafka)?
[org.springframework.kafka.kafkalistenerendpointContainer#0-0-c-1] ข้อผิดพลาด org.springframework.kafka.listener.kafkamessageListenercontainer $ ListenerConsumer.719
org.apache.kafka.common.errors.serializationException: ข้อผิดพลาด deserializing คีย์/ค่าสำหรับพาร์ติชัน test_topic-0 ที่ออฟเซ็ต 9 ถ้าจำเป็นโปรดค้นหาการบันทึกเพื่อดำเนินการต่อ
เกิดจาก: java.lang.illegalargumentException: คลาส 'com.laravelshao.springboot.kafka.message' ไม่ได้อยู่ในแพ็คเกจที่เชื่อถือได้: [java.util, java.lang] หากคุณเชื่อว่าคลาสนี้ปลอดภัยที่จะ deserialize โปรดระบุชื่อ หากการทำให้เป็นอนุกรมนั้นทำโดยแหล่งที่เชื่อถือได้เท่านั้นคุณยังสามารถเปิดใช้งานความน่าเชื่อถือทั้งหมด (*)
ที่ org.springframework.kafka.support.converter.defaultjackson2javatypemapper.getclassidtype (defaultjackson2javatypemapper.java:139)
ที่ org.springframework.kafka.support.converter.defaultjackson2javatypemapper.tojavatype (defaultjackson2javatypemapper.java:113)
ที่ org.springframework.kafka.support.serializer.jsondeserializer.deserialize (jsondeserializer.java:191)
ที่ org.apache.kafka.clients.consumer.internals.fetcher.parserecord (fetcher.java:923)
ที่ org.apache.kafka.clients.consumer.internals.fetcher.access $ 2,600 (fetcher.java:93)
ที่ org.apache.kafka.clients.consumer.internals.fetcher $ partitionRecords.fetchrecords (fetcher.java:1100)
ที่ org.apache.kafka.clients.consumer.internals.fetcher $ partitionRecords.access $ 1200 (fetcher.java:949)
ที่ org.apache.kafka.clients.consumer.internals.fetcher.fetchrecords (fetcher.java:570)
ที่ org.apache.kafka.clients.consumer.internals.fetcher.fetchedrecords (fetcher.java:531)
ที่ org.apache.kafka.clients.consumer.kafkaconsumer.pollonce (kafkaconsumer.java:1146)
ที่ org.apache.kafka.clients.consumer.kafkaconsumer.poll (kafkaconsumer.java:1103)
ที่ org.springframework.kafka.listener.kafkamessagelistenercontainer $ ListenerConsumer.run (kafkamessageListenercontainer.java:667)
ที่ java.util.concurrent.executors $ runnableadapter.call (executors.java:511)
ที่ java.util.concurrent.futuretask.run (futureTask.java:266)
ที่ java.lang.thread.run (thread.java:745)
วิธีแก้ปัญหา: เพิ่มแพ็คเกจปัจจุบันลงในแพ็คเกจพา ธ ที่เชื่อถือได้โดย Kafka
ฤดูใบไม้ผลิ: Kafka: ผู้บริโภค: คุณสมบัติ: ฤดูใบไม้ผลิ: JSON: เชื่อถือได้: แพ็คเกจ: com.laravelshao.springboot.kafka
ข้างต้นเป็นเนื้อหาทั้งหมดของบทความนี้ ฉันหวังว่ามันจะเป็นประโยชน์ต่อการเรียนรู้ของทุกคนและฉันหวังว่าทุกคนจะสนับสนุน wulin.com มากขึ้น