คำนำ
เนื่องจากความต้องการทางธุรกิจ Strom และ Kafka จำเป็นต้องรวมเข้ากับโครงการ Spring Boot และบันทึกการส่งออกบริการอื่น ๆ ในหัวข้อการสมัครสมาชิก Kafka พายุจัดการหัวข้อแบบเรียลไทม์เพื่อตรวจสอบข้อมูลและสถิติข้อมูลอื่น ๆ อย่างไรก็ตามมีบทเรียนออนไลน์น้อย สิ่งที่ฉันต้องการเขียนในวันนี้คือวิธีการรวมพายุ+คาฟคาเข้ากับสปริงบูตและโดยวิธีการที่ฉันจะพูดถึงข้อผิดพลาดที่ฉันพบ
เครื่องมือการใช้งานและการกำหนดค่าสภาพแวดล้อม
1. Java เวอร์ชัน JDK-1.8
2. เครื่องมือรวบรวมใช้ IDEA-2017
3. Maven เป็นผู้บริหารโครงการ
4.Spring boot-1.5.8 รีลีส
การรวมตัวกัน
1. ทำไมคุณต้องรวมเข้ากับ Spring Boot
เพื่อใช้สปริงบูตเพื่อจัดการไมโครเซิร์ตต่างๆอย่างสม่ำเสมอและหลีกเลี่ยงการกำหนดค่ากระจายอำนาจหลายครั้งในเวลาเดียวกัน
2. ความคิดและเหตุผลเฉพาะสำหรับการบูรณาการ
ใช้ Spring Boot เพื่อจัดการถั่วที่ Kafka, Storm, Redis ฯลฯ รวบรวมไว้ที่ Kafka ผ่านบันทึกบริการอื่น ๆ และส่งบันทึกไปยังพายุแบบเรียลไทม์และดำเนินการประมวลผลที่สอดคล้องกันเมื่อ Strom Bolt
พบปัญหา
1. ไม่มีพายุรวมที่เกี่ยวข้องเมื่อใช้สปริงบูต
2. ฉันไม่รู้วิธีเรียกใช้ topolgy ในฤดูใบไม้ผลิบูต
3. ฉันพบปัญหากับ numbis ไม่ใช่ลูกค้า localhost เมื่อส่งโทโพโลยี
4. ถั่วอินสแตนซ์ไม่สามารถรับได้ผ่านคำอธิบายประกอบในพายุโบลต์เพื่อดำเนินการที่สอดคล้องกัน
สารละลาย
ก่อนการรวมเราจำเป็นต้องทราบวิธีการเริ่มต้นและการตั้งค่าสปริงเริ่มต้นที่สอดคล้อง
มีตัวอย่างบางอย่างของการรวมพายุในการบูตฤดูใบไม้ผลิบนอินเทอร์เน็ต แต่เนื่องจากความต้องการที่สอดคล้องกันเรายังคงต้องรวมเข้าด้วยกัน
ก่อนนำเข้าแพ็คเกจ JAR ที่จำเป็น:
<การพึ่งพา> <roupId> org.apache.kafka </groupid> <ratifactid> kafka-clients </artifactid> <persion> 0.10.1.1 </sention> </การพึ่งพา> <การพึ่งพา <exclusion> <ratifactId> Zookeeper </artifactId> <roupId> org.apache.zookeeper </groupId> </exclusion> <Eclusion> <ArtIfactId> Spring-Boot-Actuator </artifactid> <ArtIfactId> Kafka-clients </artifactid> <roupId> org.apache.kafka </groupId> </การยกเว้น> </การยกเว้น> </exclusion> <ArtIfactId> kafka-clients </artifactid> <roupId> org.apache.kafka </groupId> </การยกเว้น> </การยกเว้น> </การพึ่งพา> <การพึ่งพา> <roupid> org.springframework.data </groupid> <EXCLUSIONS> <EXCLUSION> <SloupID> org.slf4j </groupId> <ratifactId> SLF4J-LOG4J12 </ArtifactId> </exclusion> <exclusion> <roupId> io.netty </groupId> </eparlius> <exclusion> <ratifactId> Jackson-Core-asl </artifactid> <scolusion> org.codehaus.jackson </groupid> </exclusion> <exclusion> <ArtIfactId> Jettison </artifactId> <roupId> org.codehaus.jettison </groupId> </exclusion> <exclusion> <ratifactid> jackson-mapper-asl </artifactid> <sclusion> org.codehaus.jackson <RoupID> org.codehaus.jackson </groupId> </exclusion> <exclusion> <ratifactId> snappy-java </artifactid> <sergision> org.xerial.snappy </groupid> </exclusion> </sclipusion> <exclusion> <ratifactId> Guava </artifactid> <scolusion> com.google.guava </groupId> </exclusion> <exclusion> <ArtIfactId> ZooKeeper </artifactId> <soolid> org.apache.zookeeper </groupId> </การยกเว้น> <exclusion> <sartifactid> servlet-api </artifactid> <sderctiD> <cerperency> <ArtIfactId> ZooKeeper </artifactId> <persion> 3.4.10 </Sersion> <evercrusions> <eArcusion> <ratifactId> SLF4J-LOG4J12 </artifactId> <roupId> org.slf4j </groupid> </excliusion> <ArtIfactId> hbase-client </artifactid> <persion> 1.2.4 </เวอร์ชัน> <evercrusions> <exclusion> <mentifactId> log4j </artifactid> <soolsion> log4j </groupid> </exclusion> <EXCLUSION> <ARTIFACTID> NETTY </ArtIfactId> <soo.netty </groupId> </exclusion> <exclusion> <ratifactid> hadoop-common </artifactid> <RoupID> com.google.guava </groupId> </exclusion> </การยกเว้น> <Eclusion> <ArtIfactId> Hadoop-Annotations </artifactid> <roupId> org.apache.hadoop </groupid> </exclusion> </eparliusion> <exclusion> <ratifactId> SLF4J-LOG4J12 </artifactId> <roupId> org.slf4j </groupId> </การยกเว้น> </exclusions> <EXCLUSIONS> <EXCLUSION> <ARTIFACTID> Commons-Logging </artifactId> <scurusId> Commons-logging </artifactid> <s/GroupId> Commons-logging </groupId> </exclusion> <ArtIfactId> Jackson-Mapper-Asl </artifactId> <roupId> org.codehaus.jackson </groupId> </exclusion> <exclusion> <ratifactid> Jackson-Core-asl </artifactid> <roupId> log4j </groupId> </excusion> <exclusion> <ratifactId> snappy-java </artifactid> <saultId> org.xerial.snappy </groupId> </exclusion> <exclusion> <ArtIfactId> Guava </artifactId> <scuraId> com.google.guava </groupId> </exclusion> <exclusion> <ratifactid> hadoop-auth </artifactid> <sclusion> org.apache.hadoop </groupid> </sclipusion> <exclusion> <ratifactId> SLF4J-LOG4J12 </artifactId> <roopId> org.slf4j </groupId> </exclusion> <exclusion> <RoupID> org.apache.hadoop </groupId> <ratifactid> hadoop-mapreduce- ตัวอย่าง </artifactid> <sersion> 2.7.3 </version> <eplusions> <exclusion> <ArtIfactId> netty </artifactid> <roupId> io.netty </groupId> </exclusion> <exclusion> <atifactId> Guava </artifactid> <roupId> com.google.guava </groupid> </exclusion> <exclusion> <ratifactId> servlet-api </artifactid> <sroupid> javax.servlet </groupId> </การยกเว้น> </การยกเว้น> </การพึ่งพาอาศัย> <!-พายุ-> <การพึ่งพา <Scope> $ {ให้บริการ SSCOPE} </scope> <exclusions> <Eclusision> <scolusion> <roupId> org.apache.logging.log4j </groupid> <ratifactid> log4j-slf4j-impl </artifactid> </exclusion> </sclipusion> </eplusions> </serdency> <การพึ่งพา> <roupId> org.apache.storm </groupId> <ratifactid> Storm-Kafka </artifactid> <Sersion> 1.1.1 </SecurSIOSIONS> </excusision> </exclusions> </dependency>แพ็คเกจ JAR จะถูกลบออกเนื่องจากมีการพึ่งพาหลายครั้งที่เกี่ยวข้องกับการพึ่งพาการก่อสร้างโครงการ เวอร์ชันพายุคือ 1.1.0 สปริงที่เกี่ยวข้องกับการพึ่งพา
`` `java
<!-สปริงบูท-> <การพึ่งพา> <roupId> org.springframework.boot </groupid> <ratifactid> Spring-Boot-Starter </artifactid> <exclusions> </sderctency> <predency> <roupId> org.springframework.boot </groupid> <ratifactid> Spring-Boot-Starter-Web </artifactid> </dependency> <RoupID> org.springframework.boot </groupid> <ratifactid> การทดสอบสปริง-สตาร์เทสต์ </artifactid> <scope> ทดสอบ </cope> </predency> <dermentrency> <การพึ่งพา> <roupId> org.mybatis.spring.boot </groupid> <ratifactid> mybatis-spring-boot-starter </artifactid> <version> $ {mybatis-spring.version} <ArtIfactId> Spring-Boot-Configuration-Processor </artifactid> <plorial> True </pontainal> </การพึ่งพาอาศัย> </การพึ่งพา>PS: แพ็คเกจ Jar ของ Maven ไม่ได้มีความคล่องตัวมากที่สุดเนื่องจากข้อกำหนดการใช้งานโครงการ มันมีไว้สำหรับการอ้างอิงของคุณเท่านั้น
โครงสร้างโครงการ:
การกำหนดค่าการกำหนดค่าไฟล์การกำหนดค่าในสภาพแวดล้อมที่แตกต่างกัน
การจัดเก็บข้อมูลการใช้งานสปริงที่เกี่ยวข้องกับการใช้งานเช่นชื่อบิลด์
เมื่อเริ่มต้นการบูตฤดูใบไม้ผลิเราจะพบ
ในความเป็นจริงก่อนที่จะเริ่มการรวมฉันรู้เพียงเล็กน้อยเกี่ยวกับพายุซึ่งไม่ได้ติดต่อในตอนแรก ต่อมาฉันพบว่าหลังจากรวมเข้ากับการบูตฤดูใบไม้ผลิฉันไม่ได้มีวิธีที่สอดคล้องกันในการเรียกใช้ฟังก์ชั่นของการกระทำ topolgy หลังจากเริ่มต้นการบูตฤดูใบไม้ผลิดังนั้นฉันคิดว่าหลังจากเริ่มต้นการบูตฤดูใบไม้ผลิฉันก็เสร็จ เป็นผลให้ฉันรอครึ่งชั่วโมงและไม่มีอะไรเกิดขึ้นก่อนที่ฉันจะพบว่าฟังก์ชั่นของการกระตุ้นการกระทำนั้นไม่ได้ถูกนำมาใช้
ในการแก้ปัญหานี้ความคิดของฉันคือ: เริ่มต้นฤดูใบไม้ผลิ boot-> สร้างหัวข้อการฟัง kafka และเริ่มต้น Topolgy เพื่อเริ่มต้นการเริ่มต้น อย่างไรก็ตามสำหรับปัญหาดังกล่าว Kafka การฟังหัวข้อจะก่อให้เกิด topolgy ซ้ำ ๆ ซึ่งเห็นได้ชัดว่าไม่ใช่สิ่งที่เราต้องการ หลังจากดูสักพักฉันพบว่าฤดูใบไม้ผลิมีการเริ่มต้นที่เกี่ยวข้องและดำเนินการวิธีการเวลาที่แน่นอนหลังจากเสร็จสิ้น นี่คือพระผู้ช่วยให้รอดสำหรับฉัน ดังนั้นความคิดในการกระตุ้น topolgy จึงกลายเป็น:
เริ่มต้นสปริงบูต -> ดำเนินการวิธีทริกเกอร์ -> กรอกเงื่อนไขทริกเกอร์ที่เกี่ยวข้อง
วิธีการก่อสร้างคือ:
/** * @author Leezer * @date 2017/12/28 * ส่งทอพอโลยีโดยอัตโนมัติหลังจากสปริงโหลด **/ @configuration @componentPublic คลาส AutoLoad ใช้แอปพลิเคชันแอปพลิเคชัน <contextrefreshedEvent> หัวข้อสตริงคงที่ส่วนตัว; โฮสต์สตริงคงที่ส่วนตัว; พอร์ตสตริงคงที่ส่วนตัว Public AutoLoad (@Value ("$ {storm.brokerzkstr}") สตริง Brokerzkstr, @Value ("$ {zookeeper.host}") สตริงโฮสต์, @Value ("$ {zookeeper.port}" โฮสต์ = โฮสต์; หัวข้อ = หัวข้อ; พอร์ต = พอร์ต; } @Override โมฆะสาธารณะ onapplicationEvent (เหตุการณ์ contextrefreshedevent) {ลอง {// อินสแตนซ์คลาส topologybuilder Topologybuilder Topologybuilder = New Topologybuilder (); // ตั้งค่าโหนดการระเบิดและจัดสรรหมายเลขพร้อมกัน หมายเลขพร้อมกันจะควบคุมจำนวนเธรดของวัตถุในคลัสเตอร์ BrokerHosts BrokeHosts = New Zkhosts (Brokerzkstr); // กำหนดค่าหัวข้อสำหรับการสมัครสมาชิก KAFKA เช่นเดียวกับไดเรกทอรี Data Node และชื่อใน Zookeeper SpoutConfig = New SpoutConfig (BrokerHosts, หัวข้อ, "/Storm", "S32"); spoutconfig.scheme = ใหม่ Schemeasmultischeme (ใหม่ stringscheme ()); spoutconfig.zkservers = collections.singletonlist (โฮสต์); spoutconfig.zkport = integer.parseint (พอร์ต); // อ่าน spoutconfig.startoffsettime = OffsetRequest.latestTime (); kafkaspout receiver = new kafkaspout (spoutconfig); Topologybuilder.setspout ("kafka-spout", ตัวรับสัญญาณ, 1) .setnumtasks (2); Topologybuilder.setbolt ("Alarm-Bolt", New Alarmbolt (), 1) .setNumTasks (2). shuffleGrouping ("kafka-spout"); config = new config (); config.setDebug (เท็จ); /* ตั้งค่าจำนวนสล็อตทรัพยากรที่โทโพโลยีต้องการยึดในคลัสเตอร์พายุ สล็อตสอดคล้องกับกระบวนการของคนงานบนโหนดหัวหน้างาน หากจำนวนสปอตที่คุณจัดสรรเกินจำนวนคนงานโหนดทางกายภาพของคุณมีการส่งอาจไม่สำเร็จ การเข้าร่วมคลัสเตอร์ของคุณมีทอพอโลยีอยู่แล้วและมีทรัพยากรคนงานเหลืออยู่ 2 แหล่ง หากคุณจัดสรรโทโพโลยี 4 ตัวให้กับรหัสของคุณโทโพโลยีนี้สามารถส่งได้ แต่หลังจากกระทำแล้วคุณจะพบว่ามันไม่ทำงาน และเมื่อคุณฆ่าทอพอโลยีและปล่อยช่องบางส่วนทอพอโลยีของคุณจะกลับมาทำงานตามปกติ */ config.setNumworkers (1); LocalCluster Cluster = new LocalCluster (); cluster.submittopology ("kafka-spout", config, topologybuilder.createTopology ()); } catch (exception e) {e.printstacktrace (); -บันทึก:
เมื่อเริ่มต้นโครงการอาจมีการรายงานข้อผิดพลาดต่อไปนี้เนื่องจากใช้ Tomcat ฝังตัวสำหรับการเริ่มต้น
[tomcat-startstop-1] ข้อผิดพลาด oacccontainerbase-ภาชนะเด็กล้มเหลวในระหว่าง startjava.util.concurrent.executionexception: org.apache.catalina.lifecycleexception: ล้มเหลวในการเริ่มต้น java.util.concurrent.futuretask.Report (futureTask.java:122) ~ [?:?: 1.8.0_144] ที่ java.util.concurrent.futuretask.get (futureTask.java:192) ~ [ org.apache.catalina.core.containerbase.startinternal (containerbase.java:939) [tomcat-embed-core-8.5.23.jar: 8.5.23] ที่ org.apache.catalina.core.standardhost.startinternal (Standardhost.java [Tomcat-embed-core-8.5.23.jar: 8.5.23] ที่ org.apache.catalina.util.lifecyclebase.start (Lifecyclebase.java:150) org.apache.catalina.core.containerbase $ startchild.call (containerbase.java:1419) [tomcat- embed-core-8.5.23.jar: 8.5.23] ที่ org.apache.catalina.core.containerbase $ startchild.call [tomcat-embed-core-8.5.23.jar: 8.5.23] ที่ java.util.concurrent.futuretask.run $$ Capture (FutureTask.java:266) [?:? java.util.concurrent.threadpoolexecutor.runworker (Threadpoolexecutor.java:1149) [?: 1.8.0_144] ที่ java.util.concurrent.threadpoolexecutor.runworker java.util.concurrent.threadpoolexecutor $ worker.run (threadpoolexecutor.java:624) [?: 1.8.0_144] ที่ java.lang.thread.run (thread.java:748) [?: 1.8.0_144
นี่เป็นเพราะแพ็คเกจ JAR ที่นำเข้าที่สอดคล้องกันแนะนำเวอร์ชัน servlet-API ต่ำกว่ารุ่นที่ฝังอยู่ สิ่งที่เราต้องทำคือเปิดการพึ่งพา Maven และลบออก
<EXCLUSION> <ARTIFACTID> Servlet-API </ArtIfactId> <roupId> javax.servlet </groupId> </exclusion>
จากนั้นรีสตาร์ท
เป็นไปได้ที่จะรายงานระหว่างการเริ่มต้น:
การคัดลอกรหัสมีดังนี้:
org.apache.storm.utils.nimbusleadernotfoundexception: ไม่สามารถหาผู้นำ Nimbus จากโฮสต์เมล็ดพันธุ์ [localhost] คุณระบุรายการที่ถูกต้องของโฮสต์ Nimbus สำหรับ config nimbus.seeds หรือไม่ที่ org.apache.storm.utils.nimbuslient.getConfiguredClientas (nimbusClient.java:90
ฉันคิดถึงปัญหานี้เป็นเวลานานและพบว่าคำอธิบายออนไลน์ทั้งหมดเกิดจากปัญหาการกำหนดค่าพายุ แต่พายุของฉันถูกปรับใช้บนเซิร์ฟเวอร์ ไม่มีการกำหนดค่าที่เกี่ยวข้อง ในทางทฤษฎีเราควรอ่านการกำหนดค่าที่เกี่ยวข้องบนเซิร์ฟเวอร์ แต่ผลลัพธ์ไม่ใช่กรณี ในที่สุดฉันก็ลองหลายวิธีและพบว่ามันผิด ที่นี่ฉันพบว่าเมื่อสร้างคลัสเตอร์พายุให้คลัสเตอร์ท้องถิ่นที่สอดคล้องกัน
LocalCluster Cluster = new LocalCluster ();
ทำการทดสอบในท้องถิ่น หากคุณกำลังทดสอบในพื้นที่ให้ใช้สำหรับการทดสอบการปรับใช้ หากปรับใช้กับเซิร์ฟเวอร์คุณต้อง:
cluster.submittopology ("kafka-spout", config, topologybuilder.createTopology ()); // จับจ้องไปที่: stormsubmitter.submittopology ("kafka-spout", config, topologybuilder.createtopology ();ดำเนินการส่งงาน;
ข้างต้นแก้ปัญหาข้างต้น 1-3
คำถามที่ 4: ฉันใช้อินสแตนซ์ถั่วที่เกี่ยวข้องใน Bolt ฉันพบว่าฉันไม่สามารถรับอินสแตนซ์ได้ถ้าฉันใส่มันในฤดูใบไม้ผลิโดยใช้ @component: ฉันเดาว่าเมื่อเราสร้างการกระทำ topolgy มันจะอยู่ใน:
การคัดลอกรหัสมีดังนี้:
Topologybuilder.setbolt ("Alarm-Bolt", New Alarmbolt (), 1) .setNumTasks (2). shuffleGrouping ("kafka-spout");
การดำเนินการสลักเกลียวที่เกี่ยวข้อง:
@Override โมฆะสาธารณะเตรียม (MAP StormConf, บริบท TopologyContext, OutputCollector Collector) {this.collector = collector; Stormlauncher Stormlauncher = Stormlauncher.getStormlauncher (); DataRepositorys = (AlarmDatarePositorys) Stormlauncher.getBean ("AlarmDatarePositorys"); -หากไม่มีการสร้างอินสแตนซ์ของสลักเกลียวเธรดจะแตกต่างกันและไม่สามารถรับสปริงได้ (ฉันไม่เข้าใจมากที่นี่ถ้ามีชายร่างใหญ่รู้คุณสามารถแบ่งปันได้)
ความหมายของการใช้สปริงบูตคือวัตถุที่ซับซ้อนเหล่านี้ได้รับ ปัญหานี้ทำให้ฉันลำบากมานาน ในที่สุดฉันคิดว่าเราจะได้รับอินสแตนซ์ผ่านบริบท GetBean และไม่รู้ว่ามันสามารถใช้งานได้หรือไม่ดังนั้นฉันจึงเริ่มกำหนด:
ตัวอย่างเช่นฉันต้องใช้บริการใน Bolt:
/*** @author Leezer* @date 2017/12/27* การดำเนินการจัดเก็บข้อมูลล้มเหลวเวลา **/ @บริการ ("AlarmDatarePositorys") ระดับสาธารณะ AlarmDatarePositorys ขยายการดำเนินการ redisbase ดำเนินการ ialarmdatarepositorys / *** @param ประเภทประเภท* @param คีย์คีย์ค่า* @return หมายเลขของข้อผิดพลาด **/ @Override สตริงสาธารณะ geterrnumfromredis (ประเภทสตริง, คีย์สตริง) {ถ้า (ประเภท == null || key == null) {return null; } else {valueOperations <string, string> valueOper = primarystringredistemplate.opsforvalue (); return valueoper.get (string.format ("%s:%s:%s", erro, ประเภท, คีย์)); }} / *** ประเภทข้อผิดพลาดประเภท @param* @param คีย์คีย์ค่า* @param ค่าที่เก็บค่าไว้ ** / @Override โมฆะสาธารณะ seterRnumToredis (ประเภทสตริง, คีย์สตริง, ค่าสตริง) {ลอง {ค่า <string, string> valueOper = primaryStringRedistemplate.opSforValue (); ValueOper.Set (String.format ("%S:%S:%S", Erro, ประเภท, คีย์), ค่า, พจนานุกรม, Aapikeydayoflifecycle, TimeUnit.seconds); } catch (exception e) {logger.info (Dictionaries.redis_error_prefix+string.format ("คีย์ล้มเหลวในการจัดเก็บ redis ใน %s", คีย์)); -ที่นี่ฉันระบุชื่อของถั่วและเมื่อ Bolt ดำเนินการเตรียม: ใช้วิธี getBean เพื่อรับถั่วที่เกี่ยวข้องและดำเนินการที่เกี่ยวข้องให้เสร็จสมบูรณ์
จากนั้นหัวข้อสมาชิก Kafka จะถูกส่งไปยัง Bolt ของฉันสำหรับการประมวลผลที่เกี่ยวข้อง วิธีการของ GetBean ที่นี่คือการเริ่มต้นคำจำกัดความฟังก์ชัน bootmain:
@springbootapplication@enableTransactionManagement@componentscan ({"บริการ", "storm"})@enableMongorePositories (basepackages = {"storm"})@propertySource (value = {"classpath: service.properties" "classpath: /configs/spring-hadoop.xml", "classpath: /configs/spring-hbase.xml"}) คลาสสาธารณะ Stormlauncher ขยาย SpringbootservletInitializer {// ตั้งค่าพายุ Stormlauncher // ตั้งค่าบริบทบริบท Private ApplicationContext; โมฆะคงที่สาธารณะหลัก (String [] args) {SpringApplicationBuilder Application = New SpringApplicationBuilder (Stormlauncher.class); // application.web (false) .run (args); วิธีนี้คือสปริงบูตไม่เริ่มแอปพลิเคชันรัน (args); Stormlauncher S = New Stormlauncher (); S.SetApplicationContext (application.context ()); SetStormlauncher; } โมฆะคงที่ส่วนตัว SetStormlauncher (Stormlauncher Stormlauncher) {Stormlauncher.stormlauncher = Stormlauncher; } Stormlauncher สาธารณะคงที่ getStormlauncher () {return stormlauncher; } @Override Protected SpringApplicationBuilder กำหนดค่า (แอปพลิเคชัน SpringApplicationBuilder) {return application.Sources (stormlauncher.class); } / ** * รับบริบท * * @return บริบทแอปพลิเคชัน * / Public ApplicationContext GetApplicationContext () {กลับบริบท; } /*** ตั้งค่าบริบท * * @param appcontext context */ โมฆะส่วนตัว setApplicationContext (ApplicationContext AppContext) {this.context = appContext; } /*** รับถั่วอินสแตนซ์ผ่านชื่อที่กำหนดเอง * * @param ชื่อชื่อ * @return ถั่ว */ วัตถุสาธารณะ getBean (ชื่อสตริง) {return context.getBean (ชื่อ); } /*** รับถั่วผ่านชั้นเรียน * * @param <t> พารามิเตอร์ประเภท * @param clazz the clazz * @return the bean */ public <t> t getBean (คลาส <t> clazz) {return context.getBean (clazz); } / ** * ส่งคืนถั่วที่ระบุตามชื่อและ clazz * * @param <t> พารามิเตอร์ประเภท * @param ชื่อชื่อ * @param clazz clazz * @return ถั่ว * / สาธารณะ <t> t getBean (ชื่อสตริง, คลาส <t> clazz) -การรวมพายุและคาฟคาเข้ากับสปริงบู๊ตสิ้นสุดลงแล้ว ฉันจะใส่ Kafka ที่เกี่ยวข้องและการกำหนดค่าอื่น ๆ ลงใน GitHub
ยังมีหลุม kafkaclient ที่นี่:
Async Loop เสียชีวิต! java.lang.nosuchmethoderror: org.apache.kafka.common.network.networksend
โครงการจะรายงานปัญหาไคลเอนต์ Kafka เนื่องจากใน Storm-Kafka, Kafka ใช้เวอร์ชัน 0.8 ในขณะที่ Networksend เป็นรุ่น 0.9 หรือสูงกว่า การรวมที่นี่จะต้องสอดคล้องกับเวอร์ชันที่เกี่ยวข้องกับ KAFKA ที่คุณรวมเข้าด้วยกัน
แม้ว่าการบูรณาการจะค่อนข้างง่าย แต่ก็มีการอ้างอิงน้อย นอกจากนี้ฉันเพิ่งเริ่มติดต่อกับพายุดังนั้นฉันจึงคิดว่าเยอะมาก ฉันจะบันทึกที่นี่
ที่อยู่โครงการ - GitHub
ข้างต้นเป็นเนื้อหาทั้งหมดของบทความนี้ ฉันหวังว่ามันจะเป็นประโยชน์ต่อการเรียนรู้ของทุกคนและฉันหวังว่าทุกคนจะสนับสนุน wulin.com มากขึ้น