머리말
비즈니스 요구로 인해 STROM 및 KAFKA는 Spring Boot 프로젝트에 통합되어야하며 기타 서비스 출력 로그는 Kafka 구독 주제에 통합되어야합니다. Storm은 데이터 모니터링 및 기타 데이터 통계를 완료하기 위해 주제를 실시간으로 처리합니다. 그러나 온라인 자습서는 거의 없습니다. 내가 오늘 쓰고 싶은 것은 Storm+Kafka를 Spring Boot에 통합하는 방법이며, 그건 그렇고, 내가 만난 함정에 대해 이야기 할 것입니다.
사용 도구 및 환경 구성
1. Java 버전 JDK-1.8
2. 컴파일 도구는 아이디어 -2017을 사용합니다
3. 프로젝트 관리로서의 Maven
4.spring boot-1.5.8. release
수요 증상
1. 왜 스프링 부츠에 통합해야합니까?
스프링 부트를 사용하여 다양한 마이크로 서비스를 균일하게 관리하고 동시에 여러 분산 구성을 피하기 위해
2. 특정 아이디어와 통합 이유
Spring Boot를 사용하여 Kafka, Storm, Redis 등이 요구하는 Bean을 관리하고 다른 서비스 로그를 통해 Kafka로 수집하고 실시간으로 폭풍으로 로그를 보내고 Strom Bolt시 대응하는 처리 작업을 수행하십시오.
문제가 발생했습니다
1. 스프링 부팅을 사용할 때 관련 통합 폭풍이 없습니다.
2. 스프링 부츠에서 Commit Topolgy를 트리거하는 방법을 모르겠습니다.
3. 토폴로지를 제출할 때 Client LocalHost가 아닌 Numbis와 문제가 발생했습니다.
4. 스톰 볼트의 주석을 통해 인스턴스형 콩을 얻을 수 없습니다.
해결책
통합 전에 해당 Spring Boot 스타트 업 방법 및 구성을 알아야합니다 (이 기사를 읽고있는 경우 기본적으로 이미 Storm, Kafka 및 Spring Boot를 배우고 사용했습니다).
인터넷에서 Spring Boot에서 Storm을 통합하는 예는 거의 없지만 해당 요구 때문에 여전히 통합해야합니다.
먼저 필요한 JAR 패키지를 가져옵니다.
<pectionency> <groupId> org.apache.kafka </groupid> <artifactid> kafka-clients </artifactid> <버전> 0.10.1.1 </version> </fectionency> <prection> <groupId> org.springframework.cloud </groupid> spring-cloud-starters </artifactid> </artifactid> <Artifactid> Zookeeper </artifactid> <groupid> org.apache.zookeeper </groupid> </exclusion> <artifactid> Spring-Boot-Actuator </artifactid> <groupid> org.springframework.boot.boot.boot.spramework.spramework.spramework.spramework <groupid> org.apache.kafka </groupid> </exclusion> </제외> </제외> </depectency> <prectionency> <groupId> org.springframework.kafka </groupId> <artifactid> Spring-Kafka </artifactid> <Artifactid> Kafka-clients> <groupid> org.apache.kafka </groupid> </groupid> </exclusion> </exclusions> </depectency> <pectomency> <groupId> org.springframework.data </groupid> <artifactid> spring-data-hadoop </artifactid> <bersion> 2.5.0.release </version> <groupd> org.sl> org.sl> <Artifactid> slf4j-log4j12 </artifactid> </exclusion> <exclusion> <artifactid> commons-logging </artifactid> <groupId> commons-logging </groupId> </exclusion> <ArifactID> netty </artifactId> <groupActid> </groupid> </fircusion> <artifactid> Jackson-core-asl </artifactid> <groupid> org.codehaus.jackson </groupid> </groupid> </exclusion> <artifactid> curator-client </artifactid> <groupid> org.apache.curator </groupid> </excrusion> <artifactid> jettison </artifactid> <groupid> org.codehaus.jettison </groupid> </groupid> </exclusion> <exclusion> <artifactid> Jackson-mapper-asl </artifactid> <groupid> org.codehaus.jackson </groupid> </exclusion> <arequactid> jackson-jaxrs </artifactid> <groups.jack </org </제외> <제외> <아티 팩트> Snappy-Java </artifactid> <groupid> org.xerial.snappy </groupid> </exclusion> <arevictid> <areifactid> jackson-xc </artifactid> <groupid> org.codehaus.jackson </guplusion> <artifact> <groupid> com.google.guava </groupid> </exclusion> <제외> <artifactid> hadoop-mapreduce-client-core </artifactid> <groupid> org.apache.hadoop </groupid> </groupId> <areifactid> Zookeeper </artifactid> org.apache.zooke.zooke.zooke.zooke. <제외> <artifactid> servlet-api </artifactid> <groupid> javax.servlet </groupid> </exclusion> </exclusion> </dependency> <prectionemency> <groupid> org.apache.zookeeper </groupid> <artifactid> zetifactid> 3.4.10 <fexcelus> <fexclus>> <Artifactid> slf4j-log4j12 </artifactid> <groupid> org.slf4j </groupid> </groupid> </exception> </dependency> <prectionency> <groupid> org.apache.hbase </groupid> <artifactid> hbase-client </aitifactid> 1.4 </version> </version> </version> <solusions> <Artifactid> log4j </artifactid> <groupid> log4j </groupid> </exclusion> <exclusion> <artifactid> Zookeeper </artifactid> <groupid> org.apache.zookeeper </groupid> </exclusion> <exclusion> <artifactid> netty </intifactid> <groupid> io.netty </ontty> <제외> <artifactid> hadoop-common </artifactid> <groupid> org.apache.hadoop </groupid> </exclusion> <제외> <artifactid> guava </artifactid> <groupid> com.google.guava </groupid> </excrusion> <artifactid> <groupid> org.apache.hadoop </groupid> </exclusion> <제외> <artifactid> hadoop-yarn-common </artifactid> <groupid> org.apache.hadoop </groupid> </exclusion> <ArceCACTID> slf4J-log4j12 </artifactID> <Groupg.slf4j </제외> </exclusions> </depection> <pectionement> <groupId> org.apache.hadoop </groupId> <artifactid> hadoop-common </artifactid> <bersion> 2.7.3 </version> <Arcifactid> Commons-logging </artifactid> emplons-logging </artifactid> <groupId> Commons-Logging </groupId> </제외> <ArifActId> upator-client </artifactid> <groupId> org.apache.curator.curator.curator </groupid> </exclusion> <artifactid> Jackson-Mapper-Asl </artifactId> <groupICID> </exllusion> </groupson> <artifactid> Jackson-core-asl </artifactid> <groupid> org.codehaus.jackson </groupid> </exclusion> <ArtifactID> log4J </artifactid> <groupId> log4J </groupId> <Arcefactid> snappy-java </artifactid> <groupid> org.xerial.spappy </groupid> </exclusion> <제외> <artifactid> Zookeeper </artifactid> <groupid> org.apache.zookeeper </groupid> </exclusion> <artifactid> guava </artifactid> <groupId> com.guava </fexcent> <artifactid> hadoop-auth </artifactid> <groupid> org.apache.hadoop </groupid> </exclusion> <artifactid> commons-lang </artifactid> <groupId> commons-lang </groupid> </exclusion> <Artifactid> slf4j-log4j12 </artifactid> <groupid> org.slf4j </groupid> </제외> <제외> <artifactid> servlet-api </artifactid> <groupid> javax.servlet </groupid> </제외> </fectionency> <pectionement> <groupid> org.apache.hadoop </groupid> <Artifactid> Hadoop-Mapreduce-Examples </artifactid> <bersion> 2.7.3 </version> <제외> <제외> <ArtifactID> Commons-Logging </artifactid> <groupId> Commons-Logging </groupid> </exclusion> <artifactid> netty </artifactid> </제외> <제외> <artifactid> guava </artifactid> <groupid> com.google.guava </groupid> </exclusion> <exclusion> <artifactid> log4j </artifactid> <groupId> log4J </groupid> </exclusion> <artifactid> servlet-api </artifactid> <groupid> javax.servlet </groupid> </exclusion> </exclusions> </fectionency> <!-storm-> <pectionency> <groupid> org.apache.storm </groupid> <artifactid> Storm-core </artifactid> <version> $ {storm.version} </version> $ {ondelus>} </scope>} <groupid> org.apache.logging.log4j </groupid> <artifactid> log4j-slf4j-impl </artifactid> </exclusion> <arevirctid> <artifactid> servlet-api </artifactid> <groupid> javax.servlet </gouptusion> </fexcentions> <groupid> org.apache.storm </groupid> <artifactid> Storm-Kafka </artifactid> <bersion> 1.1.1 </version> <exclusions> <exclusion> <artifactid> kafka-clients </artifactid> <groupid> org.apache.kafka </excrusion> </fexcentions>JAR 패키지는 프로젝트 구성 종속성과 관련된 여러 종속성이 있으므로 제거됩니다. 스톰 버전은 1.1.0 스프링 부팅 관련 종속성입니다.
```Java
<!-Spring Boot-> <pectinement> <groupId> org.springframework.boot </groupId> <artifactid> Spring-Boot-Starter </artifactid> <exclusions> <groupid> org.springframework.boot </groupid> </boot-starter-logging </artifactid> </의존성> <pectionency> <groupId> org.springframework.boot </groupid> <artifactid> Spring-Boot-Starter-web </artifactid> </dependency> <groupId> org.springframework.boot </groupId> <arepincid> spring-boot-starter-a </artifactid> <groupid> org.springframework.boot </groupid> <artifactid> 스프링-부트 스타터-테스트 </artifactid> <cope> test </scope> </dependency> <prectionency> <groupId> org.springframework.boot </groupid> respont-starter-log4j2 </artifactid> <groupid> org.mybatis.spring.boot </groupid> <artifactid> mybatis-spring-boot-starter </artifactid> <버전> $ {mybatis-spring.version} </version> </fexendence> <pection> <groupid> org.springframewort.boot> <artifactid> 스프링 보트-구성 프로세서 </artifactid> <선택 사항> true </옵션> </fectionency>PS : Maven의 JAR 패키지는 프로젝트 사용 요구 사항으로 인해 가장 간소화되지 않았습니다. 그것은 당신의 참조만을위한 것입니다.
프로젝트 구조 :
다른 환경에서 구성 파일 구성 파일
스토리지 빌드 스프링 부팅 관련 구현 클래스를 빌드 이름과 같은
스프링 부팅을 시작할 때 우리는 찾을 수 있습니다
실제로 통합을 시작하기 전에 처음에는 접촉하지 않은 Storm에 대해 거의 알지 못했습니다. 나중에 Spring Boot에 통합 된 후 Spring Boot를 시작한 후 Topolgy를 커밋하는 기능을 트리거하는 해당 방법이 없었기 때문에 Spring Boot를 시작한 후 완료되었다고 생각했습니다. 결과적으로, 나는 30 분 동안 기다렸고 커밋을 유발하는 기능이 구현되지 않았다는 것을 알기 전에 아무 일도 일어나지 않았습니다.
이 문제를 해결하려면 내 아이디어는 : Spring Boot-> Kafka 청취 주제를 만들고 Topolgy를 시작하여 스타트 업을 완료합니다. 그러나 그러한 문제의 경우, 주제를 듣는 Kafka는 반복적으로 Topolgy를 유발할 것입니다. 이것은 분명히 우리가 원하는 것이 아닙니다. 잠시 동안 지켜본 후, 나는 Spring이 관련 스타트 업이 있고 완료된 후 특정 시간 방법을 실행한다는 것을 알았습니다. 이것은 나에게 구세주입니다. 그래서 토폴리를 유발한다는 아이디어는 다음과 같습니다.
스프링 부팅 시작 -> 트리거 메소드 실행 -> 해당 트리거 조건을 완료하십시오.
시공 방법은 다음과 같습니다.
/** * @Author Leezer * @Date 2017/12/28 * Spring이로드 된 후 자동으로 토폴로지를 제출합니다. 개인 정적 문자열 주제; 개인 정적 문자열 호스트; 개인 정적 문자열 포트; public autoload (@Value ( "$ {storm.brokerzkst}") String brokerzkstr, @Value ( "$ {Zookeeper.host}") String host, @Value ( "$ {Zookeeper.port}") String Port, @Value ( "$ {Kafka.default-topic}") {brokerztr) 호스트 = 호스트; 주제 = 주제; 포트 = 포트; } @override public void onapplicationEvent (contextrefreshedevent 이벤트) {try {// 토폴로지 빌더 클래스를 인스턴스화합니다. 토폴로지 빌더 토폴로지 빌더 = 새로운 토폴로지 빌더 (); // 분화 노드를 설정하고 동시성 번호를 할당합니다. 동시 번호는 클러스터의 객체의 스레드 수를 제어합니다. Brokerhosts Brokehosts = New Zkhosts (Brokerzkstr); // Zookeeper SpoutConfig = New SpoutConfig (BrokerHosts, Topic, "/Storm", "S32")의 데이터 노드 디렉토리 및 이름뿐만 아니라 Kafka 구독 주제를 구성합니다. SpoutConfig.scheme = New SchemeasmultisCheme (new StringsCheme ()); SpoutConfig.zkservers = collections.singletonList (호스트); SpoutConfig.zkport = integer.parseint (포트); // spoutConfig.StartOffSetTime = hOftSetRequest.LatestTime (); Kafkaspout 수신기 = 새로운 Kafkaspout (SpoutConfig); TopologyBuilder.setSpout ( "Kafka-spout", 수신기, 1) .SetNumtasks (2); TopologyBuilder.setbolt ( "Alarm-Bolt", New Alarmbolt (), 1) .SetNumtasks (2) .ShuffleGrouping ( "Kafka-Spout"); config config = 새 구성 (); config.setdebug (false); /* 토폴로지가 폭풍 클러스터에서 압류하려는 리소스 슬롯 수를 설정합니다. 슬롯은 감독자 노드의 작업자 프로세스에 해당합니다. 할당 된 지점의 수가 물리적 노드의 작업자 수를 초과하는 경우 제출이 실패 할 수 있습니다. 클러스터에 가입하면 이미 일부 토폴로지가 있으며 2 개의 작업자 자원이 남아 있습니다. 4 개의 토폴로지를 코드에 할당하면이 토폴로지를 제출할 수 있지만 커밋 후 실행되지 않음을 알 수 있습니다. 토폴로지를 죽이고 일부 슬롯을 해제하면 토폴로지가 정상적인 작동을 재개합니다. */ config.setNumworkers (1); LocalCluster Cluster = New LocalCluster (); cluster.submittopology ( "Kafka-spout", config, topologybuilder.createTopology ()); } catch (예외 e) {e.printstacktrace (); }}}메모:
프로젝트를 시작할 때 시작 오류는 시작에 포함 된 Tomcat을 사용하고 있기 때문에보고 될 수 있습니다.
[Tomcat-StartStop-1] 오류 OACCCCCCCCCCCCOTAINERBASE- STARTJAVA.UTIL.CANCURRENT.ExecutionException : org.apache.catalina.lifecycleException에서 하위 컨테이너가 실패했습니다. 구성 요소를 시작하지 못했습니다. [Tomcat] .StandardHost [LocalHost]. java.util.concurrent.futuretask.report (futuretask.java:122) ~ [? : 1.8.0_144] at java.util.concurrent.futuretask.get (futuretask.java:192) ~ [? : 1.8.0_144] at org.apache.catalina.core.containerbase.startinternal (containerbase.java:939) [tomcat-embed-core-8.5.23.jar : 8.5.23] at org.apache.catalina.core.starthost.startinternal (Standardhost.java:872). [Tomcat-embed-Core-8.5.23.jar : 8.5.23] at org.apache.catalina.util.lifecyclebase.start (lifecyclebase.java:150) [Tomcat-Embed-core-8.5.23] at org.apache.catalina.core.containerbase $ startchild.call (containerbase.java:1419) [tomcat-embed-core-8.5.23.jar : 8.5.23] at org.apache.core.core.core.containerbase $ startchild.call (containerbase.java:1409). [tomcat-embed-core-8.5.23.jar : 8.5.23] java.util.concurrent.futuretask.run $$ 캡처 (futureetask.java:266) [? : 1.8.0_144] at java.util.concurrent.futuretask.run (1.8.4.0) java.util.concurrent.threadpoolexecutor.runworker (java.util.concurrent.runworker (Threadpoolecutor.java:1149) [? java.util.concurrent.threadpoolexecutor $ worker.run (java.lang.thread.run (thread.java:748) [? : 1.8.0_144]
해당 수입 JAR 패키지는 내장 버전보다 서블릿 -API 버전을 낮추기 때문입니다. 우리가해야 할 일은 Maven 의존성을 열고 제거하는 것입니다.
<제외> <artifactid> servlet-api </artifactid> <groupid> javax.servlet </groupid> </제외>
그런 다음 다시 시작하십시오.
시작 중에보고 할 수 있습니다.
코드 사본은 다음과 같습니다.
org.apache.storm.utils.nimbusleadernotfoundexception : 종자 호스트에서 리더 nimbus를 찾을 수 없었습니다 [LocalHost]. config nimbus.seeds? at org.apache.storm.utils.nimbusclient.getConfiguredClientas (nimbusclient.java:90에 대한 nimbus 호스트 목록을 지정 했습니까?
나는이 문제에 대해 오랫동안 생각했고 온라인에서 설명이 모두 폭풍 구성 문제로 인해 발생했지만 폭풍이 서버에 배치되었습니다. 관련 구성이 없습니다. 이론적으로 서버에서 관련 구성을 읽어야하지만 결과는 그렇지 않습니다. 마침내, 나는 몇 가지 방법을 시도했고 그것이 잘못되었다는 것을 알았습니다. 여기서 클러스터를 구축 할 때 폭풍이 해당 로컬 클러스터를 제공 한 것으로 나타났습니다.
LocalCluster Cluster = New LocalCluster ();
로컬 테스트를 수행하십시오. 로컬로 테스트하는 경우 배포 테스트에 사용하십시오. 서버에 배포 된 경우 다음을 수행해야합니다.
cluster.submittopology ( "kafka-spout", config, topologybuilder.createtopology ()); // 고정 : stormsubmitter.submittopology ( "kafka-spout", config, topologybuilder.createpology ());
작업 제출을 수행하십시오.
위의 문제는 위의 문제 1-3을 해결합니다
질문 4 : Bolt에서 관련 Bean 인스턴스를 사용하고 있습니다. @component를 사용하여 봄에 넣으면 인스턴스를 얻을 수 없다는 것을 알았습니다. 내 추측은 Commit Topolgy를 구축 할 때 다음과 같습니다.
코드 사본은 다음과 같습니다.
TopologyBuilder.setbolt ( "Alarm-Bolt", New Alarmbolt (), 1) .SetNumtasks (2) .ShuffleGrouping ( "Kafka-Spout");
실행 볼트 관련 :
@override public void repay (MAP StormConf, TopologyContext Context, outputCollector Collector) {this.collector = collector; Stormlauncher Stormlauncher = Stormlauncher.getStormlauncher (); DataRepositorys = (AlarmDatarePositorys) Stormlauncher.getBean ( "AlarmDatarePositorys"); }볼트를 인스턴스화하지 않으면 스레드가 다르고 스프링을 얻을 수 없습니다. (여기서 많이 이해하지 못합니다. 큰 사람이 알고 있다면 공유 할 수 있습니다)
스프링 부팅을 사용하는 의미는 이러한 복잡한 객체가 얻어진다는 것입니다. 이 문제는 오랫동안 나를 괴롭 혔습니다. 마지막으로, 우리는 컨텍스트 GetBean을 통해 인스턴스를 얻을 수 있다고 생각하고 그것이 작동 할 수 있는지 알지 못해서 정의하기 시작했습니다.
예를 들어 볼트에서 서비스를 사용해야합니다.
/*** @Author Leezer* @date 2017/12/27* 저장 작업 실패 시간 **/ @service ( "AlarmDatarePositorys") 공개 클래스 AlarmDatarePositorys 확장 redisbase empless IalarmdatarePositorys {private static final String erro = "erro"; / *** @param 유형* @param 키 키 값* @return 오류 수 **/ @override public string geterRnumfromedis (문자열 유형, 문자열 키) {if (type == null || key == null) {return null; } else {valueOperations <String, String> valueOper = PrimartStringRedistemplate.opSforValue (); return valueOper.get (string.format ( "%s :%s :%s", erro, type, key)); }} / *** @param 유형 오류 유형* @param 키 키 값* @param 값 저장 값 ** / @override public void seterRnumtoredis (문자열 유형, 문자열 키, 문자열 값) {valueOperations <string, string> valueOper = primartStringRedistEmplate.OpSforValue (); ValueOper.set (String.format ( "%s :%s :%s", erro, type, key), value, dictionaries.apikeyydayoflifecycle, timeUnit.seconds); } catch (예외 e) {logger.info (dictionaries.redis_error_prefix+string.format ( "key redis를 %s", 키)); }}여기에 Bean의 이름을 지정하고 Bolt가 준비 할 때 : GetBean 메소드를 사용하여 관련 Bean을 얻고 해당 작업을 완료하십시오.
그런 다음 Kafka 구독 주제가 관련 처리를 위해 내 볼트로 전송됩니다. 여기에서 GetBean의 방법은 Bootmain 기능 정의를 시작하는 것입니다.
@springbootApplication@enableTransactionManagement@componentscan ({ "service", "storm"})@enableMongorepositories (basepackages = { "storm"})@propertySource (value = { "classPath :"classpath.properties ","classpath : "forplorperties", properties ")@inffortrescations"@infroperties ") "classpath : /configs/spring-hadoop.xml", "classpath : /configs/spring-hbase.xml"}) public class Stormlauncher는 springbootservletinitializer {// Secure Throok 인스턴스 개인 휘발성 Stormlauncher Stormlauncher를 설정합니다. // 컨텍스트 개인 ApplicationContext 컨텍스트를 설정합니다. public static void main (String [] args) {SpringApplicationBuilder Application = 새로운 SpringApplicationBuilder (Stormlauncher.class); // application.web (false) .run (args); 이 방법은 Spring Boot가 Application.Run (args)을 시작하지 않는다는 것입니다. Stormlauncher S = New StormLauncher (); S.SetApplicationContext (Application.Context ()); setstormlauncher (들); } private static void setstormlauncher (Stormlauncher Stormlauncher) {Stormlauncher.stormlauncher = Stormlauncher; } public static stormlauncher getStormlauncher () {return StormLauncher; } @override Protected SpringApplicationBuilder Configure (SpringApplicationBuilder Application) {return application.Sources (StormLauncher.class); } / ** * 컨텍스트를 가져옵니다 * * @응용 프로그램 컨텍스트 * / public applicationtext getApplicationContext () {return 컨텍스트; } /*** 컨텍스트를 설정합니다. * * @param appContext 컨텍스트 */ private void setApplicationContext (ApplicationContext appContext) {this.context = appContext; } /*** 사용자 지정 이름을 통해 인스턴스 Bean을 가져옵니다. * * @param 이름 * @ @return the bean */ public object getBean (문자열 이름) {return context.getBean (이름); } /*** 클래스를 통해 Bean을 얻습니다. * * @param <t> 유형 매개 변수 * @param Clazz * @ @e -return the bean */ public <t> t getbean (class <t> clazz) {return context.getBean (Clazz); } / ** * 지정된 콩을 이름으로 반환하고 Clazz * * @param <t> 유형 매개 변수 * @param 이름 * @param clazz * @param clazz * @return the bean * / public <t> t getbean (문자열 이름, 클래스 <t> Clazz) {return context.getbean (이름, Clazz); }폭풍과 카프카의 스프링 부트 통합이 끝났습니다. 관련 Kafka 및 기타 구성을 Github에 넣을 것입니다.
그건 그렇고, 여기에는 kafkaclient pit도 있습니다.
비동기 루프가 죽었다! java.lang.nosuchmethoderror : org.apache.kafka.common.network.networksend
Kafka는 Storm-Kafka에서 Kafka는 0.8 버전을 사용하고 NetworkSend는 버전 0.9 이상이기 때문에 Kafka 클라이언트 문제를보고합니다. 여기서 통합은 귀하가 통합하는 Kafka 관련 버전과 일치해야합니다.
통합은 비교적 간단하지만 참조는 거의 없습니다. 또한 방금 Storm과 접촉하기 시작 했으므로 많이 생각합니다. 나는 또한 여기에 기록 할 것입니다.
프로젝트 주소 -github
위는이 기사의 모든 내용입니다. 모든 사람의 학습에 도움이되기를 바랍니다. 모든 사람이 wulin.com을 더 지원하기를 바랍니다.