Предисловие
Из -за потребностей бизнеса Strom и Kafka должны быть интегрированы в проект Spring Boot, а другие выходные журналы услуг в теме подписки на Kafka. Storm обрабатывает тему в режиме реального времени для завершения мониторинга данных и другой статистики данных. Тем не менее, есть несколько онлайн -уроков. Сегодня я хочу написать, как интегрировать Storm+Kafka в Spring Boot, и, кстати, я расскажу о ловушках, с которыми я столкнулся.
Инструменты использования и конфигурация среды
1. Java версия JDK-1.8
2. Инструмент компиляции использует Idea-2017
3. Maven как управление проектами
4.spring boot-1.5.8.release
Требование проявление
1. Зачем вам интегрироваться в Spring Boot
Чтобы использовать Spring Boot для равномерного управления различными микросервисами и одновременно избегать нескольких децентрализованных конфигураций.
2. Конкретные идеи и причины интеграции
Используйте Spring Boot для управления бобами, необходимыми для Kafka, Storm, Redis и т. Д., Соберите их в Kafka через другие журналы обслуживания, и отправьте журналы для шторма в режиме реального времени и выполните соответствующие операции обработки, когда стром болт
Проблемы столкнулись
1. Там нет соответствующего интеграционного шторма при использовании Spring Boot
2. Я не знаю, как запустить Topolgy в Spring Boot
3. Я столкнулся с проблемой с Numbis, а не Client Localhost при отправке топологии
4. Компания Commandiated Bean не может быть получена с помощью аннотаций в штормовом болте для выполнения соответствующих операций
Решение
Перед интеграцией нам нужно знать соответствующий метод и конфигурацию запуска с пружинной загрузкой (если вы читаете эту статью, по умолчанию вы уже изучили и использовали Storm, Kafka и Spring Boot)
Есть несколько примеров интеграции шторма в Spring Boot в Интернете, но из -за соответствующих потребностей нам все еще нужно интегрироваться.
Сначала импортируйте необходимый пакет JAR:
<Depoydency> <groupId> org.apache.kafka </GroupId> <ArtifactId> kafka-clients </artifactid> <sersive> 0.10.1.1 </version> </restion> <dependency> <groupid> org.springframework.cloud </GroupId> <ratifactid> spring-cloud-starmer-kafka.com <scloysusion> <ratifactid> Zookeeper </artifactid> <groupid> org.apache.zookeeper </groupid> </exclusion> <sclosusion> <ratifactid> Spring-Boot-Actuator </artifactid> <groupid> org.springformwork.boot </GroupId> <cextusion> <croupt> <StrifactId> kafka-clients </artifactid> <groupid> org.apache.kafka </GroupId> </exclusion> </exclusion> </exclusion> </depervice> <dependency> <groupid> org.springframework.kafka </Groupid> <ratifactid> spring-kafka </artifactid> </Groupid> <ratifactid> spring-kafka </artifcemid> </GroupD> <strifactid> spring-kafka </artifcemid> </GroupD> <strifactid> spring-kafka </artifactid> <ratifactid> kafka-clients </artifactid> <groupid> org.apache.kafka </GroupId> </exclusion> </Исключения> </degulency> <dehyedency> <groupid> org.springframework.data </Ground> <ratifactid> Spring-data-hadooop </artifactid> 2..5.5.0.r.relese <prers >.5.5.5.0.r.relese </artifactid> Spring-data-hadooop </artifactid>. <исключения> <sclodusion> <groupid> org.slf4j </GroupId> <ArtifactId> slf4j-log4j12 </artifactid> </exclusion> <seclision> <ratifactid> commons-logging </artifactid> <groupid> net-logging </GroupD> </исключение> <исключение> <Strifact> <TratifAct> <Tratifact> <Tratifact> <groupId> io.netty </GroupId> </exclusion> <sclieSusion> <ArtifactId> Джексон-CORE-ASL </artifactId> <groupId> org.codehaus.jackson </GroupId> </exclusion> <skextusion> <ratifactid> curator-client </artifactid> <groupid> org.apache. <artifactid> jettison </artifactid> <groupid> org.codehaus.jettison </groupid> </exclusion> <conclimusion> <artifactid> jackson-mapper-asl </artifactid> <groupd> org.codehaus.jackson </Groupid> </excusion> <skextuse> <artifactid> hackson> jackson> <groupId> org.codehaus.jackson </GroupId> </exclusion> <sclosusion> <ratifactid> snappy-java </artifactid> <groupid> org.xerial.snappy </GroupId> </exclusion> <coldusion> <artifactid> jackson-xc </artifactid> <groupid> org.codehashs.Chodshon </artifactid> <groupd> org.codehash. <sclosusion> <ArtifactId> guava </artifactid> <groupid> com.google.guava </groupid> </exclusion> <sclodusion> <ratifactid> Hadoop-mapreduce-client-core </artifactid> <artifactid> org.apache.hadoop </GroupId> </excrusion> <skessuis <groupId> org.apache.zookeeper </groupid> </exclusion> <sclosusion> <ratifactid> Servlet-Api </artifactid> <groupid> javax.servlet </GroupId> </exclusion> </exclusion> </dependency> <Depeication> <groupId> org.apache.zookeeper </GroupD> <strifactid> zookeeper </artifactid> <sersive> 3.4.10 </version> <исключения> <scloysusion> <artifactid> slf4j-log4j12 </artifactid> <groupid> org.slf4j </GroupD> </exclusion> </exclusion> </depertive> <dery> <group> org. <ArtifactId> hbase-client </artifactid> <sersive> 1.2.4 </version> <conclinesions> <sclosusion> <artifactid> log4j </artifactid> <groupid> log4j </GroupId> </exclusion> <coldusis <scloysusion> <ArtifactId> netty </artifactid> <groupid> io.netty </GroupId> </exclusion> <sclosusion> <strifactid> hadoop-common </artifactid> <groupid> org.apache.hadoop </GroupId> </excusion> <skesusion> <ratifactid> guavah. <groupId> com.google.guava </GroupId> </exclusion> </exclusion> <sclosusion> <ArtifactId> hadoop-annotations </artifactid> <groupid> org.apache.hadoop </GroupId> </exclusion> или </exclusion> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> <consulsions> <sclodusion> <ratifactid> commons-logging </artifactid> <groupid> commons-logging </artifactid> <groupid> commons-logging </groupid> </exclusion> <scover> <ratifactid> curator-client </artifactid> <groupid> org.apache.curator </GroupD> </arcactid> <groupid> org.apache. <artifactid> jackson-mapper-asl </artifactid> <groupid> org.codehaus.jackson </GroupId> </exclusion> <scliedusion> <ratifactid> jackson-core-asl </artifactid> <artifactid> org.codehaus.jackson </GroupId> </extrasus> <slosuse> <artifactid> <groupId> log4j </GroupId> </exclusion> <sclieSusion> <ArtifactId> snappy-java </artifactid> <groupid> org.xerial.snappy </GroupId> </excusion> <sclosusion> <artifactid> zookeeper </artifactid> <groupid> org.apache.zokeeper </artifactid> <groupid> org.apache.zoekeper </artifactid> <groupd> org.apache.zokeokeper </artifactid> <groupd> org.apache. <artifactid> guava </artifactid> <groupid> com.google.guava </GroupId> </exclusion> <scloysusion> <strifactid> hadoop-auth </artifactid> <groupid> org.apache.hadoop </GroupId> </excusion> <cessurusion> <artifactid> </exclusion> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> <exclusion> <artifactId>servlet-api</artifactId> <groupId>javax.servlet</groupId> </exclusion> </exclusion> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-examples</artifactId> <version>2.7.3</version> <exclusions> <exclusion> <exclusion> <artifactId>commons-logging</artifactId> <groupId>commons-logging</groupId> </exclusion> <exclusion> <artifactid> netty </artifactid> <groupid> io.netty </GroupId> </exclusion> <sclieushy> <artifactid> guava </artifactid> <groupid> com.google.guava </GroupId> </excusion> <Exclusion> <artifactId> log4j </artifactid> <groupd> <group4j <cersusion> <artifactid> </artifactid> <groupd> <broup4 <TrifactId> Servlet-API </artifactId> <groupId> javax.servlet </GroupId> </exclusion> </Исключения> </depervice> <!-Storm-> <dehyed> <groupid> org.apache.storm </GroupD> <strifactid> Storm-core </artifactid> <service> {версия> {версия> {версия> {worry.version aversion {aversion aversion aversion aversion aversion aversion aversion aversion aversion aversion <cracpe> $ {seleple.scope} </scope> <sockusions> <sclosusion> <groupid> org.apache.logging.log4j </GroupId> <ArtifactId> log4j-slf4j-impl </artifactid> </exclusion> <sclusion> <artifactid> Aplet-Aplet </artifactid> <groupser> <groupser> </exclusion> </исключения> </dependency> <DeyEdency> <groupId> org.apache.storm </GroupID> <ArtifactId> Storm-Kafka </artifactid> <sersive> 1.1.1 </version> <cressusions> <coldusion> <artifactid> kafka-clients </artifactid> <groups> orgaache. </Исключение> </Исключения> </Зависимость>Пакет JAR удален, поскольку существует несколько зависимостей, связанных с зависимостью конструкции проекта. Версия Storm - 1,1.0 Spring Boot -зависимости
`` java
<!-- spring boot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <dependency> <groupId> org.mybatis.spring.boot </GroupId> <ArtifactId> mybatis-spring-boot-starter </artifactid> <sersive> $ {mybatis-spring.version} </version> </redice> <dependency> <groupid> org.sprame.boot </group> <ArtifactId> Pring-Boot-Configuration-Processor </artifactid> <poptional> true </oppution> </depertion>PS: Пакет Maven's JAR не является наиболее оптимизированным из -за требований использования проекта. Это только для вашей ссылки.
Структура проекта:
Файлы конфигурации конфигурации в разных средах
Сборка для складской сборки Spring Boot, связанные с учебными классами, такими как название сборки
При запуске весеннего ботинка мы найдем
На самом деле, прежде чем начать интеграцию, я мало знал о Шторме, который не был вначале в начале. Позже я обнаружил, что после интеграции в Spring Boot у меня не было соответствующего способа запустить функцию совершения Topolgy после начала Spring Boot, поэтому я подумал, что после начала Spring Boot я закончил. В результате я ждал полчаса, и ничего не произошло, прежде чем я обнаружил, что функция запуска коммита не была реализована.
Чтобы решить эту проблему, моя идея: Start Spring Boot-> Создайте тему прослушивания Kafka и запустите Topolgy, чтобы завершить стартап. Однако для такой проблемы, кафка, слушающая тему, будет неоднократно запускать топольги, что, очевидно, не то, что мы хотим. Через некоторое время я обнаружил, что Spring имеет связанный стартап и выполняю метод определенного времени после его завершения. Это спаситель для меня. Таким образом, идея запуска топольги стала:
Start Spring Boot -> Выполнить метод триггера -> Заполните соответствующее условие триггера
Метод строительства:
/** * @Author Leezer * @Date 2017/12/28 * Автоматическое отправление топологии после загрузки весны **/ @configuration @componentPublic Class AutoLoad INSIRESS ApplicationListener <contextrefreshedevent> {private Static String BrokerzkStr; Частная статическая строка тема; частный статический хост строки; частный статический строковый порт; Public AutoLoad (@Value ("$ {Storm.BrokerzKStr}") String BrokerzKStr, @Value ("$ {Zookeeper.host}") String Host, @Value ("$ {Zookeeper.port}") String Port, @Value ("$ {kafka.def-topic}") stry-stryks) Brokerzkstr; Host = host; Тема = тема; Порт = порт; } @Override public void onApplicationEvent (contextrefreshedevent event) {try {// создание создания класса TopologyBuilder. TopologyBuilder TopologyBuilder = New TopologyBuilder (); // Установить узел извержения и выделить номер параллелизма. Номер параллелизма будет контролировать количество потоков объекта в кластере. Brokerhosts Brokehosts = new Zkhosts (Brokerzkstr); // Настроить тему для подписки KAFKA, а также каталог узлов данных и имя в Zookeeper SpoutConfig = new SpoutConfig (Brokerhosts, Topic, "/Storm", "S32"); spoutConfig.scheme = new Schemeasmultischeme (new Stringscheme ()); spoutConfig.zkservers = collections.singletonlist (host); spoutConfig.zkport = integer.parseint (port); // Читать spoutConfig.StartOffSettime = foffSetRequest.LatestTime (); Kafkaspout receiver = new kafkaspout (spoutconfig); topologybuilder.setspout ("kafka-Spout", приемник, 1) .setnumtasks (2); topologybuilder.setbolt ("тревога", новый сигнал тревоги (), 1) .setnumtasks (2) .shufflegrouping ("kafka-Spout"); Config config = new config (); config.setdebug (false); /* Установите количество слотов ресурсов, которые топология хочет захватить в штормовом кластере. Слот соответствует рабочему процессу на узле супервизора. Если количество мест, которые вы выделяете, превышают количество работников, которые есть у вашего физического узла, представление может быть неудачным. Присоединяясь к вашему кластеру, на нем уже есть некоторая топология, и осталось 2 работников. Если вы распределяете 4 топологию на свой код, то эту топологию можно отправить, но после совершения вы обнаружите, что она не работает. И когда вы убьете топологию и выпускаете несколько слотов, ваша топология возобновит нормальную работу. */ config.setnumworkers (1); LocalCluster Cluster = new LocalCluster (); cluster.submittopology ("kafka-Spout", config, topologybuilder.createTopology ()); } catch (Exception e) {e.printstackTrace (); }}}Примечание:
При запуске проекта можно сообщить о следующей ошибке, поскольку она использует встроенный Tomcat для запуска.
[Tomcat-startstop-1] ошибка oacccontainerbase-детский контейнер не удался во время startJava.Util.concurrent.executionException: org.apache.catalina.lifecycleexception: не удалось запустить компонент [StandardEngine [tomcat]. Standardhost [local]. java.util.concurrent.futuretask.report (futureTask.java:122) ~ [?: 1.8.0_144] на java.util.concurrent.futuretask.get (futureTask.java:192) ~ [?: 1.8.0_144] у org.apache.catalina.core.containerbase.startinternal (containerbase.java:939) [Tomcat-embed-core-8.5.23.jar: 8.5.23] на org.apache.catalina.core.standardhost.startinternal (Standardhost.java:872) [Tomcat-embed-core-8.5.23.jar: 8.5.23] at org.apache.catalina.util.lifecyclebase.start (LifeCyclebase.java:150) [Tomcat-Embed-Core-8.5.23.jar: 8.5.23] в org.apache.catalina.core.containerbase $ startchild.call (containerbase.java:1419) [Tomcat-embed-core-8.5.23.jar: 8.5.23] at org.apache.catalina.core.containerbase $ startchild.call (containbase.java:140909090909090909090909090909090909090909090909090909090909090909090909090090909. [Tomcat-embed-core-8.5.23.jar: 8.5.23] at java.util.concurrent.futuretask.run $$ Capture (FutureTask.java:266) [?: 1.8.0_144] на java.util.concurrent.futuretask.run (waturetask.java. java.util.concurrent.threadpoolexecutor.runworker (threadpoolexecutor.java:1149) [?: 1.8.0_144] на java.util.concurrent.threadpoolexecutor.runworker (ThreadPoolexeCutor.java:1149) [1.8.4.4.4.4.04.4.04.4.04.4. java.util.concurrent.threadpoolexecutor $ karder.run (threadpoolexecutor.java:624) [?: 1.8.0_144] на java.lang.thread.run (Thread.java:748) [?: 1.8.0_144]
Это связано с тем, что соответствующий импортный пакет JAR представляет версию Servlet-API ниже, чем встроенная версия. Все, что нам нужно сделать, это открыть зависимость Maven и удалить ее
<scloysusion> <ArtifactId> Servlet-API </artifactid> <groupid> javax.servlet </GroupId> </Исключение>
Затем перезапустить.
Можно сообщить во время стартапа:
Кода -копия выглядит следующим образом:
org.apache.storm.utils.nimbusleadernotfoundexception: не удалось найти лидера Нимбуса от семян -хозяев [localhost]. Вы указали действительный список хостов Nimbus для config nimbus.seeds? At org.apache.storm.utils.nimbusclient.getConfiguredClientas (nimbusclient.java:90
Я думал об этой проблеме в течение долгого времени и обнаружил, что онлайн -объяснения были вызваны проблемой конфигурации шторма, но мой шторм развернут на сервере. Там нет соответствующей конфигурации. Теоретически, мы также должны прочитать соответствующую конфигурацию на сервере, но результат не в порядке. Наконец, я попробовал несколько методов и обнаружил, что это было неправильно. Здесь я обнаружил, что при создании кластера Storm предоставил соответствующий локальный кластер.
LocalCluster Cluster = new LocalCluster ();
Выполнить локальное тестирование. Если вы тестируете локально, используйте его для тестов на развертывание. При развертывании на сервере вам нужно:
cluster.submittopology ("kafka-spout", config, topologybuilder.createtopology ()); // исправлено на: Stormsubmitter.submittopology ("kafka-Spout", config, topologybuilder.createTopology ());Провести подчинение задачи;
Вышеуказанное решает вышеупомянутые задачи 1-3
ВОПРОС 4: Я использую соответствующий экземпляр Bean в Bolt, я обнаружил, что не могу получить экземпляр, если я помесчу его весной, используя @component: я предполагаю, что, когда мы строим тополги, он будет в:
Кода -копия выглядит следующим образом:
topologybuilder.setbolt ("тревога", новый сигнал тревоги (), 1) .setnumtasks (2) .shufflegrouping ("kafka-Spout");
Связанный с болтом исполнения:
@Override public void Prepare (Map StormConf, TopologyContext Context, outputCollector Collector) {this.collector = collector; Stormlauncher Stormlauncher = Stormlauncher.getStormLauncher (); dataRepository = (armerdatarePositors) StormLauncher.getBean ("AlarmDatarePositorys"); }Без инстанции болта, нити отличаются, а пружина не может быть получена. (Я не очень понимаю это здесь, если большой парень знает, вы можете поделиться этим)
Значение использования пружинной загрузки заключается в том, что эти сложные объекты получают. Эта проблема долго меня беспокоила. Наконец, я подумал, что мы можем получить экземпляры через контекст GetBean и не знаем, может ли он работать, поэтому я начал определять:
Например, мне нужно использовать сервис в болте:
/*** @Author Leezer* @Date 2017/12/27* Операция хранения Время сбоя **/ @Service ("AlarmDatarePository") Общедоступный класс AlarmDatarePository Extends RedIsBase реализует ilarmDatarePository {Private State Final String erro = "erro"; / *** @param type type* @param ключа клавиши* @return Номер ошибок **/ @override public String geterrnumfromredis (type, type, string key) {if (type == null || key == null) {return null; } else {valueOperations <string, string> valueOper = promiseStringRedIstemplate.opforValue (); return valueOper.get (string.format ("%s:%s:%s", erro, type, key)); }} / *** @param type type type* @param ключа клавиши* @param value value ** / @override public void seterrnumtoredis (тип строки, ключ строки, строковое значение) {try {valueoperations <string> valueOper = promiseStringStementementAte.OpsForValue (); valueOper.set (string.format ("%s:%s:%s", erro, type, key), значение, словарные. } catch (Exception e) {logger.info (dictionares.redis_error_prefix+string.format ("Ключ не удалось сохранить Redis в %s", key)); }}Здесь я указываю имя боба, и когда Bolt выполняет подготовку: используйте метод GetBean, чтобы получить соответствующую боб и завершить соответствующую операцию.
Затем Кафка подписывается тема отправляется в мой болт для связанной обработки. Метод GetBean здесь - запустить определение функции Bootmain:
@SpringBootApplication@enableTransActionManagement@ComponentsCan ({"Service", "Storm"})@enablemongorePositories (basepackages = {"Storm"})@PropertySource (value = {"classPath: service.properties", "classPath: Application.Properties", "classPath: Storm.Properties"})@ImportResource (locations = {"classPath: /configs/spring-hadoop.xml", "classpath: /configs/spring-hbase.xml"}) Экземпляр частного летучих статических Stormlauncher Stormlauncher; // Установить контекст частного контекста ApplicationContext; public static void main (string [] args) {SpringApplicationBuilder Application = New SpringApplicationBuilder (StormLauncher.class); // application.web (false) .run (args); Этот метод заключается в том, что Spring Boot не запускает Application.Run (args); Stormlauncher s = new Stormlauncher (); S.SetApplicationContext (Application.Context ()); SetstormLauncher (ы); } private static void setstormlauncher (Stormlauncher Stormlauncher) {Stormlauncher.stormlauncher = Stormlauncher; } public static Stormlauncher getStormLauncher () {return Stormlauncher; } @Override Protected SpringApplicationBuilder Configure (SpringApplicationBuilder приложение) {return Application.sources (Stormlauncher.class); } / ** * Получить контекст * * @return The Application Context * / public ApplicationContext getApplicationContext () {return Context; } /*** Установить контекст. * * @param AppContext Context */ private void setApplicationContext (ApplicationContext appcontext) {this.context = appcontext; } /*** Получите фасоль экземпляра через пользовательское имя. * * @param name Имя * @return the bean */ public object getbean (string name) {return context.getbean (name); } /*** Получить боб через класс. * * @param <t> Параметр типа * @param clazz the clazz * @return the bean */ public <t> t getbean (class <t> clazz) {return context.getbean (clazz); } / ** * Возвращает указанный фасоль по имени и clazz * * @param <t> Параметр типа * @param Название * @param clazz the clazz * @return the bean * / public <t> t getbean (string name, class <t> clazz) {return context.getbean (name, clazz); }Интеграция Storm и Kafka в Spring Boot закончилась. Я помесчу связанные кафку и другие конфигурации в GitHub
Кстати, здесь также есть яма Kafkaclient:
Async Loop умерла! java.lang.nosuchmethoderror: org.apache.kafka.common.network.networksend.
Проект сообщит о проблеме клиента Kafka, потому что в Storm-Kafka Kafka использует версию 0.8, а Networksend-версия 0,9 или выше. Интеграция здесь должна соответствовать версии, связанной с Кафкой, которую вы интегрируете.
Хотя интеграция относительно проста, есть несколько ссылок. Кроме того, я только начал вступать в контакт со Штормом, поэтому я думаю много. Я также запишу его здесь.
Адрес проекта - GitHub
Выше всего содержание этой статьи. Я надеюсь, что это будет полезно для каждого обучения, и я надеюсь, что все будут поддерживать Wulin.com больше.