مقدمة
نظرًا لاحتياجات العمل ، يجب دمج Strom و Kafka في مشروع SPRING Boot ، وسجلات إخراج الخدمات الأخرى إلى موضوع اشتراك Kafka. تعالج العاصفة الموضوع في الوقت الحقيقي لإكمال مراقبة البيانات وإحصائيات البيانات الأخرى. ومع ذلك ، هناك عدد قليل من البرامج التعليمية عبر الإنترنت. ما أريد أن أكتبه اليوم هو كيفية دمج Storm+Kafka في Spring Boot ، وبالمناسبة ، سأتحدث عن المزالق التي واجهتها.
أدوات الاستخدام وتكوين البيئة
1. إصدار جافا JDK-1.8
2. أداة التجميع تستخدم IDEA-2017
3. مافن كإدارة المشروع
4.SPRING BOOT-1.5.8.RELEASE
مظاهر الطلب
1. لماذا تحتاج إلى الاندماج في صندوق الربيع
من أجل استخدام Boot Spring لإدارة مختلف الخدمات المجهرية بشكل موحد وتجنب التكوينات اللامركزية المتعددة في نفس الوقت
2. أفكار محددة وأسباب للتكامل
استخدم SPRING BOOT لإدارة الفاصوليا المطلوبة بواسطة Kafka و Storm و Redis ، وما إلى ذلك ، وجمعها إلى Kafka من خلال سجلات الخدمة الأخرى ، وإرسال سجلات إلى Storm في الوقت الفعلي ، وأداء عمليات المعالجة المقابلة عند Strom Bolt
واجهت المشاكل
1. لا توجد عاصفة تكامل ذات صلة عند استخدام حذاء الربيع
2. لا أعرف كيفية تشغيل الالتزام topolgy في Spring Boot
3. واجهت مشكلة مع Numbis وليس العميل LocalHost عند تقديم الطوبولوجيا
4. لا يمكن الحصول على الفول الذي تم إنشاءه من خلال التعليقات التوضيحية في الترباس العاصفة لأداء العمليات المقابلة
حل
قبل التكامل ، نحتاج إلى معرفة طريقة بدء تشغيل التمهيد الربيعي المقابل (إذا كنت تقرأ هذه المقالة ، افتراضيًا ، لقد تعلمت بالفعل واستخدمت العاصفة والكافكا والربيع)
هناك أمثلة قليلة على دمج العاصفة في Spring Boot على الإنترنت ، ولكن بسبب الاحتياجات المقابلة ، ما زلنا بحاجة إلى الاندماج.
أول استيراد حزمة الجرة المطلوبة:
<Rependency> <roupeD> org.apache.kafka </rougiD> <StifactId> kafka-clients </intifactid> <splection> 0.10.1.1 </version> </sependency> <redence> <roupiD> org.springframework.cloud </groupid> <StifactId> zookeeper </suntifactid> <roupiD> org.apache.zookeeper </rougeid> </section> <Sective> <tractid> spring-boot-actuator </artifactid> <traceId> org.springframework <roughId> org.apache.kafka </rougiD> </section> </section> </section> </reperency> <reperency> <roupiD> org.springframework.kafka </rougiD> </artifactid> spring-kafka </shantifactid> <Shansed> <roughid> org.apache.kafka </rougiD> </sective> </simplusions> </reperency> <sependency> <roupiD> org.springframework.data </rougiD> <StifactId> spring-data-hadoop </stifactid> <roughId> org.slf4j </rougiD> <StifactId> slf4j-log4j12 </stifactid> </esparusion> <Section> <StifactId> commons-logging </artifactid> </groupiD> logging </suplusiD> </qualusion> <rougiD> io.netty </rougiD> </section> <Section> <StifactId> Jackson-Core-Asl </stifactid> <roupiD> org.codehaus.jackson </groupid> </arganus> <Sective> <StifactId> jettison </stifactid> <rougiD> org.codehaus.jettison </rougiD> </section> <Sective> <StifactId> Jackson-Mapper-Asl </shantifactid> <roupiD> org.codehaus.jackson </groupiD> <sivactid> <roupiD> org.codehaus.jackson </rougiD> </section> <arcorusion> <StifactId> snappy-java </shintifactid> <roupiD> org.xerial.snappy </rougiD> </section> <Section> <SectifactId> jackson-xc </artifactid> </section> <Sective> <StifactId> guava </stifactid> <roupiD> com.google.guava </groupId> </section> <Section> <StifactId> hadoop-mapreduce-core </shifactid> <roupachid> org.apache.hadoop </rougeid> <StifactId> zookeeper </artifactId> <roupiD> org.apache.zookeeper </rougeid> </section> <Section> <roughid> org.apache.zookeeper </rougiD> <StifactId> zookeeper </shintifactid> <الإصدار> 3.4.10 </version> <siversions> <Section> <StifactId> slf4j-log4j12 </artifactId> org.slf4j </group <ProwEd> org.apache.hbase </rougiD> <StifactId> hBase-Client </stifactid> <الإصدار> 1.2.4 </version> <Section> <Section> <StifactId> log4j </sofactid> <roughid> org.apache.zookeeper </rougiD> </section> <arcorusion> <StifactId> netty </shintifactid> <rouckid> io.netty </rougiD> </assact> <Section> <StifactId> hadoop-common </stifactid> <StifactId> goaava </stifactid> <rougiD> com.google.guava </groupId> </section> </section> <Section> <roughid> org.apache.hadoop </rougiD> </section> <Section> <StifactId> slf4j-log4j12 </shintifactid> <roupiD> org.slf4j </groupid> </simplusion> <StifactId> hadoop-common </stifactid> <الإصدار> 2.7.3 </version> <siversions> <Section> <StifactId> logging commons </stifactid> <CropeId> commons-logging </suntifactid> <roughid> org.apache.curator </groupId> </section> <Section> <StifactId> Jackson-Mapper-Asl </stifactid> <roupiD> org.codehaus.jackson </rougeid> </section> <Section> </section> <Sective> <StifactId> log4j </stifactid> <rougiD> log4j </rougeid> </section> <Section> <roupiD> org.apache.zOokeePer </rougiD> </section> <arcorusion> <StifactId> guava </shintifactid> <rouckid> com.google.guava </rougiD> </assact> <Section> <SectifactId> hadoop-auth </artifactId> أو groupiD> <StifactId> commons-lang </stifactid> <roupiD> commons-lang </rougiD> </section> <Section> <StifactId> slf4j-log4j12 </shantifactid> <roupiD> org.slf4j </groupid> <GORETID> javax.servlet </rougiD> </section> </section> </sependency> <reperency> <roupiD> org.apache.hadoop </rougeid> <StifactID> hadoop-mapreduce-examples </artifactid> <ProupId> commons-logging </rougiD> </section> <Section> <StifactId> netty </suntifactid> <rouplyid> io.netty </rougeid> </assclusion> <StifactId> log4j </suntifactid> <rougiD> log4j </rougiD> </section> <Section> <StifactId> servlet-api </shintifactid> <roupiD> javax.servlet </groupid> </sistmar> </arvisions> </repency> <! <StifactId> Storm-core </shintifactid> <soph> $ {storm.version} </version> <scope> $ {support.scope} </scope> <Sexpasions> <Section> <rouciD> org.apache.logging.log4j </rougeid> <tractiD> log4j-slf4j-impl <StifactId> servlet-api </stifactid> <roupiD> javax.servlet </groupId> </section> </requortions> </sependency> <reperence> <roupiD> org.apache.storm </groupid> <StifactId> kafka-clients </artifactid> <roupled> org.apache.kafka </rougeid> </section> </requstions> </premency>تتم إزالة حزمة JAR نظرًا لوجود تبعيات متعددة تتعلق بتبادل بناء المشروع. إصدار العاصفة هو 1.1.0 التبعيات ذات الصلة بالحذاء الربيعي
`` `جافا
<!-Boot SPRING-> <REPERENCED> <VERLED> org.springframework.boot </groupID> <STIFACTID> spring-boot-starter </stifactid> <visplusions> <secaration> <roucid> org.springframework.boot </groupid> </reperency> <redency> <roupiD> org.springframework.boot </rougiD> <StifactId> Spring-boot-starter-web </frinsid> </premited> <redence> <roupiD> org.springframework.boot </groupid> <roupl> org.springframework.boot </groupId> <Stifactid> Spring-boot-starter-test </artifactid> <scope> test </scope> </reperence> <redenced> <roupid> org.springframework.boot </groupid> <Rependency> <roupeD> org.mybatis.spring.boot </rougiD> <intifactid> mybatis-spring-boot-starter </shintifactid> <splection> $ {mybatis-spring <StifactId> معالجات الربيع-boot-boot-processor </artifactid> <اختياري> صحيح </incortial> </perendency>ملاحظة: حزمة جرة Maven ليست الأكثر تبسيطًا بسبب متطلبات استخدام المشروع. إنه للرجوع إليه فقط.
هيكل المشروع:
تكوين ملفات التكوين في بيئات مختلفة
تخزين فئات التنفيذ ذات الصلة بحذاء الربيع مثل اسم البناء
عند بدء تشغيل SPRING BOOT سنجد
في الواقع ، قبل البدء في التكامل ، لم أكن أعرف القليل عن العاصفة ، التي لم تكن على اتصال في البداية. في وقت لاحق ، وجدت أنه بعد الاندماج في SPRING BOOT ، لم يكن لدي الطريقة المقابلة لإثارة وظيفة ارتكاب TOPOLGY بعد بدء BOOT Spring ، لذلك اعتقدت أنه بعد بدء Boot Boot ، لقد انتهيت. نتيجة لذلك ، انتظرت لمدة نصف ساعة ولم يحدث شيء قبل أن أجد أن وظيفة تشغيل الالتزام لم يتم تنفيذها.
لحل هذه المشكلة ، فكرتي هي: ابدأ SPRING BOOT-> إنشاء موضوع استماع Kafka وابدأ TopOlgy لإكمال بدء التشغيل. ومع ذلك ، لمثل هذه المشكلة ، فإن كافكا الاستماع إلى الموضوع سيؤدي مرارًا وتكرارًا إلى أن يؤدي إلى Topolgy ، وهو ما لا نريده من الواضح أنه ليس ما نريده. بعد المشاهدة لفترة من الوقت ، وجدت أن الربيع لديه بدء تشغيل ذي صلة وتنفيذ طريقة زمنية معينة بعد اكتماله. هذا منقذ لي. لذا أصبحت فكرة تشغيل Topolgy:
START Spring Boot -> تنفيذ طريقة المشغل -> أكمل حالة المشغل المقابلة
طريقة البناء هي:
/** * Author Leezer * date 2017/12/28 * إرسال طوبولوجيا تلقائيًا بعد تحميل الربيع **/ @configuration @componentpublic class autoload prections applicationlistener <Intextrefreshedevent> {private Static String brokerzkstr ؛ موضوع سلسلة ثابتة خاصة ؛ مضيف سلسلة ثابتة خاصة ؛ منفذ السلسلة الثابتة الخاصة ؛ Autoload العامة ( @value ("$ {storm.brokerzkstr}") سلسلة brokerzkstr ، @value ("$ {zookeeper.host}") string host ، value ("$ {{zookeeper.port}") String Port ، value ($ {kafka.def-topic}}} وسيط مضيف = مضيف ؛ الموضوع = الموضوع ؛ منفذ = منفذ ؛ } Override public void onapplicationEvent (حدث contextrefreshedevent) {try {// instantiate the TomologyBuilder Class. tomologybuilder topologybuilder = new topologybuilder () ؛ // قم بتعيين عقدة الثوران وقم بتخصيص رقم التزامن. سيتحكم رقم التزامن في عدد مؤشرات ترابط الكائن في الكتلة. BrokerHosts Brokehosts = Zkhosts new (BrokerZkstr) ؛ // قم بتكوين موضوع الاشتراك في Kafka ، بالإضافة إلى دليل عقدة البيانات والاسم في Zookeeper SpoutConfig = new spoutConfig (BrokerHosts ، Topic ، "/Storm" ، "S32") ؛ spoutConfig.Scheme = جديد SchemeAsmultisCheme (New StringsCheme ()) ؛ spoutConfig.zkservers = collections.singletonlist (Host) ؛ spoutConfig.zkport = integer.parseint (port) ؛ // read spoutConfig.StartoffSettime = OffSetRequest.LatestTime () ؛ kafkaspout جهاز استقبال = kafkaspout جديد (spoutConfig) ؛ topologybuilder.setspout ("kafka-Spout" ، جهاز الاستقبال ، 1) .SetNumTasks (2) ؛ topologybuilder.setbolt ("Alarm-Bolt" ، New Aravolt () ، 1) .SetNumTasks (2) .ShuffleGrouping ("Kafka-Spout") ؛ config config = جديد config () ؛ config.setDebug (false) ؛ /* قم بتعيين عدد فتحات الموارد التي يريد الهيكل الاستيلاء عليها في مجموعة العاصفة. فتحة تتوافق مع عملية العامل على عقدة المشرف. إذا تجاوز عدد المواقع التي تخصصها عدد العمال الذي تتمتع به العقدة المادية الخاصة بك ، فقد يكون التقديم غير ناجح. الانضمام إلى المجموعة الخاصة بك ، وهناك بالفعل بعض الهيكل على ذلك ، وهناك موارد عاملان متبقية. إذا قمت بتخصيص 4 طوبولوجيا إلى الكود الخاص بك ، فيمكن تقديم هذا الطوبولوجيا ، ولكن بعد الالتزام ، ستجد أنه لا يعمل. وعندما تقتل بعض الطوبولوجيا وإطلاق بعض الفتحات ، فإن طوبولوجياك ستستأنف التشغيل العادي. */ config.setnumworkers (1) ؛ cluster localcluster = new LocalCluster () ؛ cluster.submittopology ("kafka-spout" ، config ، topologybuilder.createTopology ()) ؛ } catch (استثناء e) {E.PrintStackTrace () ؛ }}}ملحوظة:
عند بدء المشروع ، قد يتم الإبلاغ عن الخطأ التالي لأنه يستخدم tomcat المضمن لبدء التشغيل.
[tomcat-startstop-1] خطأ oacccontainerbase-فشلت حاوية طفل أثناء startjava.util.concurrent.executionException: org.apache.catalina.lifecyclexception: فشل في بدء تشغيل المكون [tomcat] java.util.concurrent.futureTask.Report (futuretask.java:122) ~ [؟: 1.8.0_144] على java.util.concurrent.futureTask.get (futuretask.java:192) ~ [؟: 1.8.0_144] org.apache.catalina.core.containerbase.startinternal (ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar: 8.5.23] at org.apache.catalina.core.standardhost.startinternal (StandardHost.Java:872) [tomcat-embed-core-8.5.23.jar: 8.5.23] في org.apache.catalina.util.lifecyclebase.start (Lifecyclebase.java:150) org.apache.catalina.core.containerbase $ startchild.call (containerbase.java:1419) [tomcat-embed-core-8.5.23.jar: 8.5.23] في java.util.concurrent.futureTask.Run $$ capture (futuretask.java:266) java.util.concurrent.threadpoolexecutor.runworker (threadpoolexecutor.java:1149) java.util.concurrent.threadpoolexecutor $ worker.run (threadpoolexecutor.java:624) [؟: 1.8.0_144] في java.lang.thread.run (thread.java:748) [؟: 1.8.0_144]
وذلك لأن حزمة JAR المستوردة المقابلة تقدم إصدار Servlet-API أقل من الإصدار المدمج. كل ما نحتاج إلى فعله هو فتح تبعية Maven وإزالته
<Section> <StifactId> servlet-api </stifactid> <roupiD> javax.servlet </rougeid> </Section>
ثم أعد التشغيل.
من الممكن الإبلاغ أثناء بدء التشغيل:
نسخة الكود كما يلي:
org.apache.storm.utils.nimbusleadernotfoundexception: لم يستطع العثور على القائد Nimbus من مضيفي البذور [LocalHost]. هل حددت قائمة صالحة لمضيفي Nimbus لـ config nimbus.seeds؟ في org.apache.storm.utils.nimbusclient.getConfiguredclientas (nimbusclient.java:90
فكرت في هذه المشكلة لفترة طويلة ووجدت أن التفسيرات عبر الإنترنت كانت كلها ناتجة عن مشكلة تكوين العاصفة ، ولكن تم نشر العاصفة على الخادم. لا يوجد تكوين ذي صلة. من الناحية النظرية ، يجب أن نقرأ أيضًا التكوين ذي الصلة على الخادم ، ولكن النتيجة ليست كذلك. أخيرًا ، جربت عدة طرق ووجدت أنه كان خطأ. هنا وجدت أنه عند بناء المجموعة ، قدمت العاصفة المجموعة المحلية المقابلة.
cluster localcluster = new LocalCluster () ؛
إجراء الاختبارات المحلية. إذا كنت تختبر محليًا ، فاستخدمه في اختبارات النشر. إذا تم نشرها على الخادم ، فأنت بحاجة إلى:
cluster.submittopology ("kafka-spout" ، config ، topologybuilder.createTopology ()) ؛ // ثابتة إلى: stormsubmitter.submittopology ("kafka-spout" ، config ، tomologybuilder.createTopology ()) ؛إجراء تقديم المهمة ؛
ما سبق يحل المشكلات أعلاه 1-3
السؤال 4: أنا أستخدم مثيل الفول ذي الصلة في بولت ، وجدت أنه لا يمكنني الحصول على المثيل إذا وضعته في الربيع باستخدام component: أعتقد أنه عندما نبني topolgy الالتزام ، سيكون في:
نسخة الكود كما يلي:
topologybuilder.setbolt ("Alarm-Bolt" ، New Aravolt () ، 1) .SetNumTasks (2) .ShuffleGrouping ("Kafka-Spout") ؛
تنفيذ الترباس ذات الصلة:
Override public void إعداد (MAP StormConf ، SOMPOLOTYCONTEXT CONTEXT ، OUTPUTCOLLECTOR COLLECTOR) {this.collector = collector ؛ StormLauncher StormLauncher = StormLauncher.getStormLauncher () ؛ datarepositorys = (erastDatArepository) StormLauncher.getBean ("ARARARDDATAREPOSTORYS") ؛ }بدون إنشاء الترباس ، تختلف الخيوط ولا يمكن الحصول على الربيع. (لا أفهم ذلك كثيرًا هنا ، إذا كان رجل كبير يعلم ، يمكنك مشاركته)
معنى استخدام صندوق الربيع هو أنه يتم الحصول على هذه الكائنات المعقدة. هذه المشكلة قد أزعجتني لفترة طويلة. أخيرًا ، اعتقدت أنه يمكننا الحصول على مثيلات من خلال السياق GetBean ولا أعرف ما إذا كان يمكن أن ينجح ، لذلك بدأت في تحديد:
على سبيل المثال ، أحتاج إلى استخدام خدمة في بولت:
/*** Author Leezer* date 2017/12/27* وقت فشل عملية التخزين **/ @service ("ARARARDDATAREPOSTORYS") / *** param type type* @param value key key value* @return number of errors **/ Override public geterrnumfromredis (نوع السلسلة ، مفتاح السلسلة) {if (type == null || key == null) {return null ؛ } else {valuesOperations <string ، string> falleoper = primaryStringRediStemplate.OpSforValue () ؛ return valueoper.get (string.format ("٪ s: ٪ s: ٪ s" ، erro ، type ، key)) ؛ }} / *** param نوع الخطأ نوع* @param مفتاح القيمة* param قيمة القيمة المخزنة ** / Override public void seterrnumtoredis (نوع السلسلة ، مفتاح السلسلة ، قيمة السلسلة) {try {valueToRations <string ، string> valuePer = primaryStringStringRistemplate.OpSforValue () ؛ valuePer.set (string.format ("٪ s: ٪ s: ٪ s" ، erro ، type ، key) ، value ، dictionaries. } catch (استثناء e) {logger.info (Dictionaries.redis_error_prefix+string.format ("فشل المفتاح في تخزين redis في ٪ s" ، المفتاح)) ؛ }}هنا أحدد اسم الفول ، وعندما ينفذ Bolt: استخدم طريقة getBean للحصول على الفول ذي الصلة وإكمال العملية المقابلة.
ثم يتم إرسال موضوع الاشتراك Kafka إلى الترباس الخاص بي للمعالجة ذات الصلة. طريقة getBean هنا هي بدء تعريف وظيفة Bootmain:
@SpringBootApplication@enableTransActionManagement@componentscan ({"service" ، "storm"})@enablemongorepositories (basePackages = {"storm"})@propertySource (value = {"classpath: service.properties" ، "class.properties". {"classpath: /configs/spring-hadoop.xml" ، "classpath: /configs/spring-hbase.xml"}) يمتد StormLauncher من الطبقة العامة stormlauncher stormlauncher // قم بتعيين سياق Context Private ApplicationContext ؛ public static void main (string [] args) {SpringApplicationBuilder Application = new SpringApplicationBuilder (StormLauncher.class) ؛ // application.web (false) .run (args) ؛ هذه الطريقة هي أن SPRING BOOT لا يبدأ Application.run (args) ؛ StormLauncher s = New StormLauncher () ؛ S.SetApplicationContext (application.context ()) ؛ setStormLauncher (s) ؛ } private static void setStormLauncher (StormLauncher StormLauncher) {StormLauncher.StormLauncher = StormLauncher ؛ } static static static getStormLauncher () {return StormLauncher ؛ } override محمية springapplicationbuilder تكوين (تطبيق springapplicationBuilder) {return application.sources (StormLauncher.Class) ؛ } / ** * احصل على السياق * * @RETURN سياق التطبيق * / Application Conntext getApplicationContext () {return context ؛ } /*** اضبط السياق. * * param AppContext Context */ private void setapplicationContext (ApplicationContext AppContext) {this.context = AppContext ؛ } /*** احصل على مثيل الفول من خلال اسم مخصص. * * param اسم الاسم * @RETURN The Bean */ Public Object GetBean (اسم السلسلة) {return context.getBean (name) ؛ } /*** احصل على الفول من خلال الفصل. * * param <T> type parameter * param clazz the clazz * @return the bean */ public <t> t getBean (class <t> clazz) {return context.getBean (clazz) ؛ } / ** * إرجاع الفول المحدد بالاسم ، و clazz * * param <T> المعلمة النوع * param اسم اسم * param clazz the clazz * regh the bean * / public <t> t getBean (اسم السلسلة ، الفئة <T> clazz) {return context.getbean (name ، clazz) ؛ }انتهى تكامل العاصفة و Kafka إلى SPRING BOOT. سأضع الكافكا ذات الصلة والتكوينات الأخرى في جيثب
بالمناسبة ، هناك أيضًا حفرة kafkaclient هنا:
توفي حلقة Async! java.lang.nosuchmethoderror: org.apache.kafka.common.network.networksend.
سيقوم المشروع بالإبلاغ عن مشكلة عميل Kafka ، لأنه في Storm-Kafka ، يستخدم Kafka الإصدار 0.8 ، بينما Networksend هو الإصدار 0.9 أو أعلى. يجب أن يكون التكامل هنا متسقًا مع الإصدار المتعلق بالكافكا الذي تدمجه.
على الرغم من أن التكامل بسيط نسبيًا ، إلا أن هناك بعض المراجع. بالإضافة إلى ذلك ، لقد بدأت للتو في الاتصال بالعاصفة ، لذلك أعتقد الكثير. سوف أسجله أيضًا هنا.
عنوان المشروع - جيثب
ما سبق هو كل محتوى هذه المقالة. آمل أن يكون ذلك مفيدًا لتعلم الجميع وآمل أن يدعم الجميع wulin.com أكثر.