Prefacio
Debido a las necesidades comerciales, Strom y Kafka deben integrarse en el proyecto Spring Boot, y otros registros de salida de servicios al tema de suscripción de Kafka. Tormenta maneja el tema en tiempo real para completar el monitoreo de datos y otras estadísticas de datos. Sin embargo, hay pocos tutoriales en línea. Lo que quiero escribir hoy es cómo integrar Storm+Kafka a Spring Boot, y por cierto, hablaré sobre las dificultades que encontré.
Herramientas de uso y configuración del entorno
1. Versión Java JDK-1.8
2. La herramienta de compilación utiliza Idea-2017
3. Maven como gestión de proyectos
4.Spring Boot-1.5.8.
Manifestación de la demanda
1. ¿Por qué necesitas integrarte en Spring Boot?
Para usar el arranque de primavera para administrar varios microservicios de manera uniforme y evitar múltiples configuraciones descentralizadas al mismo tiempo
2. Ideas y razones específicas para la integración
Use el arranque de primavera para administrar los frijoles requeridos por Kafka, Storm, Redis, etc., recójalos a Kafka a través de otros registros de servicio y envíe registros a tormenta en tiempo real y realice las operaciones de procesamiento correspondientes cuando Strom se ataúde
Problemas encontrados
1. No hay una tormenta de integración relevante cuando se usa el arranque de primavera
2. No sé cómo activar el comandante topolgy en la bota de primavera
3. Encontré un problema con Numbis, no con el cliente local, al enviar topología
4. El bean instanciado no se puede obtener a través de anotaciones en el perno de tormenta para realizar las operaciones correspondientes
Solución
Antes de la integración, necesitamos conocer el método y la configuración de inicio de arranque de primavera correspondiente (si está leyendo este artículo, de forma predeterminada, ya ha aprendido y utilizado Storm, Kafka y Spring Boot)
Hay pocos ejemplos de tormenta integradora en el arranque de primavera en Internet, pero debido a las necesidades correspondientes, aún necesitamos integrarnos.
Primero importe el paquete jar requerido:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.1</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> <exclusions> <exclusion> <artifactid> Zookeeper </arfactid> <uproupid> org.apache.zookeeper </groupid> </extusion> <exclusion> <Atifactid> spring-boot -actuator </artifactid> <grupo> org.springframe.boot </proupid> </axusion> <extusion> <MoupRid> org.apache.kafka </groupid> </extusion> </extusion> </extusion> </dependence> <pendency> <proupid> org.springframework.kafka </groupid> <arfactid> spring-kafka </artifactid> <excusion> <arfactid> kafka-clients </artifactid> <MoupRid> org.apache.kafka </groupid> </extusion> </excusions> </pendency> <pendency> <proupid> org.springframework.data </proupid> <arifactid> spring-data-data-hadoop </artifactid> <version> 2.5.0.Release </verversion> <excusion> <XCULLUSHIR> og.slsslsslsslsslssslssslsslsslsslsslsslsslssls. <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <artifactId>commons-logging</artifactId> <groupId>commons-logging</groupId> </exclusion> <exclusion> <artifactId>netty</artifactId> <groupId>io.netty</groupId> </exclusion> <exclusion> <artifactid> jackson-core-asl </arfactid> <grupid> org.codehaus.jackson </groupid> </extusion> <extusion> <artifactid> curador-client </artifactid> <proupid> org.apache.curator </groupid> </sextusion> <exclusion> <artifactid> jettison </artfifid> <MoupRid> org.codehaus.jettison </proupid> </extusion> <Sclusion> <AtifactId> Jackson-Mapper-ASL </artifactid> <MoupRupid> org.codehaus.jackson </groupid> </excusion> <Sciatifactid> Jackson-Jaxrs </artifactid> <MoupRid> org.codehaus.jackson </groupid> </extusion> <Scusion> <AtifactId> Snappy-Java </arfactid> <MoupRupid> org.xerial.snappy </groupid> </excusion> <excusion> <artifactid> jackson-xc </ artifactid> <grupiD> org.codehahous.Jaus.JAd> </extusion> <Sclusion> <AtifactId> Guava </artifactid> <MoupRiD> com.google.guava </groupid> </extusion> <Scusion> <AtifactId> hadoop-mapreduce-client-core </arfactid> <grupeID> org.apache.hadoop </uproupid> </exusion> <exchusion> <AtifactId> Zookeeper </arfactid> <grupoD> org.apache.zookeeper </proupid> </extusion> <Sclusion> <AtifactId> Servlet-API </artifactId> <MoupRupid> javax.servlet </groupid> </excusion> </extusion> </pendency> <pendency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusion> </dependency> <dependency> <MoupRid> org.apache.hbase </groupid> <artifactid> hbase-client </artifactid> <versión> 1.2.4 </versión> <exclusions> <exclusion> <artifactid> log4j </arfactid> <proupid> log4j </groupid> </excusion> <jourifactid> <MoupRid> org.apache.zookeeper </groupid> </extusion> <exclusion> <artifactid> netty </artifactid> <grupoD> io.netty </groupid> </excusion> <excusion> <artifactid> hadoop-comunicid </artifactid> <uproupid> org.apache.hadaop <//groupid> </excusion> hadoop-comunic. <artifactid> guava </arfactid> <proupid> com.google.guava </groupid> </extusion> </extusion> <extusion> <artifactid> hadoop-annotations </artifactid> <uproupid> org.apache.hadoop </proupid> </excusion> <sextusion> <artifactid> hadoop-yarn-common </artemon </artmon. <MoupRid> org.apache.hadoop </groupid> </extusion> <exclusion> <artifactid> slf4j-log4j12 </artifactid> <grupoD> org.slf4j </proupId> </excusion> </excusions> </pendence> <epardency> <uproupid> org.apache.hadoop </groupid> <Atifactid> Hadoop-Common </arfactid> <verserse> 2.7.3 </versewers> <exclusions> <exclusion> <artifactid> commons-logging </arfactid> <grupoD> commons-logging </artifactid> <grupoD> commons-logging </groupid> </sextusion> <sartifactid> curator-client </tififactid> <MoupRid> org.apache.curator </groupid> </extusion> <exclusion> <artifactid> jackson-mapper-asl </artifactid> <grupoD> org.codehaus.jackson </proupid> </extusion> <extifactid> jackson-core-asl </artifactid> <uproupid> org.codehodeshaus. </extusion> <Sclusion> <SartifactId> log4j </sartifactid> <MoupRid> log4j </groupId> </extusion> <extusion> <AtifactId> Snappy-java </artifactid> <MoupRoMID> org.xerial.snappy </proupid> </excusion> <ScOXIFUSION> <ArtififactId> <MoupRid> org.apache.zookeeper </groupid> </extusion> <exclusion> <artifactid> guava </arfactid> <proupid> com.google.guava </groupid> </extusion> <extusion> <ariFactid> hadoop-authem. <artifactid> commons-lang </artifactid> <grupoD> commons-lang </proupid> </extusion> <clusion> <artifactid> slf4j-log4j12 </artifactid> <grupoD> org.slf4j </groupid> </extusion> <Sclusion> <Atifactid> servlet-api </artifactid> <MoupRid> javax.servlet </groupid> </extusion> </extusion> </dependence> <pendency> <uproupid> org.apache.hadoop </proupid> <artifactid> hadoop-mapreduce-exambles </artifactid> <versers> 2.7.3 </sersion> <ScusionSions> <exclusion> <extifiDid> <MoupRid> Commons-logging </groupid> </extusion> <exclusion> <artifactid> netty </artifactid> <grupiD> io.netty </proupid> </extusion> <exclusion> <artifactid> guava </artifactid> <grupid> com.google.guava </groupid> </extusion> <excusion>> <artifactid> log4j </arfactid> <proupid> log4j </groupid> </extusion> <exclusion> <artifactid> servlet-api </artifactid> <grupoD> javax.servlet </proupid> </exclusion> </excusiones> </pendency> <!--> <pendency> <proupid> org.apOrtice. <excusions> <artifactid> Storm-core </arfactid> <versever> $ {storm.version} </versewer> <cope> $ {provisional <artifactid> servlet-api </arfactid> <grupoD> javax.servlet </groupid> </extusion> </exclusions> </pendency> <pendency> <proupid> org.apache.storm </groupid> <arifactid> tormenta-kafka </artifactid> <verly> 1.1.1 </versión> <excultus> <excusion> <artifactid> kafka-clients </artifactid> <grupoD> org.apache.kafka </proupid> </extusion> </explusions> </pepertency>El paquete JAR se elimina porque hay múltiples dependencias relacionadas con las dependencias de construcción del proyecto. La versión de tormenta es 1.1.0 Las dependencias relacionadas con el arranque de primavera son
`` `Java
<!-Spring Boot-> <Spendency> <MoupRid> org.springframework.boot </groupid> <artifactID> spring-boot-starter </artifactid> <exclusions> <exclusion> <grupid> org.springframework.boot </groupid> <artifactid> spring-boot-starter-logging </artifactid> </exclusion> </exclusion> </exclusions exclusion> </exclusions> </exclusion> </exclusion> </exclusions/exclusions. </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <dependency> <MoupRid> org.mybatis.spring.boot </groupid> <artifactid> mybatis-spring-boot-starter </artifactid> <versión> $ {mybatis-spring.version} </versión> </dependence> <epartency> <proupid> org.springframework.boot </groupid> <artifactID> spring-boot-configuation-procesador </artifactid> <pectional> true </pectional> </dependency>PD: El paquete JAR de Maven no es el más simplificado debido a los requisitos de uso del proyecto. Es solo para su referencia.
Estructura del proyecto:
Archivos de configuración de almacenamiento en diferentes entornos
Almacenamiento de clases de implementación relacionadas con el arranque del resorte como el nombre de compilación
Al comenzar la bota de primavera encontraremos
De hecho, antes de comenzar la integración, sabía poco sobre Storm, que no estaba en contacto al principio. Más tarde, descubrí que después de integrarme en el arranque de primavera, no tenía la forma correspondiente de activar la función de cometer topolgy después de comenzar el arranque de primavera, así que pensé que después de comenzar el arranque de primavera, terminé. Como resultado, esperé durante media hora y no pasó nada antes de descubrir que no se implementó la función de desencadenar la confirmación.
Para resolver este problema, mi idea es: Iniciar Boot Spring-> Crear tema de escucha de Kafka y comenzar de topolía para completar el inicio. Sin embargo, para tal problema, Kafka escuchando el tema se activará repetidamente, que obviamente no es lo que queremos. Después de tener un tiempo, descubrí que Spring tiene una startup relacionada y ejecutar un método de tiempo determinado después de que se complete. Este es un salvador para mí. Entonces, la idea de activar el topolgo se ha convertido:
Inicio Spring Boot -> Ejecutar el método de activación -> Complete la condición de activación correspondiente
El método de construcción es:
/** * @author leeezer * @date 2017/12/28 * Enviar automáticamente la topología después de que se cargue el resorte **/ @configuration @componentPublic Class Autoload implementa ApplicationListener <ContextreFreshedEvent> {String estático privado Brokerzkstr; tema de cadena estática privada; host de cadena estática privada; puerto de cadena estática privada; public Autoload (@Value ("$ {Storm.brokerzkstr}") String BrokerzkStr, @Value ("$ {ZOOKEEPER.HOST}") String Host, @Value ("$ {ZOOKEEPER.PORT}") String Port, @Value ("$ {Kafka. Brokerzkstr; Host = host; Tema = tema; Puerto = puerto; } @Override public void onApplicationEvent (Evento de ContextreFreshedEvent) {try {// Instanciar la clase TopologyBuilder. TopologyBuilder topologyBuilder = new TopologyBuilder (); // Establezca el nodo de erupción y asigne el número de concurrencia. El número de concurrencia controlará el número de hilos del objeto en el clúster. Brokerhosts Brokehosts = nuevos Zkhosts (Brokerzkstr); // Configure el tema para la suscripción de Kafka, así como el directorio y el nombre del nodo de datos en Zookeeper SpoutConfig = new SpoutConfig (Brokerhosts, Topic, "/Storm", "S32"); spoutconfig.scheme = new SchemEasMultIsCheme (new StringsCheme ()); spoutconfig.zkservers = colección.singletonList (host); spoutConfig.zkport = Integer.ParseInt (puerto); // leer spoutconfig.startoffsetTime = offsetRequest.latesttime (); Receptor de kafkospout = new Kafkaspout (SpoutConfig); TopologyBuilder.setsPout ("Kafka-Spout", receptor, 1) .SetNumTasks (2); topologyBuilder.SetBolt ("Alarm-Bolt", newarmbolt (), 1) .SetNumTasks (2) .shuffleGrouping ("Kafka-Spout"); Config config = new config (); config.setDebug (falso); /* Establezca el número de ranuras de recursos que la topología quiere aprovechar en el clúster de tormenta. Una ranura corresponde a un proceso de trabajo en el nodo del supervisor. Si el número de puntos que asigna excede la cantidad de trabajadores que tiene su nodo físico, la presentación no tiene éxito. Uniéndose a su clúster, ya hay algo de topología, y quedan 2 recursos de trabajadores. Si asigna 4 topología a su código, entonces esta topología se puede enviar, pero después de comprometerse, encontrará que no se está ejecutando. Y cuando matas algo de topología y liberas algunas ranuras, tu topología reanudará el funcionamiento normal. */ config.setNumWorkers (1); LocalCluster cluster = new LocalCluster (); cluster.subMitTopology ("Kafka-Spout", config, topologyBuilder.CreateTopology ()); } catch (Exception e) {E.PrintStackTrace (); }}}Nota:
Al comenzar el proyecto, se puede informar el siguiente error porque está utilizando TomCat incrustado para el inicio.
[Tomcat-startstop-1] Error OACCContainerBase: un contenedor infantil falló durante startjava.util.concurrent.executionException: org.apache.catalina.lifecycleException: no se pudo iniciar el componente [Standardengine [Tomcat]. Standardhost [localhost] .TomCateMbedContextExtExt [] java.util.concurrent.futuretask.report (futuretask.java:122) ~ [?: 1.8.0_144] en java.util.concurrent.futureTask.get (futuretask.java:192) ~ [?: 1.8.0_144] AT org.apache.catalina.core.containerbase.startinternal (Containerbase.java:939) [Tomcat-Embed-Core-8.5.23.Jar: 8.5.23] en org.apache.catalina.core.standardhost.startinternal (Standardhost.Java:872) [Tomcat-Embed-Core-8.5.23.Jar: 8.5.23] en org.apache.catalina.util.lifecyclebase.start (lifecyclebase.java:150) [Tomcat-Embed-Core-8.5.23.Jar: 8.5.23] AT org.apache.catalina.core.containerbase $ startchild.call (Containerbase.java:1419) [Tomcat-Embed-Core-8.5.23.jar: 8.5.23] en org.apache.catalina.core.containerbase $ startchild.call (contenedorbase.java:1409) [Tomcat-Embed-Core-8.5.23.Jar: 8.5.23] en java.util.concurrent.futuretask.run $$ captura (futuretask.java:266) [?: 1.8.0_144] en java.util.concurrent.futuretask.run (futuretas java.util.concurrent.threadpoolexecutor.runworker (threadpoolexeCutor.java:1149) [?: 1.8.0_144] en java.util.concurrent.threadpoolexecutor.runworker (threadpoolexecutor.Java:1149) [?: 1.8.0_144] AT java.util.concurrent.threadpoolexecutor $ trabajador.run (threadpoolexecutor.java:624) [?: 1.8.0_144] en java.lang.thread.run (Thread.java:748) [?: 1.8.0_144]
Esto se debe a que el paquete JAR importado correspondiente presenta la versión Servlet-API más baja que la versión incrustada. Todo lo que necesitamos hacer es abrir la dependencia de Maven y eliminarla
<Sextusion> <ArFactId> Servlet-API </artifactId> <MoupRupid> javax.servlet </groupId> </extusion>
Luego reinicie.
Es posible informar durante el inicio:
La copia del código es la siguiente:
org.apache.storm.utils.nimbusleadernotfoundexception: No pudo encontrar al líder Nimbus de los anfitriones de semillas [localhost]. ¿Especificó una lista válida de hosts Nimbus para config nimbus.seeds? En org.apache.storm.utils.nimbusclient.getConfiguredClientas (nimbusclient.java:90
Pensé en este problema durante mucho tiempo y descubrí que las explicaciones en línea fueron causadas por el problema de configuración de la tormenta, pero mi tormenta se implementa en el servidor. No hay una configuración relevante. En teoría, también debemos leer la configuración relevante en el servidor, pero el resultado no es el caso. Finalmente, probé varios métodos y descubrí que estaba mal. Aquí descubrí que al construir el clúster, Storm proporcionó el clúster local correspondiente.
LocalCluster cluster = new LocalCluster ();
Realizar pruebas locales. Si está probando localmente, úselo para pruebas de implementación. Si se implementa en el servidor, debe:
cluster.subMitTopology ("Kafka-spout", config, topologyBuilder.createTopology ()); // fijado a: tormenta de tormenta.submittopology ("kafka-spout", config, topologybuilder.createTopology ());Realizar la presentación de la tarea;
Lo anterior resuelve los problemas anteriores 1-3
Pregunta 4: Estoy usando la instancia de Bean relevante en Bolt, descubrí que no puedo obtener la instancia si la pongo en primavera usando @Component: Supongo que cuando construimos el confirmación de topolía, estará en:
La copia del código es la siguiente:
topologyBuilder.SetBolt ("Alarm-Bolt", newarmbolt (), 1) .SetNumTasks (2) .shuffleGrouping ("Kafka-Spout");
Relacionado con el perno de ejecución:
@Override public void Prepare (map stormconf, topologyContext context, outputCollector Collector) {this.collector = colector; Stormlauncher Stormlauncher = Stormlauncher.getstormlauncher (); datArepositorys = (alarmDatarPositorys) Stormlauncher.getBean ("AlarmDatarPositorys"); }Sin instancias del perno, las roscas son diferentes y no se pueden obtener resorte. (No lo entiendo mucho aquí, si un tipo grande sabe, puedes compartirlo)
El significado de usar el arranque de primavera es que se obtienen estos objetos complicados. Este problema me ha preocupado durante mucho tiempo. Finalmente, pensé que podemos obtener instancias a través del contexto GetBean y no sé si puede funcionar, así que comencé a definir:
Por ejemplo, necesito usar un servicio en Bolt:
/*** @author leeezer* @Date 2017/12/27* Tiempo de falla de operación de almacenamiento **/ @Service ("AlarmDatarPositorys") clase pública AlarmDatarPositorys extiende RedisBase Implements ialarmDatarepositorys {cadena final estática privada erro = "Erro"; / *** @param Tipo de tipo* @param Tecla del valor* @@return número de errores **/ @Override public String getErrnumFromredis (tipo de cadena, tecla de cadena) {if (type == null || key == null) {return null; } else {ValueOperations <String, String> valueOper = primarioRingRedistEmplate.OpSforValue (); return valueoper.get (string.format ("%s:%s:%s", erro, type, key)); }} / *** @param Tipo de error Tipo* @Param Tecla del valor* @param Valor almacenado Valor ** / @Override public void seterrnumToredis (tipo de cadena, tecla de cadena, valor de cadena) {try {valueOperations <string, string> valueoper = primarioRingRedistEmplate.opsForValue (); valueoper.set (string.format ("%s:%s:%s", erro, type, clave), valor, diccionarios. APikeydayoflifecycle, TimeUnit.seconds); } catch (Exception e) {logger.info (diccionarios.redis_error_prefix+string.format ("La clave no pudo almacenar redis en %s", clave)); }}Aquí especifico el nombre del bean y cuando Bolt se ejecuta Prepare: use el método GetBean para obtener el bean relevante y complete la operación correspondiente.
Luego, Kafka suscribe el tema se envía a mi perno para el procesamiento relacionado. El método de GetBean aquí es iniciar la definición de la función BootMain:
@Springbootapplication@enableTransactionManagement@ComponentScan ({"Servicio", "Storm"})@EnableMongoRepositories (basepackages = {"tormenta"})@PropertySource (valor = {"classpath: servicio.properties", "classpath: Application.Properties", "classpath: storm.properties"}}) = {"classpath: /configs/spring-hadoop.xml", "classpath: /configs/spring-hbase.xml"}) Clase de tormenta de tormenta de clase pública extiende SpringBootServletinitializer {// Establezca la instancia segura del lanzador de hilos privado Volátil estaticista estaticer estaticer; // Establecer el contexto de contexto privado contexto de trefontext; public static void main (string [] args) {SpringApplicationBuilder Application = new SpringApplicationBuilder (Stormlauncher.class); // Application.Web (False) .run (args); Este método es que Spring Boot no inicia Application.run (Args); Stormlauncher S = New Stormlauncher (); S.SetApplicationContext (Application.Context ()); setstormlauncher (s); } Void static privado setstormlauncher (Stormlauncher Stormlauncher) {Stormlauncher.stormlauncher = Stormlauncher; } public static Stormlauncher getStormlauncher () {return Stormlauncher; } @Override SpringApplicationBuilder Configure (SpringApplicationBuilder Application) {return Application.sources (Stormlauncher.class); } / ** * Obtenga el contexto * * @return El contexto de la aplicación * / publicidad publicContext getApplicationContext () {return context; } /*** Establezca el contexto. * * @param AppContext Context */ private void setApplicationContext (ApplicationContext AppContext) {this.context = appContext; } /*** Obtenga el bean de instancia a través de un nombre personalizado. * * @param nombre el nombre * @return el bean */ público objeto getBean (nombre de cadena) {return context.getBean (nombre); } /*** Obtenga la clase Bean a través de la clase. * * @param <t> el tipo de parámetro * @param clazz the clazz * @return el bean */ public <t> t getBean (class <t> clazz) {return context.getBean (clazz); } / ** * Devuelve el bean especificado por nombre, y clazz * * @param <t> el tipo de parámetro * @param nombre el nombre * @param clazz the clazz * @return the bean * / public <t> t getBean (nombre de cadena, clase <t> clazz) {return context.getBean (nombre, clazz); }La integración de Storm y Kafka a Spring Boot ha terminado. Pondré Kafka y otras configuraciones relacionadas en GitHub
Por cierto, también hay un pozo de kafkaclient aquí:
Async Loop murió! java.lang.nosuchmethoderror: org.apache.kafka.common.network.networksend.
El proyecto informará un problema del cliente Kafka, porque en Storm-Kafka, Kafka usa la versión 0.8, mientras que NetworkSend es la versión 0.9 o superior. La integración aquí debe ser consistente con la versión relacionada con Kafka que integra.
Aunque la integración es relativamente simple, hay pocas referencias. Además, acabo de comenzar a entrar en contacto con Storm, así que creo que mucho. También lo grabaré aquí.
Dirección del proyecto - Github
Lo anterior es todo el contenido de este artículo. Espero que sea útil para el aprendizaje de todos y espero que todos apoyen más a Wulin.com.