序文
ビジネスニーズのため、StromとKafkaはSpring Bootプロジェクトに統合する必要があり、その他のサービス出力ログはKafkaサブスクリプショントピックにログを出力する必要があります。ストームは、このトピックをリアルタイムで処理して、データ監視やその他のデータ統計を完了します。ただし、オンラインチュートリアルはほとんどありません。今日私が書きたいのは、Storm+KafkaをSpring Bootに統合する方法です。ところで、私が遭遇した落とし穴について話します。
使用ツールと環境構成
1。JavaバージョンJDK-1.8
2。コンピレーションツールでは、アイデア-2017を使用します
3。プロジェクト管理としてのMaven
4.Spring Boot-1.5.8.Release
需要症状
1。なぜスプリングブーツに統合する必要があるのですか
スプリングブートを使用してさまざまなマイクロサービスを均一に管理し、複数の分散構成を同時に回避するため
2。統合の具体的なアイデアと理由
スプリングブートを使用して、Kafka、Storm、Redisなどに必要な豆を管理し、他のサービスログを介してKafkaにそれらを収集し、ストロムボルトのときに対応する処理操作を実行して実行します
遭遇した問題
1.スプリングブートを使用する場合、関連する統合ストームはありません
2.スプリングブーツでトポルジーのコミットをトリガーする方法がわかりません
3.トポロジを送信する際にクライアントのローカルホストではなく、麻痺の問題に遭遇しました
4.インスタンス化された豆は、ストームボルトでの注釈を通じて取得して、対応する操作を実行することはできません
解決
統合の前に、対応するスプリングブートの起動方法と構成を知る必要があります(この記事を読んでいる場合、デフォルトでは、既にStorm、Kafka、Spring Bootを学習して使用しています)
インターネット上のSpring BootにStormを統合する例はほとんどありませんが、対応するニーズのため、統合する必要があります。
最初に必要なJARパッケージをインポートします:
<Dependency> groupid> org.apache.kafka </groupid> <artifactid> kafka-clients </artifactid> <version> 0.10.1.1 </version> </dependency> <sependency> <shiplency> <groupid> org.springframework.cloud </groupid> <artifactid> spring-startertrertremy-kafka> <explusion> <artifactid> zookeeper </artifactid> <groupid> org.apache.zookeeper </groupid> </exclusion> <explusion> <artifactid> spring-boot-actuator </artifactid> <groupid> org.springframework.boot </groupid </explusion> < <artifactid> kafka-clients </artifactid> <groupid> org.apache.kafka </groupid> </expurision> </exclusion> </explusion> </dependency> <redency> <groupid> org.springframework.kafka </groupidid> spring- kafka <Artifactid> kafka-clients </artifactid> <groupid> org.apache.kafka </groupid> </explusion> </exclusions> </dependency> <dependency> groupid> org.springframework.data </groupid> <artifactid> spring-hadoop </artifactid> < <explusions> <explusion> <groupid> org.slf4j </groupid> <artifactid> slf4j-log4j12 </artifactid> </exclusion> <excifactid> commons-logging </artifactid> <groupid> commons-logging </groupid> </explusion> </explusion> </</</</</</</</</</</</</</</</explusion <groupid> io.netty </groupid> </exclusion> <excifactid> jackson-core-asl </artifactid> <groupid> org.codehaus.jackson </groupid> </exclusion> <exclusion> <artifactid> artifactid> curius-cligus </artifactid> </explius. <artifactid> jettison </artifactid> <groupid> org.codehaus.jettison </groupid> </expurision> <explusion> <artifactid> </artifactid> <groupid> org.codehaus.jackson </groupid> </excifidid <groupid> org.codehaus.jackson </groupid> </exclusion> <explusion> <artifactid> snappy-java </artifactid> <groupid> org.xerial.snappy </groupid> </explusion> <excifactid> </exclusion> <explusion> <artifactid> guava </artifactid> <groupid> com.google.guava </groupid> </exclusion> <explusion> <artifactid> hadoop-mapreduce-client-core </artifactid> <groupid> org.apache.hadoop <Artifactid> ZooKeeper </artifactid> <groupId> org.apache.zookeeper </groupId> </exclusion> <exclusion> <artifactid> artifactid> <groupid> javax.servlet.servlet </groupid> </exclusion> </expench <groupid> org.apache.zookeeper </groupid> <artifactid> zookeeper </artifactid> <version> 3.4.10 </version> <explusions> <excifactid> slf4j4j12 </artifactid> </groupid> org. <groupid> org.apache.hbase </groupid> <artifactid> hbase-client </artifactid> <bersion> 1.2.4 </version> <explusions> <excifactid> log4j </artifactid> <groupid> log4j </groupid> </excifactid> <artifactid> zoekeeper <groupid> org.apache.zookeeper </groupid> </exclusion> <explusion> <artifactid> netty </artifactid> <groupid> io.netty </groupid> </exclusion> <artifactid> hadoop-common </artifactid> </exclusion. <artifactid> guava </artifactid> <groupid> com.google.guava </groupId> </expurision> </expurision> <explusion> <artifactid> hadoop-annotations </artifactid> <グループgroupid> org.apache.hadoop </groupid> <GroupId> org.apache.hadoop </groupid> </expurision> <exclusion> <artifactid> slf4j-log4j12 </artifactid> <groupid> org.slf4j </groupid> </explusion> </explusions> </dependency> <sheplency <artifactid> hadoop-common </artifactid> <バージョン> 2.7.3 </version> <explusions> <exclusion> <excifactid> commons-logging </artifactid> <groupid> commons-logging </artifactid> <groupid> commons-logging </groupid> </excifactid> <expactid> <curusion> <curusion> <groupid> org.apache.curator </groupid> </exclusion> <exclusion> <artifactid> jackson-mapper-asl </artifactid> <groupid> org.codehaus.jackson> </explusion> <excifactid> jackson> </artifactid> <グループ</exclusion> <exclusion> <artifactid> log4j </artifactid> <groupId> log4j </groupId> </expurision> <exclusion> <artifactid> snappy-java </artifactid> <groupid> org.xerial.snappy </groupid> </excifactid> <quartifactid> <artifactid> <groupid> org.apache.zookeeper </groupid> </exclusion> <explusion> <artifactid> guava </artifactid> <groupid> com.google.guava </groupid> </exclusion> <exclusion> <artifactid> artifactid> hadoop-auth </artifactid> <groupid> <artifactid> commons-lang </artifactid> <groupid> commons-lang </groupid> </expurision> <explusion> <artifactid> slf4j-log4j12 </artifactid> <groupid> org.slf4j </groupid> </exclusion> <exprusion> <GroupId> Javax.Servlet </groupId> </exclusion> </exclusion> </dependency> <dependency> groupid> org.apache.hadoop </groupid> <artifactid> hadoop-mapreduce-examples </artifactid> <バージョン> 2.7.3 </バージョン> <explusion> <explusion> <expactid> <groupId> commons-logging </groupid> </expurision> <explusion> <artifactid> netty </artifactid> <groupid> io.netty </groupid> </exclusion> <explusion> <artifactid> guava </artifactid> <groupid> com.google <artifactid> log4j </artifactid> <groupid> log4j </groupId> </explusion> <explusion> <artifactid> servet-api </artifactid> <groupid> javax.servlet </groupid> </exclusion> </explusions> </depence> < <artifactid> storm-core </artifactid> <version> $ {storm.version} </version> <scope> $ {redument.scope} </scope> <explusions> <exclusion> <exclusion> <groupid> org.apache.logging.log4j </groupid> <artifactid> log4j-slf4j-impl </</</</</</</</> <artifactid> servlet-api </artifactid> <groupid> javax.servlet </groupid> </explusion> </exclusions> </dependency> <依存関係> org.apache.storm </groupid> <artifactid> storm-kafka </artifactid> <バージョン> 1.1.1 </バージョン> < <artifactid> kafka-clients </artifactid> <groupid> org.apache.kafka </groupid> </explusion> </explusions> </dependency>プロジェクトの構築依存関係に関連する複数の依存関係があるため、JARパッケージは削除されます。ストームバージョンは1.1.0スプリングブート関連の依存関係です
`` Java
<! - スプリングブート - > <依存関係> <groupId> org.springframework.boot </groupid> <artifactid> spring-boot-starter </artifactid> <exclusion> <explusion> <groupid> org.springframework.boot </groupid> <artifactid> </sprugidid> </dependency> <dependency> groupid> org.springframework.boot </groupid> <artifactid> spring-boot-starter-web </artifactid> </dependency> <sependency> <groupid> org.springframework.boot </groupid> <artifactid> spring-boot-aop </artifactid> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <dependency> <groupid> org.mybatis.spring.boot </groupid> <artifactid> mybatis-spring-boot-starter </artifactid> <version> $ {mybatis-spring.version} </version> </dependency> <seprodency> <seprodency> org.springframework.boot </groupid> <artifactid> spring-boot-configuration-processor </artifactid> <optional> true </optional> </dependency>PS:MavenのJARパッケージは、プロジェクトの使用要件のために最も合理化されていません。それはあなたの参照のためだけです。
プロジェクト構造:
さまざまな環境での構成構成ファイル
ストレージビルドスプリングブーツ関連の実装クラスは、ビルド名など
スプリングブートを開始すると、見つかります
実際、統合を開始する前に、私は最初に接触していなかったストームについてほとんど知りませんでした。後で、Spring Bootに統合した後、Spring Bootを起動した後にトポルジーをコミットする機能をトリガーする対応する方法がなかったので、Spring Bootを起動した後、私は終わったと思いました。その結果、私は30分待っていましたが、コミットをトリガーする機能が実装されていないことがわかった前に、何も起こりませんでした。
この問題を解決するために、私のアイデアは次のとおりです。スプリングブートスタート - >カフカリスニングトピックを作成し、トポルギーを開始してスタートアップを完了します。しかし、このような問題のために、カフカはトピックを繰り返し聞いてトポルギーを繰り返し引き起こしますが、これは明らかに私たちが望むものではありません。しばらく見ていた後、Springには関連するスタートアップがあり、完了後に特定の時間方法を実行することがわかりました。これは私にとって救い主です。したがって、トポルギーを引き起こすという考えは次のとおりです。
スプリングブートを起動 - >トリガーメソッドを実行 - >対応するトリガー条件を完了します
建設方法は次のとおりです。
/** * @author leezer * @date 2017/12/28 *スプリングが読み込まれた後にトポロジを自動的に送信する**/ @configuration @componentpublic class autoload applicationlistener <contextrefreshedevent> {private sthing string brokerzkstr;プライベート静的文字列トピック。プライベート静的文字列ホスト。プライベート静的文字列ポート。 public Autoload(@value( "$ {storm.brokerzkstr}")String brokerzkstr、 @value( "$ {zookeeper.host}")string host、@value( "$ {zookeeper.port}")string port、 @value( "$ {kafka.default wrokerzttring}") ")") host = host;トピック=トピック; port = port; } @Override public void onapplicationEvent(contextrefreshedevent event){try {// TopologyBuilderクラスをインスタンス化します。 TopologyBuilder TopologyBuilder = new TopologyBuilder(); //噴火ノードを設定し、並行性番号を割り当てます。並行性番号は、クラスター内のオブジェクトのスレッドの数を制御します。 BrokerHosts BrokeHosts = new Zkhosts(Brokerzkstr); // kafkaサブスクリプションのトピックと、Zookeeperのデータノードディレクトリと名前を構成しますspoutconfig = new SpoutConfig(BrokerHosts、Topic、 "/storm"、 "s32"); spoutconfig.scheme = new schemeasmultischeme(new Stringscheme()); spoutconfig.zkservers = collections.singletonlist(host); spoutconfig.zkport = integer.parseint(port); // spoutconfig.startoffsettime = offsetRequest.latesttime()を読むKafkaspout Receiver = new Kafkaspout(SpoutConfig); TopologyBuilder.Setspout( "Kafka-Spout"、Receiver、1).setnumtasks(2); topologyBuilder.setBolt( "alarm-bolt"、new alAmbolt()、1).setnumtasks(2).shufflegrouping( "kafka-spout"); config config = new config(); config.setDebug(false); /*トポロジーがストームクラスターで押収したいリソーススロットの数を設定します。スロットは、スーパーバイザーノードのワーカープロセスに対応します。割り当てられるスポットの数が、物理ノードが持っている労働者の数を超えると、提出が失敗する可能性があります。クラスターに参加して、すでにいくつかのトポロジーがあり、2つの労働者リソースが残っています。 4つのトポロジーをコードに割り当てると、このトポロジを提出できますが、コミットした後、実行されていないことがわかります。トポロジーを殺してスロットをリリースすると、トポロジーは通常の操作を再開します。 */ config.setnumworkers(1); localcluster cluster = new localcluster(); cluster.submittopology( "kafka-spout"、config、topologybuilder.createTopology()); } catch(Exception e){e.printstacktrace(); }}}注記:
プロジェクトを開始するとき、スタートアップに埋め込まれたTomcatを使用しているため、次のエラーが報告される場合があります。
[Tomcat-StartStop-1]エラーoacccontainerbase-startjava.util.concurrent.executionexceptionで失敗した子のコンテナ:org.apache.catalina.lifecycleexception:component [starnderengine [tomcat] .standardhost [] .tomcatembededdedcontextext [] .tomcatembededdeddeddedcontextex java.util.concurrent.futuretask.report(futuretask.java:122)〜[?:1.8.0_144] at java.util.concurrent.futuretask.get(futuretask.java:192)〜[::1.8.0_144] at org.apache.catalina.core.containerbase.startinternal(containerbase.java:939)[tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.core.standardhost.startinternal(Standardhost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.util.lifecyclebase.start(lifecyclebase.java:150)[tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.core.containerbase $ startchild.call(containerbase.java:1419)[tomcat-embed-core-8.5.23.jar:8.5.23] atorg.apache.catalina.core.core.core.containerbase [tomcat-embed-core-8.5.23.jar:8.5.23] at java.util.concurrent.futuretask.run $$ capture(futuretask.java:266)[?:1.8.0_144] at java.util.concurrent.futuretask.run(1.8.java) java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1149)[?:1.8.0_144] at java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1149)[? java.util.concurrent.threadpoolexecutor $ worker.run(threadpoolexecutor.java:624)[?:1.8.0_144] at java.lang.thread.run(thread.java:748)[?:1.8.0_1444]]
これは、対応するインポートされたJARパッケージが、組み込みバージョンよりも低いサーブレットAPIバージョンを導入するためです。私たちがする必要があるのは、メイベン依存関係を開いて削除することだけです
<Exclusion> <Artifactid> servlet-api </artifactid> <groupId> javax.servlet </groupId> </exclusion>
その後、再起動します。
起動中に報告することができます:
コードコピーは次のとおりです。
org.apache.storm.utils.nimbusleadernotfoundexception:シードホスト[Localhost]のリーダーニンバスを見つけることができませんでした。 confignimbus.seeds?at org.apache.storm.tils.nimbusclient.getConfiguredClientas(nimbusclient.java:90のnimbusホストの有効なリストを指定しましたか?
私はこの問題について長い間考えていましたが、オンラインでの説明はすべてストーム構成の問題によって引き起こされていることを発見しましたが、私のストームはサーバーに展開されています。関連する構成はありません。理論的には、サーバー上の関連する構成も読み取る必要がありますが、結果はそうではありません。最後に、いくつかの方法を試してみましたが、それが間違っていることがわかりました。ここでは、クラスターを構築するときに、ストームが対応するローカルクラスターを提供することがわかりました。
localcluster cluster = new localcluster();
ローカルテストを実行します。ローカルでテストしている場合は、展開テストに使用してください。サーバーに展開された場合、次のことが必要です。
cluster.submittopology( "kafka-spout"、config、topologybuilder.createTopology()); //固定:stormsubmitter.submittopology( "kafka-spout"、config、topologybuilder.createTopology());
タスクの提出を実施します。
上記は上記の問題を解決します1-3
質問4:私はBoltで関連するBeanインスタンスを使用しています。@Componentを使用して春にそれを置くと、インスタンスを取得できないことがわかりました。
コードコピーは次のとおりです。
topologyBuilder.setBolt( "alarm-bolt"、new alAmbolt()、1).setnumtasks(2).shufflegrouping( "kafka-spout");
実行ボルト関連:
@Override public void prepare(Map StormConf、TopologyContext Context、outputCollector Collector){this.collector = collector; Stormlauncher stormlauncher = stormlauncher.getstormlauncher(); datarepositorys =(alarmdatarepositorys)stormlauncher.getbean( "alarmdatarepositorys"); }ボルトをインスタンス化することなく、スレッドは異なり、スプリングを取得できません。 (私はここでそれをあまり理解していません、大きな男が知っていれば、あなたはそれを共有できます)
スプリングブートを使用することの意味は、これらの複雑なオブジェクトが取得されることです。この問題は長い間私を悩ませてきました。最後に、コンテキストを通じてインスタンスを取得できると思ったので、それが機能するかどうかわからないので、定義し始めました。
たとえば、ボルトでサービスを使用する必要があります。
/*** @Author Leezer* @Date 2017/12/27*ストレージ操作失敗時間/ *** @Param Type* @Paramキーキー値* @returnエラー番号**/ @Override public String geterrnumfromfromfromfromredis(string type、string key){if(type == null || key == null){return null; } else {valueoperations <string、string> valueoper = primarystringRedistemplate.opsforvalue(); Return ValueOper.get(String.Format( "%S:%S:%S"、Erro、Type、Key)); }} / *** @paramタイプエラータイプ* @paramキーキー値* @param値格納値** / @Override public void seterrnumtoredis(文字列タイプ、文字列キー、文字列値){try {valueoperations <string、string> valueoper = primarystringredistemplate.opforvalue(); valueOper.set(String.Format( "%S:%S:%S"、ERRO、TYPE、KEY)、Value、Dictionaries.ApikeyDayoflifecycle、TimeUnit.seconds); } catch(Exception e){logger.info(dictionaris.redis_error_prefix+string.format( "keyはredisを%sに保存できなかった"、key)); }}ここでは、Beanの名前を指定し、ボルトが実行されるときに準備します。GetBeanメソッドを使用して関連する豆を取得し、対応する操作を完了します。
次に、Kafkaサブスクライブトピックが関連する処理のためにボルトに送信されます。ここでのGetBeanの方法は、ブートメイン関数定義を開始することです。
@springbootapplication@enabletransactionmanagement@componentscan({"service"、 "storm"})@enablemongorepositories(basepackages = {"storm"})@propertysource({"classpath:service.properties"、 "classpath:application.porperties"、 "classpath:smorpath:storppath" "classpath:/configs/spring-hadoop.xml"、 "classpath:/configs/spring-hbase.xml"})Public Class StormlauncherはSpringbootservletinitializer {//安全なスレッドランチャーインスタンスを設定します。 //コンテキストプライベートApplicationContextコンテキストを設定します。 public static void main(string [] args){springApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class); // application.web(false).run(args);この方法は、Spring Bootがapplication.run(args)を開始しないことです。 StormLauncher s = new Stormlauncher(); S.SetApplicationContext(application.context()); SetStormLauncher(s); } private static void setstormlauncher(stormlauncher stormlauncher){stormlauncher.stormlauncher = stormlauncher; } public static stormlauncher getstormlauncher(){return stormlauncher; } @Override Protected SpringApplicationBuilder Configure(SpringApplicationBuilder Application){return application.Sources(StormLauncher.class); } / ** * contextを取得 * * @returnアプリケーションコンテキスト * / public ApplicationContext getApplicationContext(){return Context; } /***コンテキストを設定します。 * * @Param AppContext Context */ private void setApplicationContext(applicationContext appContext){this.context = appContext; } /***カスタム名を介してインスタンスBeanを取得します。 * * @param name name * @return the Bean */ public object getBean(string name){return Context.getBean(name); } /***クラスから豆を取得します。 * * @param <t>タイプパラメーター * @param clazz the clazz * @return the Bean */ public <t> t getBean(class <t> clazz){return context.getbean(clazz); } / ** *指定されたBeanを名前で返し、Clazz * * @param <t> The Type Parameter * @Param Name * @Param Clazz the Clazz * @return the Bean * / public <t> t getBean(string name、class <t> clazz){return context.getbean(name、clazz); }StormとKafkaからSpring Bootの統合が終了しました。関連するKafkaおよびその他の構成をGithubに入れます
ちなみに、ここにはカフカクリエントピットもあります:
Asyncループは死にました! java.lang.nosuchmethoderror:org.apache.kafka.common.network.networksend。
Storm-KafkaではKafkaがバージョン0.8を使用し、NetworksEndはバージョン0.9以上であるため、このプロジェクトはKafkaクライアントの問題を報告します。ここでの統合は、統合するKafka関連バージョンと一致する必要があります。
統合は比較的簡単ですが、参照はほとんどありません。さらに、私はストームと接触し始めたばかりなので、よく考えています。ここでも録音します。
プロジェクトアドレス-Github
上記はこの記事のすべての内容です。みんなの学習に役立つことを願っています。誰もがwulin.comをもっとサポートすることを願っています。