Kata pengantar
Karena kebutuhan bisnis, Strom dan Kafka perlu diintegrasikan ke dalam proyek Spring Boot, dan log output layanan lainnya ke topik berlangganan Kafka. Storm menangani topik secara real time untuk menyelesaikan pemantauan data dan statistik data lainnya. Namun, ada beberapa tutorial online. Yang ingin saya tulis hari ini adalah bagaimana mengintegrasikan Storm+Kafka ke boot musim semi, dan omong -omong, saya akan berbicara tentang jebakan yang saya temui.
Konfigurasi Penggunaan dan Konfigurasi Lingkungan
1. Versi Java JDK-1.8
2. Alat Kompilasi Menggunakan Ide-2017
3. Maven sebagai Manajemen Proyek
4.spring boot-1.5.8.release
Manifestasi permintaan
1. Mengapa Anda perlu berintegrasi ke dalam boot musim semi
Untuk menggunakan Spring Boot untuk mengelola berbagai layanan microser secara seragam dan menghindari beberapa konfigurasi desentralisasi secara bersamaan
2. Gagasan dan alasan khusus untuk integrasi
Gunakan boot musim semi untuk mengelola kacang yang dibutuhkan oleh kafka, badai, redis, dll., Kumpulkannya ke kafka melalui log layanan lainnya, dan kirim log untuk menyerbu secara real time, dan melakukan operasi pemrosesan yang sesuai saat Strom Bolt
Masalah yang ditemui
1. Tidak ada badai integrasi yang relevan saat menggunakan boot musim semi
2. Saya tidak tahu bagaimana memicu komitmen topolgy di Spring Boot
3. Saya mengalami masalah dengan numbis bukan klien localhost saat mengirimkan topologi
4. Kacang instan tidak dapat diperoleh melalui anotasi dalam baut badai untuk melakukan operasi yang sesuai
Larutan
Sebelum integrasi, kita perlu mengetahui metode dan konfigurasi startup boot musim semi yang sesuai (jika Anda membaca artikel ini, secara default, Anda telah mempelajari dan menggunakan badai, kafka, dan boot musim semi)
Ada beberapa contoh mengintegrasikan badai dalam boot musim semi di internet, tetapi karena kebutuhan yang sesuai, kita masih perlu berintegrasi.
Pertama Impor Paket JAR yang Diperlukan:
<dependency> <GroupId> org.apache.kafka </groupid> <ArtifactId> kafka-klien </artifactid> <version> 0.10.1.1 </version> </dependency> <sependency> <srupt-cloud-cloud> <t-cloud-cloud-cloud.cloud.cloud </groupid> <ArtiFacTid> <Art-cleud-cloud-coud-cloud <cloud-cloud-st-st-starter <ArtifactId> ZooKeeper </arttifactid> <Groupid> org.apache.zookeeperper </groupid> </eksklusi> <scuxcusion> <ArtifactId> Spring-boot-aktuator </artifactid> <groupid> org.springframework.boot </groupid> </exclusion> org. <GroupId> org.apache.kafka </groupId> </cexclusion> </eksklusi> </eksklusi> </dependency> <dependency> <groupid> org.springframework.kafka </groupid> <ArTifactid> Spring-kafka </artifactid> <scuxcusion> <ArTifactid> Spring-kafka </artifactid> <scuxcusion> <ArTifactid> KAFKA </artifactid> <sexclusion> <ArTifactid> <GroupId> org.apache.kafka </groupId> </cexclusion> </excupsions> </dependency> <dependency> <GroupId> org.springframework.data </proupid> <ArTifactId> </ArtifactId> </gruptid> <artifactid </version> </arttifactid> <pring> 2.5.0.release </Version> </arttifactid> <t ArtifactD> <TERFICES> 2.5.0. <GroupId> org.slf4j </groupid> <ArtifactId> slf4j-log4j12 </stifactid> </cexclusion> <ckscuxcusion> <ArtifactId> Commons-logging </arttifactid> <Roupid> commons no </groupid> </eksklusi> <seck exclusi> <rotifacid> commons no </groupid> </eksklusi> <screckSid> <RotiveCid> commons no </groupid> </eksklusi> <secklusi> <RotifacD> <GroupId> io.netty </groupId> </cexclusion> <cksuccusion> <t Artifactid> jackson-core-asl </arttifactid> <groupid> org.codehaus.jackson </groupid> </eksklusi> <ckscuplusion> <Artifactid> Kurator-Client </arttifactid> <croupsid> <croupachec.apache. <artifactId>jettison</artifactId> <groupId>org.codehaus.jettison</groupId> </exclusion> <exclusion> <artifactId>jackson-mapper-asl</artifactId> <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion> <artifactId>jackson-jaxrs</artifactId> <GroupId> org.codehaus.jackson </groupid> </eksklusi> <scuxcusion> <t Artifactid> snappy-java </stifactid> <groupid> org.xerial.snappy </groupid> <croupgon> <croupson> <Artifactid> jackson-xc </artifactid> <croupgon> <t Artifactid> Jackson-xc </arttifactid> <croupgon> <croupson> <croupson <scuxcusion> <ArtifactId> jambu </arttifactid> <groupid> com.google.guava </groupid> </eksklusi> <scuclusion> <Artifactid> Hadoop-mapreduce-core </arttifactid> <grouppid> <ArOKACE.HADOOP </Groupid> </Artifactid> <grouppid> <AroFACTIF> <SECIFACD> <SECCLUF> <GroupId> org.apache.zooKeeper </groupId> </cexclusion> <cksuccusion> <t Artifactid> servlet-api </stifactid> <groupid> javax.servlet </groupid> </eksklusi> </eksklusi> </dependensi> <dependency> <groupid> org.apache.zookeer> </dependensi> <dep dependency> <groupid> org.apache.zookeep <artifactId>zookeeper</artifactId> <version>3.4.10</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusion> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <ArtifactId> hbase-client </stifactid> <version> 1.2.4 </version> <celuctions> <scuxcusion> <RtifactId> log4j </artifactid> <groupid> log4j </groupid> <kelongsid> <crupexe> <Artifactid> ZooKeeper </groupid> <croupter> <croupacke. <scuxcusion> <ArtifactId> netty </arttifactid> <groupid> io.netty </groupid> </eksklusi> <scuclusion> <Artifactid> hadoop-common </arttifactid> <Rroupid> org.apache.hadoop </groupid> </eksklusi> <scuxclusion> <ArTifactid </guava </groupid> </eksklusi> <cexclusion> <Artifactid <GroupId> com.google.guava </groupid> </cksuccusion> </eksklusi> <scuxcusion> <t ArtifactId> Hadoop-annotasi </arttifactid> <groupid> org.apache.hadoop </groupid> <ckscuilpid> <scrupcid> <cartifactid> hadoop-yarny-common </art/exclusion> <scuxhactid> hadoop-yarn-common </art/exclusion> <scuxhactid> hadoop-yarn-common </art/exclusion </Exclusion> <scuclusion> <RtifactId> slf4j-gog4j12 </t Artifactid> <groupid> org.slf4j </groupid> </cexclusion> </cexcid </artifactid> <Artif-version> <AcoPACIP.APACIP.HADOOP </Groupid> <ArTifactid> <groupid> Hadoop.apache.hadoop </groupid> <ArTifactid> <grouptid> HAdoop.apache.hadoop </groupid> <ArTifactID> version </Cache. <Ecplusions> <scuclusion> <RtifactId> commons-logging </artifactid> <groupid> commons-logging </artifactid> <roupid> commons-logging </groupid> </eksklusi> <scuxclusion> <t Artifactid> Kurator </Artifactid> <groupid> <seluction> <t Artifactid> </ArtifactId <groupid> <groupid> <orgacache.curator.curator> </Artifactid <groupid> <groupid> org.apache.curator.curator> </ArtifactId <groupid> <groupid> org.apache.curator> <ArTifactId> Jackson-Mapper-Asl </arttifactid> <groupid> org.codehaus.jackson </groupid> </eksklusi> <scuclusion> <ArtifactId> Jackson-core- Asl </arttifactid> <groupid> org.codehaus.jackson </groupid> </exclusion> <groupid> org.codehaus.jackson </groupid> </exclusion> <groupid> org.codehaus.jackson </groupid> </eksklusi> <GroupId> log4j </groupid> </eksklusi> <scuxcusion> <RtifactId> snappy-java </arttifactid> <groupid> org.xerial.snappy </groupid> </eksklusi> <scuplusion> <tartifactid> zooKeeper </arttifactid> <croupder> <t ArtifactId> zookeeper </arttifactid> <groupid> <roupache.apache.apache. <artifactId>guava</artifactId> <groupId>com.google.guava</groupId> </exclusion> <exclusion> <artifactId>hadoop-auth</artifactId> <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion> <artifactId>commons-lang</artifactId> <groupId>commons-lang</groupId> </Exclusion> <cksuccusion> <ArTifactId> slf4j-log4j12 </stifactid> <groupid> org.slf4j </sroupid> </eksklusi> <ckscuclusion> <t ArtifactId> SERVLET-API </artifactid> <groupid> JAVAX.Servlet </groupid> </artifactid> <groupid> JAVAX.Servlet </groupid> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-examples</artifactId> <version>2.7.3</version> <exclusions> <exclusion> <exclusion> <artifactId>commons-logging</artifactId> <groupId>commons-logging</groupId> </exclusion> <exclusion> <ArtifactId> netty </arttifactid> <groupid> io.netty </proupid> </eksklusi> <scuxcusion> <t ArtifactId> jambu </artifactid> <groupid> com.google.guava </groupid> </eksklusi> <scxectuD> <ArtiFacTid> LOG4J </ARTIFACID> </Exclusion> <succlusion> <ArTifacTid> LOG4J </ArtacId> <ArtifactId> servlet-api </stifactid> <groupid> javax.servlet </proveDid> </cexclusion> </eksklusi> </dependency> <!-Storm-> <dependency> <groupid> org.apache.storm </groupid> <ArTifactid> </version </artfaction> <torm. {version> <Torm.version> <Torm.version> <TORT. <scope>${provided.scope}</scope> <exclusions> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> </exclusion> <exclusion> <artifactId>servlet-api</artifactId> <groupId>javax.servlet</groupId> </Exclusion> </Exclusive> </dependency> <dependency> <Groupid> org.apache.storm </groupid> <ArtifactId> Storm-kafka </artifactid> <version> 1.1.1 </version> <cklusion> <scuppeaf. <artifactid> Kafka-clients </artifactid> <croxache> <croupake. </Exclussions> </dependency>Paket JAR dihapus karena ada beberapa dependensi yang terkait dengan dependensi konstruksi proyek. Versi badai adalah 1.1.0 Spring Boot Terkait Ketergantungan
`` `Java
<!-Spring Boot-> <dependency> <GroupId> org.springframework.boot </groupid> <ArTifactId> Spring-Boot-starter </arttifactid> <celuctions> <scuxclusion> <roupid> org.springframework.boot </groupid> <Arttifactid> Spring-star-starter> <croping-l-startif.t-starifer </groupid> <ArTtifacTid> Spring-starter>-loot-starter. </Dependency> <dependency> <GroupId> org.springframework.boot </groupId> <ArtiFacTId> Spring-boot-starter-web </arttifactid> </dependency> <ArttiFAcD> <ArtiFID> <TROMPRED-STOM-STRINGSIT.BOOT </groupid> <ArtiFacTID> <ArtiFACTIF> SPRINGSIF-BOTTORTER.BOOT </GroupID> <ArtiFACTID> SPRINGSIF-BOTORTER.BOOT </GroupID> <ArtiFACTID> SPRING-BOT-BOTTORTER.BOOT </GroupID> <ArTtifacTID> SPRING-BOTTENTER <GroupId> org.springframework.boot </groupid> <ArtifactId> tes musim semi-boot-starter </arttifactid> <scope> test </seupope> </dependency> <sependency> <trotifsid> org. <groupId> org.mybatis.spring.boot </groupid> <ArtiFacTId> mybatis-spring-boot-starter </t Artifactid> <version> $ {mybatis-spring.version} </version> </dependency> <dependency> <groupid> org.springfring.boork.boot <ArTifactId> Spring-boot-konfigurasi-Prosesor </artifactid> <pilihan> true </opsional> </dependency>PS: Paket JAR Maven bukan yang paling ramping karena persyaratan penggunaan proyek. Ini hanya untuk referensi Anda.
Struktur Proyek:
file konfigurasi storing konfigurasi di lingkungan yang berbeda
Penyimpanan Build Kelas Implementasi Terkait Spring Boot seperti Bangun Nama
Saat memulai boot musim semi, kita akan menemukan
Bahkan, sebelum memulai integrasi, saya tahu sedikit tentang Storm, yang tidak berhubungan di awal. Kemudian, saya menemukan bahwa setelah berintegrasi ke dalam boot musim semi, saya tidak memiliki cara yang sesuai untuk memicu fungsi melakukan topolgy setelah memulai boot musim semi, jadi saya pikir setelah memulai boot musim semi, saya selesai. Akibatnya, saya menunggu selama setengah jam dan tidak ada yang terjadi sebelum saya menemukan bahwa fungsi memicu komit itu tidak diterapkan.
Untuk mengatasi masalah ini, ide saya adalah: Mulai Spring Boot-> Buat topik mendengarkan kafka dan mulai topolgy untuk menyelesaikan startup. Namun, untuk masalah seperti itu, Kafka mendengarkan topik akan berulang kali memicu topolgy, yang jelas bukan yang kita inginkan. Setelah menonton sebentar, saya menemukan bahwa Spring memiliki startup terkait dan menjalankan metode waktu tertentu setelah selesai. Ini adalah Juruselamat bagiku. Jadi gagasan memicu topolgy telah menjadi:
Mulai Boot Musim Semi -> Jalankan Metode Pemicu -> Lengkapi kondisi pemicu yang sesuai
Metode konstruksi adalah:
/** * @author leezer * @date 2017/12/28 * Secara otomatis mengirimkan topologi setelah musim semi dimuat **/ @configuration @componentpublic class autoload mengimplementasikan ApplicationListener <contextrefreshedEvent> {private static string brokerzkstr; Topik String Statis Pribadi; host string statis pribadi; port string statis pribadi; Public AutOLoad (@Value ("$ {Storm.BrokerZkStr}") String Brokerzkstr, @Value ("$ {ZooKeeper.host}") host string, @value ("$ {zooKeeper.port}) port string, @value (" $ {kafka.defaic {value ("$ {kafka.defaic {value (" $ {Kafka.defaic {"$ {KAFKAC (" $ {KAFKA. brokerzkstr; Host = host; Topik = Topik; Port = port; } @Override public void onApplicationEvent (contextrefreshedEvent event) {coba {// instantiate kelas topologibuilder. TOPOLOGIBUILDER TOPOLOGIBUILDER = TOPOLOGYBUILDER BARU (); // Atur simpul erupsi dan alokasikan nomor konkurensi. Nomor konkurensi akan mengontrol jumlah utas objek dalam cluster. Brokerhosts breakehosts = zkhosts baru (brokerzkstr); // Konfigurasikan topik untuk berlangganan Kafka, serta direktori dan nama node data di ZooKeeper spoutConfig = new spoutconfig (brokerhosts, topik, "/badai", "s32"); spoutconfig.scheme = skema baru new (string stringscheme ()); spoutconfig.zkservers = collections.singletonlist (host); spoutconfig.zkport = integer.parseint (port); // Baca spoutconfig.startoffsetTime = offsetRequest.latesttime (); Penerima kafkaspout = kafkaspout baru (spoutconfig); topologybuilder.setspout ("kafka-spout", penerima, 1) .setnumTasks (2); TopologyBuilder.setBolt ("Alarm-Bolt", New AlarmBolt (), 1) .setnumTasks (2) .shuffleGrouping ("kafka-spout"); Config config = new config (); config.setDebug (false); /* Tetapkan jumlah slot sumber daya yang ingin disita topologi di cluster badai. Slot sesuai dengan proses pekerja pada simpul pengawas. Jika jumlah tempat yang Anda alokasikan melebihi jumlah pekerja yang dimiliki simpul fisik Anda, pengajuan tersebut mungkin tidak berhasil. Bergabung dengan cluster Anda, sudah ada beberapa topologi di atasnya, dan ada 2 sumber daya pekerja yang tersisa. Jika Anda mengalokasikan 4 topologi untuk kode Anda, maka topologi ini dapat dikirimkan, tetapi setelah berkomitmen, Anda akan menemukan bahwa itu tidak berjalan. Dan ketika Anda membunuh beberapa topologi dan melepaskan beberapa slot, topologi Anda akan melanjutkan operasi normal. */ config.setnumWorkers (1); Cluster localcluster = new localcluster (); cluster.submittopology ("kafka-spout", config, topologybuilder.createTopology ()); } catch (Exception e) {E.PrintStackTrace (); }}}Catatan:
Saat memulai proyek, kesalahan berikut dapat dilaporkan karena menggunakan Tomcat tertanam untuk startup.
[Tomcat-startStop-1] ERROR oaccContainerBase - A child container failed during startjava.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]] at java.util.concurrent.futuretask.report (futuretask.java:122) ~ [?: 1.8.0_144] di java.util.concurrent.futuretask.get (futuretask.java:192) ~ [? org.apache.catalina.core.containerbase.startinternal (containerbase.java:939) [tomcat-embed-core-8.5.23.jar: 8.5.23] di org.apache.catalina.core.standardhost.Startinternal (standar. ] org.apache.catalina.core.containerbase $ startchild.call (containerbase.java:1419) [tomcat-embed-core-8.5.23.jar: 8.5.23] di org.apache.catalina.core.containerbase $ startchild.callbild. [Tomcat-embed-core-8.5.23.jar: 8.5.23] di java.util.concurrent.futuretask.run $$ capture (futuretask.java:266) [? java.util.concurrent.threadpoolexecutor.runworker (threadpoolexecutor.java:1149) [?: 1.8.0_144] di java.util.concurrent.threadpoolexecutor.runworker (threadpoolexecutor.java:1149) [: 8.4.4.4.4.4.10 java.util.concurrent.threadpoolexecutor $ worker.run (threadpoolexecutor.java:624) [?: 1.8.0_144] di java.lang.thread.run (thread.java:748) [?: 1.8.0_144]
Ini karena paket JAR yang diimpor yang sesuai memperkenalkan versi Servlet-API lebih rendah dari versi tertanam. Yang perlu kita lakukan adalah membuka ketergantungan Maven dan menghapusnya
<scuxclusion> <ArTifactId> servlet-api </stifactid> <groupid> javax.servlet </groupid> </eksklusi>
Lalu restart.
Dimungkinkan untuk melaporkan selama startup:
Salinan kode adalah sebagai berikut:
org.apache.storm.utils.nimbusleaderNotFoundException: tidak dapat menemukan pemimpin nimbus dari tuan rumah benih [localhost]. Apakah Anda menentukan daftar host nimbus yang valid untuk config nimbus.seeds? Di org.apache.storm.utils.nimbusclient.getConfiguredClientas (nimbusclient.java:90
Saya memikirkan masalah ini untuk waktu yang lama dan menemukan bahwa penjelasan online semuanya disebabkan oleh masalah konfigurasi badai, tetapi badai saya digunakan di server. Tidak ada konfigurasi yang relevan. Secara teori, kita juga harus membaca konfigurasi yang relevan di server, tetapi hasilnya tidak terjadi. Akhirnya, saya mencoba beberapa metode dan menemukan bahwa itu salah. Di sini saya menemukan bahwa ketika membangun cluster, Storm memberikan cluster lokal yang sesuai.
Cluster localcluster = new localcluster ();
Lakukan pengujian lokal. Jika Anda menguji secara lokal, gunakan untuk tes penempatan. Jika digunakan ke server, Anda perlu:
cluster.submittopology ("kafka-spout", config, topologybuilder.createTopology ()); // ditetapkan untuk: stormsubmitter.submittopology ("kafka-spout", config, topologybuilder.createTopology ());Melakukan pengajuan tugas;
Di atas memecahkan masalah di atas 1-3
Pertanyaan 4: Saya menggunakan instance kacang yang relevan di Bolt, saya menemukan bahwa saya tidak bisa mendapatkan contoh jika saya meletakkannya di musim semi menggunakan @Component: tebakan saya adalah ketika kita membangun topolgy komit, itu akan ada di:
Salinan kode adalah sebagai berikut:
TopologyBuilder.setBolt ("Alarm-Bolt", New AlarmBolt (), 1) .setnumTasks (2) .shuffleGrouping ("kafka-spout");
Baut eksekusi terkait:
@Override public void persiapan (peta stormconf, topologyContext konteks, outputCollector collector) {this.collector = collector; Stormlauncher stormlauncher = stormlauncher.getstormlauncher (); DatasePositorys = (alarmDataRepositorys) stormlauncher.getBean ("alarmDataRepositorys"); }Tanpa instantiasi baut, benang berbeda dan pegas tidak dapat diperoleh. (Saya tidak terlalu memahaminya di sini, jika seorang pria besar tahu, Anda dapat membagikannya)
Arti menggunakan boot pegas adalah bahwa objek rumit ini diperoleh. Masalah ini telah mengganggu saya sejak lama. Akhirnya, saya berpikir bahwa kita bisa mendapatkan instance melalui konteks Getbean dan tidak tahu apakah itu bisa berhasil, jadi saya mulai mendefinisikan:
Misalnya saya perlu menggunakan layanan di Bolt:
/*** @author leezer* @date 2017/12/27* Waktu kegagalan operasi penyimpanan **/ @service ("alarmDataRepositorys") kelas publik alarmDataPositorys memperluas redisbase mengimplementasikan ialarmDataRepositorys {swasta statis string final erro = "erro"; / *** @param Tipe Tipe* @Param Nilai Kunci Kunci* @Return Jumlah kesalahan **/ @Override Public String getErrnumFromRedis (tipe string, tombol string) {if (type == null || key == null) {return null; } else {valueoperations <string, string> valueoper = primaryStringReTemplate.opsForValue (); return valueoper.get (string.format ("%s:%s:%s", erro, type, key)); }} / *** @param Type Tipe kesalahan* @param Nilai tombol Kunci* @param Nilai tersimpan ** / @Override public void seterRnumToredIS (tipe string, tombol string, nilai string) {coba {valueoperations <string, string> valueoper = primerStringEntemplate.opsForValue (); valueoper.set (string.format ("%s:%s:%s", erro, type, key), value, kamus.apikeydayoflifecycle, timeunit.seconds); } catch (exception e) {logger.info (kamus.redis_error_prefix+string.format ("Key gagal menyimpan redis di %s", key)); }}Di sini saya menentukan nama kacang, dan ketika baut mengeksekusi persiapan: Gunakan metode Getbean untuk mendapatkan kacang yang relevan dan lengkapi operasi yang sesuai.
Kemudian Kafka berlangganan topik dikirim ke baut saya untuk pemrosesan terkait. Metode Getbean di sini adalah memulai definisi fungsi bootmain:
@SpringbootApplication@enableTransactionManagement@componentscan ({"service", "storm"})@enableMongorepositories (Basepackages = {"Storm"}))@propertieSource (value = {"classpath: service.property", "classpath: application.propert", "classpath: class.properties", "classpath: Application. = {"classpath: /configs/spring-hadoop.xml", "classpath: /configs/spring-hbase.xml"}) kelas publik stormlauncher memperluas springbootervletinitializer {// atur instance peluncur utas yang aman, static volatile private staticlauncherer Stormlauncherer {// instance peluncur peluncur privat static staticlauncherer; // atur konteks konteks private applicationContext; public static void main (string [] args) {springApplicationBuilder application = new springApplicationBuilder (stormlauncher.class); // application.web (false) .run (args); Metode ini adalah bahwa Spring Boot tidak memulai application.run (args); Stormlauncher S = New Stormlauncher (); s.setApplicationContext (application.context ()); setstormlauncher (s); } private static void setstormlauncher (stormlauncher stormlauncher) {stormlauncher.stormlauncher = stormlauncher; } public static stormlauncher getstormlauncher () {return stormlauncher; } @Override Protected SpringApplicationBuilder Configure (Aplikasi SpringApplicationBuilder) {return application.sources (stormlauncher.class); } / ** * Dapatkan konteks * * @return konteks aplikasi * / application publicContext getAppLicationContext () {return context; } /*** Atur konteksnya. * * @param appContext konteks */ private void setApplicationContext (ApplicationContext appContext) {this.context = appContext; } /*** Dapatkan instance bean melalui nama khusus. * * @param Nama nama * @return The Bean */ Public Object GetBean (nama string) {return context.getBean (name); } /*** Dapatkan kacang melalui kelas. * * @param <T> Parameter tipe * @param clazz the clazz * @return the bean */ public <t> t getbean (class <t> clazz) {return context.getBean (clazz); } / ** * Mengembalikan name dengan name yang ditentukan, dan clazz * * @param <t> Parameter tipe * @param Nama nama * @param clazz the clazz * @return the bean * / public <t> t getbean (nama string, kelas <t> clazz) {return context.getbean (nama, clazz); }Integrasi Storm dan Kafka ke Spring Boot telah berakhir. Saya akan memasukkan kafka terkait dan konfigurasi lainnya ke dalam github
Ngomong -ngomong, ada juga lubang kafkaclient di sini:
Async Loop meninggal! java.lang.nosuchmethoderror: org.apache.kafka.common.network.networksend.
Proyek ini akan melaporkan masalah klien Kafka, karena di Storm-Kafka, Kafka menggunakan versi 0.8, sedangkan Networksend adalah versi 0.9 atau lebih. Integrasi di sini harus konsisten dengan versi terkait Kafka yang Anda integrasikan.
Meskipun integrasi relatif sederhana, ada beberapa referensi. Selain itu, saya baru saja mulai bersentuhan dengan Storm, jadi saya pikir banyak. Saya juga akan merekamnya di sini.
Alamat Proyek - GitHub
Di atas adalah semua konten artikel ini. Saya berharap ini akan membantu untuk pembelajaran semua orang dan saya harap semua orang akan lebih mendukung wulin.com.