Prefácio
Devido às necessidades de negócios, Strom e Kafka precisam ser integrados ao projeto de inicialização da primavera e outros logs de saída de serviços ao tópico de assinatura Kafka. Storm lida com o tópico em tempo real para concluir o monitoramento de dados e outras estatísticas de dados. No entanto, existem poucos tutoriais on -line. O que eu quero escrever hoje é como integrar a Storm+Kafka à Spring Boot e, a propósito, falarei sobre as armadilhas que encontrei.
Ferramentas de uso e configuração do ambiente
1. Versão Java JDK-1.8
2. Ferramenta de compilação usa a Idea-2017
3. Maven como gerenciamento de projetos
4.Spring boot-1.5.8.Release
Manifestação de demanda
1. Por que você precisa se integrar à inicialização da primavera
Para usar a inicialização da primavera para gerenciar vários microsserviços uniformemente e evitar várias configurações descentralizadas ao mesmo tempo
2. Idéias e razões específicas para integração
Use a bota da primavera para gerenciar feijões exigidos por Kafka, Storm, Redis etc., colete -os para Kafka através de outros registros de serviço e envie toras para Storm em tempo real e execute operações de processamento correspondentes quando o Strom Bolt
Problemas encontrados
1. Não há tempestade de integração relevante ao usar a bota da primavera
2. Não sei como acionar o comando topolgy na bota da primavera
3. Encontrei um problema com Numbis e não localhost cliente ao enviar a topologia
4. O feijão instanciado não pode ser obtido através de anotações em Storm Bolt para executar operações correspondentes
Solução
Antes da integração, precisamos conhecer o método e configuração de inicialização de inicialização da mola correspondente (se você estiver lendo este artigo, por padrão, você já aprendeu e usou Storm, Kafka e Spring Boot)
Existem poucos exemplos de integração de tempestades no Spring Boot na Internet, mas, devido às necessidades correspondentes, ainda precisamos integrar.
Primeiro importe o pacote JAR exigido:
<Depencency> <PuerpId> org.apache.kafka </groupiD> <TRAFACTID> kafka-clients </artifactId> <Versão> 0.10.1.1 </versão </dependency> <pendency> <propidId> org.springframework.cloud </agrupid> <TROTIFACTID> Spring-StarsTars STRILHO <artifactId>zookeeper</artifactId> <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion> <artifactId>spring-boot-actuator</artifactId> <groupId>org.springframework.boot</groupId> </exclusion> <exclusion> <artifactId>kafka-clients</artifactId> <groupId>org.apache.kafka</groupId> </exclusion> </exclusion> </exclusion> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusion> <artifactId>kafka-clients</artifactId> <groupId>org.apache.kafka</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-hadoop</artifactId> <version>2.5.0.RELEASE</version> <exclusions> <exclusion> <PuProuDID> org.slf4j </frugiD> <TRAFACTID> SLF4J-LOG4J12 </ARTIFACTID> </clountion> <cclusion> <stifactId> Commons-Logging </ArtifactId> <urfactId> Commons-Logging </groupid> </clighion> <clution> <stiftid> Nettty> comons-logging </groupid> </clighion> <clution> <stiftid> lettyt> comons-logging </groupid> </cclusion> <cclusion> <stiftid> netfty> comons-logging </groupid> </ctrusion> <cclusion> <stif> <PuerpId> io.netty </frupiD> </clusion> <clustrogus> <TRARFACTID> Jackson-core-asl </ArtifactId> <puperid> org.codehaus.jackson </grupactid> </exclusion> <cligsion> <stifactId> curator-client </strifactId> </exclusion> orgid> outiftid> <//gruptid> </exclusion> orgilate> outifactador </clientid> </exclusion> ou <clutrigum> <cluction> <stifactid> <//ArtifactId> </exclusion> orgs> outiftator <artifactId>jettison</artifactId> <groupId>org.codehaus.jettison</groupId> </exclusion> <exclusion> <artifactId>jackson-mapper-asl</artifactId> <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion> <artifactId>jackson-jaxrs</artifactId> <PuerpId> org.codehaus.jackson </frupiid> </clusion> <cclusion> <stifactId> snappy-java </artifactId> <purbumid> org.xerial.snappy </groupactid> </exclusão> <clusion> <tifactId> jackson-xcc </purtactId> </exclusion> <clusion> <tifactId> jackson-xcc </Artactid> <exclusion> <artifactId>guava</artifactId> <groupId>com.google.guava</groupId> </exclusion> <exclusion> <artifactId>hadoop-mapreduce-client-core</artifactId> <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion> <artifactId>zookeeper</artifactId> <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion> <artifactId>servlet-api</artifactId> <groupId>javax.servlet</groupId> </exclusion> </exclusion> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusion> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.4</version> <exclusions> <exclusion> <artifactId>log4j</artifactId> <groupId>log4j</groupId> </exclusion> <exclusion> <artifactId>zookeeper</artifactId> <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion> <artifactId>netty</artifactId> <PuProuDID> io.netty </frupiD> </clusion> <clusion> <TRATIFACTID> Hadoop-Common </starifactId> <puperid> org.apache.hadoop </proupid> </exclusion> <cclusion> <tAtifactId> guava </ristifactId> <pclusion> com.google.guava.guava.guava.guavag <artifactId>hadoop-annotations</artifactId> <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion> <artifactId>hadoop-yarn-common</artifactId> <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion> <artifactId>slf4j-log4j12</artifactId> <GroupID> org.slf4j </frugiD> </cligiion> </clusions> </pendesency> <pendence> <puperiD> org.apache.hadoop </foupiD> <stifactId> hadoop-common </stifactId> <versão 2.7.3 </versão> <clusions> <componftid> </artifactid> <sipers> 2.7.3 </versão> <clusions> <ccllusion> <GroupId> Commons-Logging </stifactId> <puerpiD> Commons-Logging </GroupId> </clusion> <cclusion> <stifactId> curador-client </artifactId> <purfactid> org.apache.curator </groupid> </cligsion> <cclusion> <stifactId> jack-mack -s-s-s-s <GrupId> org.codehaus.jackson </frugiD> </clusion> <cclusion> <stifactId> Jackson-core-asl </stutifactId> <purbumid> org.codehaus.jackson </groupId> </exclusion> <cclusion> <TristifactId> Log4j </ArtifactId>) <TarfactId> snappy-java </starifactId> <voundiD> org.xerial.snappy </foupid> </clusion> <cclusion> <stifactId> Zookeeper </starfactid> <pursion> org.apache.zokeeper </grupo> </exclusion> <clustion> artifactid> <groupId>com.google.guava</groupId> </exclusion> <exclusion> <artifactId>hadoop-auth</artifactId> <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion> <artifactId>commons-lang</artifactId> <groupId>commons-lang</groupId> </exclusion> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> <exclusion> <artifactId>servlet-api</artifactId> <groupId>javax.servlet</groupId> </exclusion> </exclusion> </dependency> <dependency> <PuProuDID> org.apache.hadoop </frupiD> <TRAFACTID> HADOOP-MAPREDUCE-EXAMPOS </STIFACTID> <Versão> 2.7.3 </siers> <clusions> <cclusion> <cclusion> <sclusion> <sclusion> </GrupoId> Commons-Logging </ArtifactIl> <pcerlId> comons-loggging </Grupoid> <artifactId>netty</artifactId> <groupId>io.netty</groupId> </exclusion> <exclusion> <artifactId>guava</artifactId> <groupId>com.google.guava</groupId> </exclusion> <exclusion> <artifactId>log4j</artifactId> <groupId>log4j</groupId> </exclusion> <exclusion> <TarifactId> servlet-api </artifactId> <puperiD> javax.servlet </groupid> </clusion> </clusions> </dependency> <!-Storm-> <pendency> </Groupid> org.apache.storm </groupid> <ArtifactId> Storm-core </Artifactid> <scope>${provided.scope}</scope> <exclusions> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> </exclusion> <exclusion> <artifactId>servlet-api</artifactId> <groupId>javax.servlet</groupId> </exclusion> </clusions> </dependency> <pendency> <rouprouid> org.apache.storm </groupiD> <TRAFACTID> Storm-kafka </artifactId> <Versão> 1.1.1 </versactid> <clusions> <cligsion> <cliFActid> kafka-clients </</sistactId> </clusions> </pendence>O pacote JAR é removido porque existem várias dependências relacionadas às dependências de construção do projeto. A versão tempestade é 1.1.0 dependências relacionadas à inicialização da mola são
`` `Java
<!-Spring Boot-> <Depencency> <PuerpId> org.springframework.boot </groupiD> <TRATIFACTID> Spring-boot-starter </artifactId> <clusions> <cclusion> <pclirtId> org.springframework.boot </groupId> <TifactId> <pclirtion> org.springframework.boot </grupo> </dependency> <pendency> <puperid> org.springframework.boot </frugiD> <TRATIFACTID> Spring-boot-starter-web </artifactId> </dependency> <pendência> <purbumid> org.springframework.boot </purpuld> <TifactId> Spring-Boottern <PupidId> org.springframework.boot </roupiD> <TRATIFACTID> Spring-boot-Starter-test </ArtifactId> <SCOPE> Test </scope> </dependency> <PependEncy> <PrentId> org.springframework.boot </grupo> <ArtifactId> spring-BOTTRAR <GroupID> org.mybatis.spring.boot </foupiid> <stifactId> mybatis-spring-boot-starter </artifactId> <versão> $ {mybatis-spring.version} </sipers> </dependency> <puadeid> org.springframework.bot </siers> </dependency> <puadeid> org.springframework.Bl. <TarifactId> Spring-Boot-Configuration-Processor </stutifactId> <cptional> true </cptional> </dependency>PS: O pacote JAR do MAVEN não é o mais simplificado devido aos requisitos de uso do projeto. É apenas para sua referência.
Estrutura do projeto:
Arquivos de configuração de armazenamento de configuração em diferentes ambientes
ARMAZEMBUIR Build Spring Boot Related Implementation Classes, como o nome da construção
Ao iniciar a bota da primavera, encontraremos
De fato, antes de iniciar a integração, eu sabia pouco sobre tempestades, que não estava em contato no início. Mais tarde, descobri que, depois de integrar a Spring Boot, não tinha a maneira correspondente de desencadear a função de cometer o Topolgy após o início da Spring Boot, então pensei que, depois de iniciar a inicialização da mola, terminei. Como resultado, esperei meia hora e nada aconteceu antes de descobrir que a função de desencadear a confirmação não foi implementada.
Para resolver esse problema, minha ideia é: Iniciar a inicialização da primavera-> Crie o tópico de escuta Kafka e inicie a Topolgy para concluir a startup. No entanto, para esse problema, Kafka, ouvindo o tópico, desencadeará repetidamente o Topolgy, o que obviamente não é o que queremos. Depois de assistir por um tempo, descobri que a primavera tem uma startup relacionada e executa um certo método de tempo após a conclusão. Isso é um salvador para mim. Portanto, a ideia de desencadear a topolgia se tornou:
Iniciar a inicialização da mola -> Executar o método do gatilho -> completar a condição de gatilho correspondente
O método de construção é:
/** * @Author Leezer * @date 2017/12/28 * Envie automaticamente a topologia após a primavera ser carregada **/ @configuration @componentPublic class Autoload implementa ApplicationListener <contextRefreshedEvent> {private static brokerzkstr; Tópico de string estática privada; host estático privado; porta de sequência estática privada; public AUTOLOAD (@Value ("$ {Storm.BrokerzkStr}") String BrokerzkStr, @Value ("$ {Zookeeper.host}") String host, @Value ("$ {Zookeeper.port}") String Port, @Value ("$ {{KAFKA.DeFAFAFOUSTOMENTO}") String Port, @Value ("$ {{kafka.default.brapic.kem) Host = host; Tópico = tópico; Porta = porta; } @Override public void onApplicationEvent (evento contextrefreshedEvent) {try {// instanciado a classe TopologyBuilder. TopologyBuilder TopologyBuilder = new TopologyBuilder (); // Defina o nó de erupção e aloce o número de simultaneidade. O número de simultaneidade controlará o número de encadeamentos do objeto no cluster. Brokerhosts BrokeHosts = New Zkhosts (Brokerzkstr); // Configure o tópico para a assinatura Kafka, bem como o diretório e o nome dos nó de dados em Zookeeper SPOUTCONFIG = new SpoutConfig (BrokerHosts, tópico, "/Storm", "S32"); spoutConfig.scheme = new Schemeasmultischeme (new stringscheme ()); spoutConfig.zkServers = collection.singletonList (host); spoutConfig.zkport = Integer.parseint (porta); // leia spoutConfig.startoffsetTime = offsetRequest.latesttime (); Receptor Kafkaspout = New Kafkaspout (SpoutConfig); topologybuilder.setspout ("kafka-spout", receptor, 1) .setNumTasks (2); topologybuilder.setbolt ("alarm-bolt", new alarmbolt (), 1) .setNumTasks (2) .ShuffilGrouping ("kafka-spout"); Config config = new config (); config.setDebug (false); /* Defina o número de slots de recursos que a topologia deseja aproveitar no cluster da tempestade. Um slot corresponde a um processo de trabalhador no nó supervisor. Se o número de pontos que você alocar exceder o número de trabalhadores que seu nó físico possui, a submissão poderá não ter sucesso. Juntando -se ao seu cluster, já há alguma topologia e restam 2 recursos de trabalhadores. Se você alocar 4 topologia ao seu código, essa topologia poderá ser enviada, mas depois de cometer, você descobrirá que ela não está em execução. E quando você mata uma topologia e solta alguns slots, sua topologia retomará a operação normal. */ config.setnumworkers (1); LocalCluster cluster = new LocalCluster (); cluster.submittopology ("kafka-spout", config, topologybuilder.createtopology ()); } catch (Exceção e) {e.printStackTrace (); }}}Observação:
Ao iniciar o projeto, o seguinte erro pode ser relatado porque está usando o TomCat incorporado para inicialização.
[Tomcat-startStop-1] Erro oaccContainerBase-Um recipiente infantil falhou durante o startjava.util.concurrent.executionException: org.apache.catalina.lifecycleException: falhou em iniciar o componente [StandardEngine [tomcat] .StandardHost [LocalHost]. java.util.concurrent.futureTask.report (futureTask.java:122) ~ [?: 1.8.0_144] em java.util.concurrent.futureTask.get (futureTask.java:192) ~ [?: 1.8.0_144] org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23] at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [TOMCAT-EMBED-CORE-8.5.23.JAR: 8.5.23] em org.apache.catalina.util.lifecyclebase.start (lifecyclebase.java:150) [tomcat-empabed core-8.5.23.Jar: 8.5.23] org.apache.catalina.core.containerbase $ startChild.call (containerbase.java:1419) [tomcat-emborled-core-8.5.23.jar: 8.5.23] em org.apache.catalina.core.containerBase $ startChild.call [TOMCAT-EMBED-CORE-8.5.23.JAR: 8.5.23] em java.util.concurrent.futureTask.run $$ captura (futureTask.java:266) [?: 1.8.0_144] at Java.util.CoCurrent.FutureTask.rut FutureTernUn] java.util.concurrent.threadpoolexecutor.runworker (threadpoolexecutor.java:1149) [?: 1.8.0_144] em java.util.concurrent.threadpoolexecutor.runworker (threadpoolExecutor.java:149) [: 1.8) [: 1.8) java.util.concurrent.threadpoolexecutor $ trabalhador.run (threadpoolexecutor.java:624) [?: 1.8.0_144] em java.lang.thread.run (thread.java:748) [?: 1.8.0_144]
Isso ocorre porque o pacote de jar importado correspondente apresenta a versão servlet-api mais baixa que a versão incorporada. Tudo o que precisamos fazer é abrir a dependência do Maven e removê -la
<cclusion> <TRAFACTID> servlet-api </artifactId> <voundid> javax.servlet </foupid> </clusion>
Em seguida, reinicie.
É possível relatar durante a inicialização:
A cópia do código é a seguinte:
org.apache.storm.utils.nimbusleadernotfoundException: não conseguiu encontrar líder nimbus de hospedeiros de sementes [localhost]. Você especificou uma lista válida de hosts nimbus para o Config nimbus.seeds? Em org.apache.storm.utils.nimbusclient.getconfiguredclientas (nimbusclient.java:90
Pensei nesse problema por um longo tempo e descobri que as explicações on -line foram todas causadas pelo problema da configuração da tempestade, mas minha tempestade é implantada no servidor. Não há configuração relevante. Em teoria, também devemos ler a configuração relevante no servidor, mas o resultado não é o caso. Finalmente, tentei vários métodos e descobri que estava errado. Aqui, descobri que, ao construir o cluster, a Storm forneceu o cluster local correspondente.
LocalCluster cluster = new LocalCluster ();
Realizar testes locais. Se você estiver testando localmente, use -o para testes de implantação. Se implantado no servidor, você precisa:
cluster.submithopology ("kafka-spout", config, topologybuilder.createtopology ()); // corrigido para: stormsubmitter.submithopology ("kafka-spout", configuração, topologybuilder.createtopology ());Conduzir o envio de tarefas;
O acima resolve os problemas acima 1-3
Pergunta 4: Estou usando a instância de feijão relevante em Bolt, descobri que não posso obter a instância se colocá -la na primavera usando @Component: Meu palpite é que, quando construímos o Topolgy, ele estará em:
A cópia do código é a seguinte:
topologybuilder.setbolt ("alarm-bolt", new alarmbolt (), 1) .setNumTasks (2) .ShuffilGrouping ("kafka-spout");
Relacionado ao parafuso de execução:
@Override public void Prepare (mapa StormConf, TopologyContext Context, O outputCollector Collector) {this.Collector = Collector; Stormlauncher Stormlauncher = Stormlauncher.GetStormlauncher (); DataSarepositorys = (alarmDataRepositorys) Stormlauncher.getBean ("alarmatePositórios"); }Sem instantar o parafuso, as roscas são diferentes e a mola não pode ser obtida. (Eu não entendo muito aqui, se um homem grande sabe, você pode compartilhá -lo)
O significado do uso da bota de mola é que esses objetos complicados são obtidos. Esse problema me incomodou há muito tempo. Por fim, pensei que podemos obter instâncias através do contexto GetBean e não sei se ele pode funcionar, então comecei a definir:
Por exemplo, preciso usar um serviço no parafuso:
/*** @Author Leezer* @Date 2017/12/27* Tempo de falha na operação de armazenamento **/ @Service ("alarmateStarePositorys") Public class AlarmatePositorys estende o redisbase implementa ialarmdatatarepositorys {private static final string erro = "erro"; / *** @param tipo tipo* @param Chave Valor* @return Número de erros **/ @Override public String getERRNUMFROMREDIS (String Type, String key) {if (type == null || key == null) {return null; } else {ValueOperations <string, string> valueOper = primaryStringRedRedEMplate.OpsForValue (); Retorno ValueOper.get (string.format ("%s:%s:%s", erro, tipo, chave)); }} /** ValueOper.Set (String.Format ("%S:%S:%S", Erro, Type, Key), Valor, Dictionaries.ApikeyDayOflifeCycle, TimeUnit.Seconds); } catch (Exceção e) {Logger.info (dictionaries.redis_error_prefix+string.format ("A chave falhou ao armazenar redis em %s", chave); }}Aqui, especifiquei o nome do feijão e, quando Bolt executar preparar: use o método getBean para obter o feijão relevante e concluir a operação correspondente.
Em seguida, o tópico de assinatura de Kafka é enviado ao meu Bolt para processamento relacionado. O método de getbean aqui é iniciar a definição da função bootmain:
@SpringBootApplication@EnableTransactionManagement@ComponentScan({"service","storm"})@EnableMongoRepositories(basePackages = {"storm"})@PropertySource(value = {"classpath:service.properties", "classpath:application.properties","classpath:storm.properties"})@ImportResource(locations = { "ClassPath: /configs/spring-hadoop.xml", "Classpath: /configs/spring-hbase.xml"}) classe pública Stormlauncher estende a troca de troca SpringBootSletinitializer {// Defina o lançamento seguro do lança privado volátil Stormlauncher Stormlauncher; // Defina o contexto de contexto de aplicativo privado; public static void main (string [] args) {springApplicationBuilder Application = new SpringApplicationBuilder (Stormlauncher.class); // Application.web (false) .run (args); Este método é que a inicialização da primavera não inicia o aplicativo.run (args); Stormlauncher s = new Stormlauncher (); s.setApplicationContext (Application.Context ()); setStormLauncher (s); } private estático void setStormlauncher (Stormlauncher Stormlauncher) {Stormlauncher.stormlauncher = Stormlauncher; } public static stormlauncher getStormlauncher () {return stormlauncher; } @Override Protected SpringApplicationBuilder Configure (SpringApplicationBuilder Application) {return Application.sources (Stormlauncher.class); } / ** * Obtenha o contexto * * @RETURN O APLICATIVO CONTEXT * / public ApplicationContext getApplicationContext () {retorna contexto; } /*** Defina o contexto. * * @param AppContext Contexto */ private void setApplicationContext (ApplicationContext AppContext) {this.Context = AppContext; } /*** Obtenha o bean da instância através de um nome personalizado. * * @param nome o nome * @return the bean */ public objeto getbean (nome da string) {return context.getbean (nome); } /*** Obtenha o feijão através da aula. * * @param <T> O parâmetro de tipo * @param clazz the clazz * @return the bean */ public <t> t getBean (classe <t> clazz) {return context.getbean (clazz); } / ** * Retorne o feijão especificado pelo nome, e clazz * * @param <T> o parâmetro de tipo * @param nome o nome * @param clazz the clazz * @return the bean * / public <t> t getbean (nome da string, classe <t> clazz) {retornar contexto.getBean (nome, clazz); }A integração de Storm e Kafka à Spring Boot terminou. Vou colocar Kafka relacionado e outras configurações no Github
A propósito, há também um poço Kafkaclient aqui:
Loop assíncrono morreu! java.lang.nosuchmethoderror: org.apache.kafka.common.network.networksend.
O projeto reportará um problema do cliente Kafka, porque no Storm-Kafka, Kafka usa a versão 0.8, enquanto o NetworkSend é a versão 0.9 ou acima. A integração aqui precisa ser consistente com a versão relacionada ao Kafka que você integra.
Embora a integração seja relativamente simples, existem poucas referências. Além disso, comecei a entrar em contato com a tempestade, então penso muito. Eu também vou gravar aqui.
Endereço do projeto - Github
O exposto acima é todo o conteúdo deste artigo. Espero que seja útil para o aprendizado de todos e espero que todos apoiem mais o wulin.com.