Preface
Due to business needs, Storm and Kafka had to be integrated into a Spring Boot project: other services write their logs to a Kafka topic, and Storm consumes that topic in real time to do data monitoring and other statistics. There are few tutorials online, so what I want to write about today is how to integrate Storm and Kafka into Spring Boot, and along the way I will cover the pitfalls I ran into.
Tools and environment
1. Java version: JDK 1.8
2. IDE: IntelliJ IDEA 2017
3. Maven for project management
4. Spring Boot 1.5.8.RELEASE
Requirements
1. Why integrate with Spring Boot?
To manage the various microservices uniformly with Spring Boot and avoid configuration scattered across multiple places.
2. Integration approach
Use Spring Boot to manage the beans needed by Kafka, Storm, Redis, and so on. Logs from the other services are collected into Kafka, sent to Storm in real time, and the corresponding processing is done in the Storm bolts.
Problems encountered
1. Spring Boot has no off-the-shelf Storm integration
2. It was unclear how to trigger the topology submission from Spring Boot
3. A NimbusLeaderNotFoundException ("could not find leader nimbus from seed hosts [localhost]") came up when submitting the topology
4. Beans instantiated by Spring could not be obtained through annotations inside a Storm bolt
Solution
Before integrating, you need to know how Spring Boot starts up and is configured (if you are reading this article, I assume you have already learned and used Storm, Kafka, and Spring Boot).
There are few examples online of integrating Storm into Spring Boot, but since the need is real, integrate we must.
First, import the required dependencies:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>spring-boot-actuator</artifactId>
            <groupId>org.springframework.boot</groupId>
        </exclusion>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <artifactId>commons-logging</artifactId>
            <groupId>commons-logging</groupId>
        </exclusion>
        <exclusion>
            <artifactId>netty</artifactId>
            <groupId>io.netty</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-core-asl</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-client</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jettison</artifactId>
            <groupId>org.codehaus.jettison</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-mapper-asl</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-jaxrs</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>snappy-java</artifactId>
            <groupId>org.xerial.snappy</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-xc</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.4</version>
    <exclusions>
        <exclusion>
            <artifactId>log4j</artifactId>
            <groupId>log4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>netty</artifactId>
            <groupId>io.netty</groupId>
        </exclusion>
        <exclusion>
            <artifactId>hadoop-common</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
            <artifactId>hadoop-annotations</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
        <exclusion>
            <artifactId>hadoop-yarn-common</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion>
            <artifactId>commons-logging</artifactId>
            <groupId>commons-logging</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-client</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-mapper-asl</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jackson-core-asl</artifactId>
            <groupId>org.codehaus.jackson</groupId>
        </exclusion>
        <exclusion>
            <artifactId>log4j</artifactId>
            <groupId>log4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>snappy-java</artifactId>
            <groupId>org.xerial.snappy</groupId>
        </exclusion>
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
            <artifactId>hadoop-auth</artifactId>
            <groupId>org.apache.hadoop</groupId>
        </exclusion>
        <exclusion>
            <artifactId>commons-lang</artifactId>
            <groupId>commons-lang</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-examples</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion>
            <artifactId>commons-logging</artifactId>
            <groupId>commons-logging</groupId>
        </exclusion>
        <exclusion>
            <artifactId>netty</artifactId>
            <groupId>io.netty</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
        <exclusion>
            <artifactId>log4j</artifactId>
            <groupId>log4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!-- storm -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>${provided.scope}</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </exclusion>
        <exclusion>
            <artifactId>servlet-api</artifactId>
            <groupId>javax.servlet</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.1</version>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
```
The exclusions are there because multiple transitive dependencies conflict with other dependencies the project builds against. The Storm version is 1.1.0. The Spring Boot related dependencies are:
```xml
<!-- spring boot -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>${mybatis-spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
```
PS: the Maven dependency list is not the most streamlined, due to the project's own requirements; it is for reference only.
Project structure:
config - stores the configuration files for the different environments
storm - stores the Spring Boot and Storm related implementation classes, such as the topology construction class
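The AutoLoad class shown below reads its connection settings from these files through @Value. Here is a minimal sketch of the corresponding entries (the keys come from the @Value annotations in the code; the values are placeholders, adjust them to your environment):

```properties
# storm.properties -- placeholder values for illustration only
storm.brokerZkstr=192.168.0.1:2181
zookeeper.host=192.168.0.1
zookeeper.port=2181
kafka.default-topic=log-topic
```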
In fact, before starting the integration I knew very little about Storm, having never touched it before. I assumed that once it was integrated into Spring Boot, starting Spring Boot would be enough. Only after waiting half an hour with nothing happening did I realize that nothing ever triggered the topology submission after startup.
To solve this, my first idea was: start Spring Boot -> have a Kafka listener on the topic start the topology. But with that approach, the Kafka listener would repeatedly trigger the topology, which is obviously not what we want. After some digging I found that Spring can execute a method once startup has completed, which was a lifesaver for me. So the flow for triggering the topology became:
Start Spring Boot -> execute the trigger method -> submit the topology
The implementation is:
```java
import kafka.api.OffsetRequest;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import java.util.Collections;

/**
 * @author Leezer
 * @date 2017/12/28
 * Automatically submit the topology after Spring has loaded
 **/
@Configuration
@Component
public class AutoLoad implements ApplicationListener<ContextRefreshedEvent> {

    private static String BROKERZKSTR;
    private static String TOPIC;
    private static String HOST;
    private static String PORT;

    public AutoLoad(@Value("${storm.brokerZkstr}") String brokerZkstr,
                    @Value("${zookeeper.host}") String host,
                    @Value("${zookeeper.port}") String port,
                    @Value("${kafka.default-topic}") String topic) {
        BROKERZKSTR = brokerZkstr;
        HOST = host;
        TOPIC = topic;
        PORT = port;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        try {
            // Instantiate the TopologyBuilder
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            // Set up the spout node and its parallelism; the parallelism
            // controls the number of threads running this component in the cluster
            BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);
            // Configure the Kafka topic to subscribe to, plus the data node
            // directory and name in ZooKeeper
            SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            spoutConfig.zkServers = Collections.singletonList(HOST);
            spoutConfig.zkPort = Integer.parseInt(PORT);
            // Read from the latest offset
            spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
            KafkaSpout receiver = new KafkaSpout(spoutConfig);
            topologyBuilder.setSpout("kafka-spout", receiver, 1).setNumTasks(2);
            topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1)
                    .setNumTasks(2).shuffleGrouping("kafka-spout");
            Config config = new Config();
            config.setDebug(false);
            /*
             * Set the number of worker slots the topology wants to claim in
             * the Storm cluster. A slot corresponds to one worker process on a
             * supervisor node. If you request more workers than your physical
             * nodes can provide, the submission may fail. And if other
             * topologies already occupy most of the cluster (say only 2 worker
             * slots remain and you request 4), the topology can be submitted
             * but will not run; once you kill some topologies and free slots,
             * it will resume normal operation.
             */
            config.setNumWorkers(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
Note:
When starting the project, the following error may be reported, because the application starts on the embedded Tomcat:
```
[Tomcat-startStop-1] ERROR o.a.c.c.ContainerBase - A child container failed during start
java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]]
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_144]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_144]
    at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23]
    at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23]
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23]
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23]
    at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
```
This happens because one of the imported jars drags in a servlet-api version older than the embedded one. All we need to do is open the Maven dependencies and exclude it:
```xml
<exclusion>
    <artifactId>servlet-api</artifactId>
    <groupId>javax.servlet</groupId>
</exclusion>
```
Then restart.
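If you are not sure which dependency drags in the old servlet-api, the Maven dependency tree will show it (this is a standard Maven command; the filter pattern is just an example):

```
mvn dependency:tree -Dincludes=javax.servlet:servlet-api
```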
During startup you may also see this error:
```
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
    at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90)
```
I puzzled over this problem for a long time. The explanations online all said it was caused by the Storm configuration, but my Storm is deployed on a server and there was no such configuration locally; in theory it should read the relevant configuration from the server, yet it did not. After trying several approaches I finally found the real cause: for building a cluster, Storm provides a corresponding local cluster,
```java
LocalCluster cluster = new LocalCluster();
```
which runs the topology in-process and is meant for local testing only. If you deploy to the server cluster, you need to change:
```java
cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
```
to:
```java
StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
```
to submit the topology.
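If you want to keep both modes around, here is a minimal sketch of switching on a flag (the storm.cluster-mode property is a hypothetical name, not part of the original code; the snippet sits inside the existing try block of AutoLoad, whose catch handles the checked exceptions StormSubmitter throws):

```java
// Minimal sketch, assuming a hypothetical "storm.cluster-mode" property
// injected via @Value; the original code uses LocalCluster unconditionally.
if ("local".equals(clusterMode)) {
    // In-process cluster: for local testing only
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
} else {
    // Real submission: requires a reachable Nimbus (nimbus.seeds) in the Storm config
    StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
}
```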
This solves problems 1-3 above.
Problem 4: I was using bean instances inside a bolt and found that a bean registered with Spring via @Component could not be obtained there. My guess is that when we build and submit the topology in:
```java
topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2).shuffleGrouping("kafka-spout");
```
and Storm later executes the bolt's lifecycle methods:
```java
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    StormLauncher stormLauncher = StormLauncher.getStormLauncher();
    dataRepositorys = (AlarmDataRepositorys) stormLauncher.getBean("alarmdataRepositorys");
}
```
the bolt is instantiated by Storm in its own threads, outside the Spring container, so the Spring beans cannot be obtained through annotations. (I don't fully understand this part; if anyone knows the details, please share.)
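For context, here is a minimal sketch of what the whole bolt might look like (the class name AlarmBolt comes from the topology code above; the execute body and field names are illustrative assumptions, not the original implementation):

```java
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class AlarmBolt extends BaseRichBolt {

    private OutputCollector collector;
    // Fetched from the Spring context in prepare(), not serialized with the bolt
    private transient AlarmDataRepositorys dataRepositorys;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Look the bean up instead of relying on annotation-driven injection
        StormLauncher stormLauncher = StormLauncher.getStormLauncher();
        dataRepositorys = (AlarmDataRepositorys) stormLauncher.getBean("alarmdataRepositorys");
    }

    @Override
    public void execute(Tuple input) {
        try {
            String message = input.getString(0);
            // Illustrative only: parse the log message and update error
            // counters in Redis, e.g. dataRepositorys.setErrNumToRedis(type, key, value)
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream
    }
}
```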
The whole point of using Spring Boot is to let it manage these complicated objects, so this problem troubled me for a long time. Finally it occurred to me that instances could be fetched from the application context with getBean; I did not know whether it would work, so I started defining things.
For example, I need to use a service in a bolt:
```java
/**
 * @author Leezer
 * @date 2017/12/27
 * Stores operation failure counts
 **/
@Service("alarmdataRepositorys")
public class AlarmDataRepositorys extends RedisBase implements IAlarmDataRepositorys {

    private static final String ERRO = "erro";

    /**
     * @param type error type
     * @param key  key
     * @return number of errors
     **/
    @Override
    public String getErrNumFromRedis(String type, String key) {
        if (type == null || key == null) {
            return null;
        } else {
            ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
            return valueOper.get(String.format("%s:%s:%s", ERRO, type, key));
        }
    }

    /**
     * @param type  error type
     * @param key   key
     * @param value value to store
     **/
    @Override
    public void setErrNumToRedis(String type, String key, String value) {
        try {
            ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
            valueOper.set(String.format("%s:%s:%s", ERRO, type, key), value,
                    Dictionaries.ApiKeyDayOfLifeCycle, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.info(Dictionaries.REDIS_ERROR_PREFIX + String.format("key %s failed to store to redis", key));
        }
    }
}
```
Here I give the bean an explicit name, and when the bolt executes prepare, the getBean method obtains that bean so the corresponding operations can be completed.
Then the messages Kafka publishes on the subscribed topic are sent to my bolt for processing. The getBean method used here is defined in the Spring Boot main class:
```java
@SpringBootApplication
@EnableTransactionManagement
@ComponentScan({"service", "storm"})
@EnableMongoRepositories(basePackages = {"storm"})
@PropertySource(value = {"classpath:service.properties", "classpath:application.properties", "classpath:storm.properties"})
@ImportResource(locations = {
        "classpath:/configs/spring-hadoop.xml",
        "classpath:/configs/spring-hbase.xml"})
public class StormLauncher extends SpringBootServletInitializer {

    // Thread-safe singleton instance of the launcher
    private volatile static StormLauncher stormLauncher;
    // The application context
    private ApplicationContext context;

    public static void main(String[] args) {
        SpringApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class);
        // application.web(false).run(args); would start without the web environment
        application.run(args);
        StormLauncher s = new StormLauncher();
        s.setApplicationContext(application.context());
        setStormLauncher(s);
    }

    private static void setStormLauncher(StormLauncher stormLauncher) {
        StormLauncher.stormLauncher = stormLauncher;
    }

    public static StormLauncher getStormLauncher() {
        return stormLauncher;
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(StormLauncher.class);
    }

    /**
     * Get the application context.
     *
     * @return the application context
     */
    public ApplicationContext getApplicationContext() {
        return context;
    }

    /**
     * Set the application context.
     *
     * @param appContext context
     */
    private void setApplicationContext(ApplicationContext appContext) {
        this.context = appContext;
    }

    /**
     * Get a bean instance by name.
     *
     * @param name the name
     * @return the bean
     */
    public Object getBean(String name) {
        return context.getBean(name);
    }

    /**
     * Get a bean by class.
     *
     * @param <T>   the type parameter
     * @param clazz the clazz
     * @return the bean
     */
    public <T> T getBean(Class<T> clazz) {
        return context.getBean(clazz);
    }

    /**
     * Get a bean by name and class.
     *
     * @param <T>   the type parameter
     * @param name  the name
     * @param clazz the clazz
     * @return the bean
     */
    public <T> T getBean(String name, Class<T> clazz) {
        return context.getBean(name, clazz);
    }
}
```
With that, the integration of Storm and Kafka into Spring Boot is done. I will put the related Kafka and other configuration on GitHub.
By the way, there is one more Kafka client pitfall:
```
Async loop died! java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.
```
The project reports this Kafka client problem because storm-kafka is built against the Kafka 0.8 client, while this NetworkSend method only exists in version 0.9 and above. The Kafka client version in the integration must be kept consistent with the Kafka you actually integrate against.
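One way to pin a single client version across every transitive path is Maven's dependencyManagement; here is a minimal sketch (the version shown matches the kafka-clients declared earlier, adjust it to your broker):

```xml
<dependencyManagement>
    <dependencies>
        <!-- Force one kafka-clients version for all transitive paths -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
        </dependency>
    </dependencies>
</dependencyManagement>
```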
Although the integration itself is fairly simple, there were few references, and I had only just come into contact with Storm, so a lot of thought went into it. I am recording it here as well.
Project address - github
That is all for this article. I hope it is helpful for everyone's learning, and I hope you will continue to support Wulin.com.