Artikel ini memperkenalkan contoh kode boot musim semi mengintegrasikan kafka, bagikan dengan semua orang, dan tinggalkan catatan untuk diri Anda sendiri
Lingkungan sistem
Gunakan layanan kafka yang dibangun di server jarak jauh
Proses integrasi
1. Buat Proyek Boot Musim Semi dan tambahkan dependensi yang relevan:
<? Xml Version = "1.0" encoding = "UTF-8"?> <Project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.w3.org/2001/xmlschema xsi: schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <ModelVersion> 4.0.0 </ModelVersion> <groupid> com.larafavelshon <ArtifactId> Spring-boot-integration-kafka </artifactid> <version> 0.0.1-Snapshot </version> <packaging> jar </packaging> <name> Spring-boot-Integration-kafka </name> <crection> Proyek Demo untuk Spring Boot </description> <tentrup> <groupding> <ArTifactId> Spring-Boot-Starter-Parent </artifactid> <version> 2.0.0.release </version> <relativePath/> <!-Pencarian orang tua dari repositori-> </parents> <properties> <Project.build.sourceEncoding> UTF-8 </project.build.source.sourceEncoding> UTF-8 </project.build.source.sourceCoding> UTF-8 </project.build.source <Project.Reporting.OutputEncoding> UTF-8 </project.reporting.outputEncoding> <Java.version> 1.8 </java.version> </Properties> </Artifactid> <TROMPOCID> <TROPBOCTIF> <TROPBOCTIF.BOOT.BOOT </groupid> <Artifacency> <RoGROPID> org.springframework.boot </groupid> <Artifactid> <!-kafka-> <dependency> <GroupId> org.springframework.kafka </groupid> <ArtifactId> Spring-kafka </artifactid> </dependency> <dependency> <RoupId> org.springframework.boot </groupid> <Arttifactid> SPRING-BOT-BOTONER-JOCT.JOT </groupid> <ArTtifactid> Spring-stas-boots <groupId> org.springframework.boot </groupid> <ArtifactId> spring-boot-starter-test </arttifactid> <scope> test </seupope> </dependency> </dependencies> <build> <dlugin> <llupin> <groupid> org.springframework.boot </groupid> <lugin> <groupid> <ArTifactId> spring-boot-maven-plugin </artifactid> </lugin> </lugin> </build> </poject>
2. Tambahkan informasi konfigurasi, gunakan file YML di sini
Musim Semi: Kafka: Bootstrap-Server: XXXX: 9092 Produser: Value-Serializer: org.springframework.kafka.support.serializer.jsonserializer konsumen: org.springfram.kringfring. Spring: Json: Tepercaya: Paket: com.laravelshao.springboot.kafka
3. Buat Objek Pesan
Pesan Kelas Publik {Private Integer ID; msg string pribadi; Pesan Publik () {} Pesan Publik (ID Integer, String msg) {this.id = id; this.msg = msg; } public integer getId () {return id; } public void setid (integer id) {this.id = id; } public String getMSG () {return msg; } public void setMSG (string msg) {this.msg = msg; } @Override public string toString () {return "message {" + "id =" + id + ", msg = '" + msg +'/'' + '}'; }}4. Buat produser
Paket com.laravelshao.springboot.kafka; impor org.slf4j.logger; impor org.slf4j.loggerFactory; impor org.springframework.beans.factory.annotation.Autowired; impor org.springframework.kafka.kafka.corecore org.springframework.stereotype.component;/*** Dibuat oleh Shaoqinghua pada 2018/3/23. */@Componentpublic class produser {private static logger log = loggerFactory.getLogger (produser.class); @Autowired private kafkatemplate kafkatemplate; public void send (string topic, pesan pesan) {kafkatemplate.send (topik, pesan); log.info ("produser-> topik: {}, pesan: {}", topik, pesan); }}5. Buat konsumen dan gunakan @kafkalistener untuk membuat anotasi topik
Paket com.laravelshao.springboot.kafka; impor org.apache.kafka.clients.consumer.consumeRecord; impor org.slf4j.logger; impor org.slf4j.loggerFactory; impor org.springframework.kafka.annotation; org.springframework.stereotype.component;/*** Dibuat oleh Shaoqinghua pada 2018/3/23. */@ComponentPublic Class Consumer {private static logger log = loggerFactory.getLogger (konsumen.class); @Kafkalistener (topik = "test_topic") public void menerima (konsumsi <string, pesan> konsumsi consumeRecord) {log.info ("konsumen-> topik: {}, value: {}", consumerrecord.topic (), konsumen konsumen (value ()); }}6. Kirim tes konsumsi
Paket com.laravelshao.springboot; impor com.laravelshao.springboot.kafka.message; impor com.laravelshao.springboot.kafka.producer; impor org.springframework.boot.springlication; impor org.springframboMework.boot org.springframework.context.applicationContext; @springbootApplicationPublic kelas integrasiKafkaApplication {public static void main (string [] args) melempar interruptedException {applicationContext context = springApplication.run (integrationkafkaApplication ,Clicass, argerclicass, argerplication); argAplication); Produser produser = context.getBean (produser.class); untuk (int i = 1; i <10; i ++) {produser.send ("test_topic", pesan baru (i, "Pesan topik uji"+i)); Thread.sleep (2000); }}}Anda dapat melihat mengirim pesan dan mengkonsumsi pesan secara bergantian
Masalah pengecualian
Pengecualian Deserialisasi (Objek Pesan Kustom tidak berada di bawah jalur paket yang dipercaya oleh Kafka)?
[org.springframework.kafka.kafkalerterenendpointcontainer#0-0-c-1] Kesalahan org.springframework.kafka.listener.kafkamessagelistenercontainer $ listenerConsumer.719 Pengecualian Kontainer Kontainer
org.apache.kafka.common.errors.serializationException: kesalahan deserializing kunci/nilai untuk partisi test_topic-0 pada offset 9. Jika diperlukan, silakan cari melewati catatan untuk melanjutkan konsumsi.
Disebabkan oleh: java.lang.illegalargumentException: kelas 'com.laravelshao.springboot.kafka.message' tidak ada dalam paket tepercaya: [java.util, java.lang]. Jika Anda yakin kelas ini aman untuk deserialize, berikan namanya. Jika serialisasi hanya dilakukan oleh sumber tepercaya, Anda juga dapat mengaktifkan kepercayaan semua (*).
di org.springframework.kafka.support.converter.defaultjackson2JAvatypeMapper.getClassidType (defaultJackson2JAvatypeMapper.java:139)
di org.springframework.kafka.support.converter.defaultjackson2JavatypeMapper.tojavatype (defaultJackson2JavatypeMapper.java:113)
di org.springframework.kafka.support.serializer.jsondeserializer.deserialize (jsondeserializer.java:191)
di org.apache.kafka.clients.consumer.internals.fetcher.parserecord (fetcher.java:923)
di org.apache.kafka.clients.consumer.internals.fetcher.access $ 2600 (fetcher.java:93)
di org.apache.kafka.clients.consumer.internals.fetcher $ partitionrecords.fetchrecords (fetcher.java:1100)
di org.apache.kafka.clients.consumer.internals.fetcher $ partitionrecords.access $ 1200 (fetcher.java:949)
di org.apache.kafka.clients.consumer.internals.fetcher.fetchrecords (fetcher.java:570)
di org.apache.kafka.clients.consumer.internals.fetcher.fetchedrecords (fetcher.java:531)
di org.apache.kafka.clients.consumer.kafkaconsumer.pollonce (kafkaconsumer.java:1146)
di org.apache.kafka.clients.consumer.kafkaconsumer.poll (kafkaconsumer.java:1103)
di org.springframework.kafka.listener.kafkamessagelistenercontainer $ listenerconsumer.run (kafkamessagelistenercontainer.java:667)
di java.util.concurrent.executors $ runnableadapter.call (executors.java:511)
di java.util.concurrent.futuretask.run (futuretask.java:266)
di java.lang.thread.run (thread.java:745)
Penanganan masalah: Tambahkan paket saat ini ke jalur paket yang dipercaya oleh Kafka
Musim Semi: Kafka: Konsumen: Properti: Musim Semi: JSON: tepercaya: Paket: com.laravelshao.springboot.kafka
Di atas adalah semua konten artikel ini. Saya berharap ini akan membantu untuk pembelajaran semua orang dan saya harap semua orang akan lebih mendukung wulin.com.