Préface
En raison des besoins de l'entreprise, Strom et Kafka doivent être intégrés dans le projet Spring Boot et d'autres journaux de sortie des services au sujet d'abonnement Kafka. Storm gère le sujet en temps réel pour terminer la surveillance des données et d'autres statistiques de données. Cependant, il y a peu de tutoriels en ligne. Ce que je veux écrire aujourd'hui, c'est comment intégrer Storm + Kafka à Spring Boot, et en passant, je parlerai des pièges que j'ai rencontrés.
Outils d'utilisation et configuration de l'environnement
1. Version Java JDK-1.8
2. L'outil de compilation utilise l'idée-2017
3. Maven comme gestion de projet
4.Spring boot-1.5.8.release
Manifestation de la demande
1. Pourquoi avez-vous besoin de vous intégrer dans Spring Boot
Afin d'utiliser Spring Boot pour gérer uniformément divers microservices et éviter plusieurs configurations décentralisées en même temps
2. Idées et raisons spécifiques de l'intégration
Utilisez Spring Boot pour gérer les haricots requis par Kafka, Storm, Redis, etc., collectez-les à Kafka via d'autres journaux de service et envoyez des journaux à la tempête en temps réel et effectuez des opérations de traitement correspondantes lorsque Strom Bolt
Problèmes rencontrés
1. Il n'y a pas de tempête d'intégration pertinente lors de l'utilisation de Spring Boot
2. Je ne sais pas comment déclencher le commit Topolgy dans Spring Boot
3. J'ai rencontré un problème avec Numbis Not Client LocalHost lors de la soumission de la topologie
4. Le haricot instancié ne peut pas être obtenu par annotations dans Storm Bolt pour effectuer des opérations correspondantes
Solution
Avant l'intégration, nous devons connaître la méthode et la configuration de démarrage de Spring Boot correspondantes (si vous lisez cet article, par défaut, vous avez déjà appris et utilisé Storm, Kafka et Spring Boot)
Il existe peu d'exemples d'intégration de Storm dans Spring Boot sur Internet, mais en raison des besoins correspondants, nous devons encore intégrer.
Importez d'abord le package JAR requis:
<dependency> <proupId> org.apache.kafka </prôdId> <Artifactid> Kafka-Clienties </ Artifactid> <DersonD> 0.10.1.1 </DERNIFRIENTAGE> <c exclusion> <Artifactid> Zookeeper </retifactid> <proupId> org.apache.zookeeper </proncId> </clusion> <cchusion> <Artifactid> printemps-boot-actuator </proustid> <proupId> org.springframework.boot </proupId> </clusion> <cusclusion> <ArtefactId> Kafka-Clients </ Artifactid> <ProupId> org.apache.Kafka </rolsid> </cchusion> </ exclusion> </cossion> </dependency> <Dendency> <proupId> org.springframework.kafka </proupId> <Artifactid> printemps-kafka </ artifactID> <ArtefactId> Kafka-Clienties </ Artifactid> <proupId> org.apache.kafka </proncId> </cchusion> </ exclusions> </pedimency> <dependency> <proupId> org.springframework.data </prouprid> <artifiactid> spring-data-hadoop </ artifactid> <cchusions> <cchusion> <proupId> org.slf4j </proupId> <ArtifActid> SLF4J-LOG4J12 </ ArtifactId> </clusion> <pordsid> COMMons-Logging </proupId> </clusion> <parfactive> <ArtefactId> NetTy </ Artifactid> <GroupId> io.netty </romp grouped> </ exclusion> <cchusion> <Artifactid> Jackson-Core-Asl </ Artifactid> <GroupId> org.codehaus.jackson </prouventid> </ exclusion> <cusclusion> <artifactive> CORARATEUR-CLIENT </ ARTIFUSID> <GroupId> org.apache.curator </rom grouped> </ exclusion> <cchusion> <Artifactid> Jettison </lefactive> <proupId> org.codehaus.jetsonison </proupId> </clusion> <grouprid> <artifactid> jackson-Mapper-ASL </letifactid> <groupenïde> org.codehaus.jack à Asl </ Artifactid> <GroupId> Org.Codehaus.jackjack </partifAtid> <GroupId> Org.Codehaus.jackjack </letifactid> <groupe </ exclusion> <cuscusion> <artifactive> jackson-jaxrs </retifactid> <proupId> org.codehaus.jackson </prappend> </cchusion> <parnussion> <artifactive> snappy-java </ artifactid> <proupId> org.xerial.snappy </prouvend> </cusclusion> <artifactId>jackson-xc</artifactId> <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion> <artifactId>guava</artifactId> <groupId>com.google.guava</groupId> </exclusion> <exclusion> <artifactId>hadoop-mapreduce-client-core</artifactId> <proupId> org.apache.hadoop </rom grouped> </ exclusion> <cussion> <Artifactid> Zookeeper </ ArtiFactId> <GroupId> org.apache.zookeeper </prôdId> </clusion> <garnsion> <artifactid> servlet-api </ artifactid> <proupId> javax.sservlet </cartid>> <proupId> javax.sservlet </cartid>> <proupId> Javax.Servlet </clustid> </ exclusion> </ dépendance> <dependency> <proupId> org.apache.zookeeper </proupId> <Artifactid> Zookeeper </ artifactId> <version> 3.4.10 </ version> <cglusions> <cusclusion> <Ertifactid> Slf4j-Log4j12 </carfactive> <proupId> Org.SLf4j </clusfactive> <proupId> Org.SLF4J </ exclusion> </ dépendance> <dependency> <proupId> org.apache.hbase </proncId> <Artifactid> hbase-client </ artifactive> <version> 1.2.4 </de version> <cuslusions> <cuslusion> <arfactive> log4j </runfactive> <proupId> log4j </proncID> </cuslusion> <ArtefactId> ZooKeeper </Retifactid> <ProupId> org.apache.zookeeper </prôdId> </cchusion> <exclusion> <Artifactid> netty </ artifactid> <proupId> io.netty </proupId> </clusfactive> <cexclusion> <Artifactid> Hadoop-COMMON </ttifactive> <GroupId> org.apache.hadoop </romboud> </cchusion> <cuslusion> <Artifactid> Guava </Retifactid> <proupId> com.google.guava </prouvenid> </clusion> </cglusion> <cglusion> <artifactid> Hadoop-Annotations </ artifactid> </ exclusion> <cchusion> <artifactive> hadoop-yarn-common </ artifactive> <proupId> org.apache.hadoop </proisprid> </cchusion> <cuslusion> <lefactive> slf4j-lig4j12 </ artifactid> <proupId> org.slf4j </prouventid> </cusclusion> <dependency> <proupId> org.apache.hadoop </proupId> <Artifactid> Hadoop-Common </Retifactid> <Dersion> 2.7.3 </DERNIERDE> <CARFLUSIONS> <GroupId> COMMONDING </RetifActid> <ProupId> Commons-Logging </clushing> <c exclusion> <ErtifactId> Curator-Client </ Artifactid> <ProupId> org.apache.curator </prôdId> </clusion> <exclusion> <Artifactid> Jackson-Mapper-Asl </ artifactid> <proupId> org.codehaus.jackson </proupid> </clusion> <cusclusion> <ArtefactId> jackson-core-asl </letefactive> <proupId> org.codehaus.jackson </prônid> </cchusion> <cussion> <artifactid> LOG4J </ artifactid> <proupdid> log4j </prapped> </ exclusion> <exclusion> <artifactive> snappy-java </ arrifactive> <GroupIdId> Org.Xerial.Snappy </ GroupId> </ Exclusion> <cusclusion> <ArtifActid> ZooKeeper </Retifactid> <GroupId> Org.apache.zookeeper </proupId> </clusion> <cuslusion> <Artifactid> Guava </ ArtifactId> <proupId> com.google.guava. <cchusion> <Ertifactid> Hadoop-Auth </ artifactive> <proupId> org.apache.hadoop </rompuprid> </cchusion> <cusclusion> <aRtifactid> Commons-Lang </ Artifactid> <ProupId> Commons-Lang </proupId> </clusion> </ exclusion> <Artifactid> SLF4J-Log4j12 </cglutid> <Artifactid> Slf4j-Log4j12 </ Artiford> <GroupId> org.slf4j </rom grouped> </ exclusion> <cusclusion> <Ertifactid> servlet-api </retifactid> <proupId> javax.servlet </proupid> </clusion> </clusion> </peedency> <dependency> <proupId> org.apache.hadoop </prouprid> <ArtefactId> Hadoop-MapReduce-Examples </ Artifactid> <Dersion> 2.7.3 </DERNIFROITION> <cchusions> <exclusion> <exclusion> <e ArtifactId> Commons-Logging </ Artifactid> <ProupId> Commons-Logging </rom grouped> </clusion> <exclusion> <ArtifActid> Netty </tifactID> <GroupId> io.netty </rom grouped> </ exclusion> <cchusion> <artifactId> Guava </ artifactid> <proupId> com.google.guava </prôdId> </cchusion> <cussion> <eRtifactid> LOG4J </ artifactid> <proupId> LOG4J </prônid> </cuslusion> <ArtefactId> Servlet-API </ artifactid> <proupId> javax.servlet </rom grouped> </ exclusion> </clusions> </ Dependency> <! - Storm -> <Dendency> <proupId> org.apache.storm </proupId> <Artifactid> Storm-core </ptetifActid> <version> $ {Storm.Version} <ccope> $ {fournie.scope} </ scope> <cchusions> <exclusion> <proupId> org.apache.logging.log4j </proupId> <ArtifActid> LOG4J-SLF4J-Implt </ Artifactid> </clusion> <exclusion> <ArtifActid> Servlet-API </ Artifactid> <GroupId> javax.servlet </rom grouped> </ exclusion> </cglusions> </pependency> <dependency> <proupId> org.apache.storm </rompuprid> <Artefactid> Storm-Kafka </carfactid> <version> 1.1.1 </ version> </ artifactid> <exclusion> <artifactive> kafka-clientives </cuscid>> <groupId> org.apache.kafka </rom grouped> </clusion> </clusion> </Dependency>Le package JAR est supprimé car il existe plusieurs dépendances liées aux dépendances de construction du projet. La version Storm est les dépendances liées au démarrage de Spring 1.1.0 sont
`` Java
<! - Spring Boot -> <Dedency> <GroupId> org.springframework.boot </proncId> <Artifactid> printemps-boot-starter </ artifactId> <cordusions> <exclusion> <proupId> org.springframework.boot </proupId> </ artifactid> Spring-Boot-starter-ging-ginggging </ exclusions> </ dépendances> <dependency> <proupId> org.springframework.boot </prôdId> <Artifactive> printemps-boot-starter-web </lefactid> </pedency> <dependency> <proupId> org.springframework.boot </proupId> <ArtifActid> <dependency> <proupId> org.springframework.boot </proupId> <ArtifactId> Spring-boot-starter-test </lefactive> <ccope> test </ccope> </pependency> <dependency> <proupId> org.springframework.boot </proupId> </pterific> Spring-Boot-starter-log4j2 </ artifactID> <dependency> <proupId> org.mybatis.spring.boot </rom grouped> <ArtifActid> Mybatis-Spring-boot-starter </ artifactid> <version> $ {mybatis-spring.version} </ version> </peedendency> <dependency> <proupId> org.springframework. <ArtefactId> Spring-Boot-Configuration-Processor </ Artifactid> <Apultal> VRUE </potel> </Dependency>PS: Le package de pot de Maven n'est pas le plus rationalisé en raison des exigences d'utilisation du projet. C'est pour votre référence uniquement.
Structure du projet:
Fichiers de configuration de configuration dans différents environnements
Stockage Build Spring Boot Classes d'implémentation liées telles que le nom de build
Lors du démarrage de Spring Boot, nous trouverons
En fait, avant de commencer l'intégration, je ne savais pas peu de choses sur Storm, qui n'était pas en contact au début. Plus tard, j'ai constaté qu'après l'intégration dans Spring Boot, je n'avais pas le moyen correspondant de déclencher la fonction de commettre Topolgy après avoir commencé Spring Boot, donc j'ai pensé qu'après avoir commencé Spring Boot, j'ai fini. En conséquence, j'ai attendu une demi-heure et rien ne s'est passé avant de constater que la fonction de déclenchement de l'engagement n'a pas été mise en œuvre.
Pour résoudre ce problème, mon idée est: Démarrez Spring Boot-> Créer un sujet d'écoute Kafka et démarrez Topolgy pour terminer le démarrage. Cependant, pour un tel problème, Kafka écoutant le sujet déclenchera à plusieurs reprises Topolgy, ce qui n'est évidemment pas ce que nous voulons. Après avoir regardé un certain temps, j'ai découvert que Spring a une startup connexe et exécuté une certaine méthode de temps une fois la fin. C'est un sauveur pour moi. L'idée de déclencher Topolgy est donc devenue:
Démarrer le démarrage de Spring -> Exécuter la méthode du déclencheur -> Complétez la condition de déclenchement correspondante
La méthode de construction est:
/ ** * @Author Leezer * @Date 2017/12/28 * Soumettez automatiquement la topologie après que le printemps soit chargé ** / @ configuration @ ComponentPublic Class Autoload implémente ApplicationListener <ContextreFreshEvent> {private static String Brokerzkstr; Sujet de chaîne statique privée; Hôte de chaîne statique privée; Port de chaîne statique privé; public Autoload (@Value ("$ {Storm.BrokerzkStr}") String Brokerzkstr, @Value ("$ {ZooKeeper.host}") String Host, @Value ("$ {ZooKear.Port}") String Port, @value ("$ {kafka.default-topic}" " Host = host; Sujet = sujet; Port = port; } @Override public void onApplicationEvent (événement ContextreFreshEdEvent) {try {// Instancier la classe topologybuilder. Topologybuilder topologybuilder = new topologybuilder (); // Définissez le nœud d'éruption et allouez le numéro de concurrence. Le numéro de concurrence contrôlera le nombre de threads de l'objet dans le cluster. BrokerHosts BrokeHosts = New Zkhosts (Brokerzkstr); // Configurez le sujet de l'abonnement Kafka, ainsi que le répertoire et le nom de nœud de données dans ZooKeeper SpoutConfig = new SpoutConfig (BrokerHosts, Sujet, "/ Storm", "S32"); spoutconfig.scheme = new SchemeasMultiScheme (new StringScheme ()); spoutconfig.zkservers = collection.SingletonList (hôte); spoutConfig.zkport = Integer.ParseInt (port); // Lire spoutconfig.startoffsettime = offSetRequest.LaSestTime (); Kafkaspout récepteur = new kafkaspout (spoutconfig); topologybuilder.setspout ("kafka-spout", récepteur, 1) .setnumtasks (2); topologybuilder.setbolt ("alarm-boult", new AlarmBolt (), 1) .setnumtasks (2) .shufflegrouping ("kafka-spout"); Config config = new config (); config.setdebug (false); / * Définissez le nombre de créneaux de ressources que la topologie souhaite saisir dans le cluster Storm. Un emplacement correspond à un processus de travailleur sur le nœud de superviseur. Si le nombre de spots que vous alliez dépasse le nombre de travailleurs de votre nœud physique, la soumission peut échouer. En rejoignant votre cluster, il y a déjà une topologie et il reste 2 ressources de travailleurs. Si vous allouez 4 topologie à votre code, cette topologie peut être soumise, mais après avoir commis, vous constaterez qu'elle ne fonctionne pas. Et lorsque vous tuez une topologie et libérez des emplacements, votre topologie reprendra un fonctionnement normal. * / config.setnumworkers (1); LocalCluster cluster = new localCluster (); Cluster.Submittopology ("Kafka-Spout", config, topologyBuilder.CreateTopology ()); } catch (exception e) {e.printStackTrace (); }}}Note:
Lors du démarrage du projet, l'erreur suivante peut être signalée car elle utilise Tomcat intégré pour le démarrage.
[Tomcat-StartStop-1] Erreur OACCContainerBase - Un conteneur enfant a échoué pendant startjava.util.concurrent.executionException: org.apache.catalina.lifecycleException: a échoué à démarrer le composant [standardEngine [Tomcat] .StandardHost [LocalHost] .TomCatedDedContext []. java.util.concurrent.futuretask.report (futurask.java:122) ~ [?: 1.8.0_144] à java.util.concurrent.futuretask.get (FutureTask.java:192) ~ [?: 1.8.0_144] at à l'atte org.apache.catalina.core.contierbase.startinternal (contenerbase.java:939) [tomcat-embed-core-8.5.23.jar: 8.5.23] à org.apache.catalina.core.standardhost.startinternal (standardhost.java:872) [Tomcat-Embed-core-8.5.23.jar: 8.5.23] à org.apache.catalina.util.lifecyclebase.start (LifecycleBase.java:150) [Tomcat-Embed-core-8.5.23.jar: 8.5.23] AT AT org.apache.catalina.core.ContainerBase $ startchild.call (contenerbase.java:1419) [Tomcat-Embed-Core-8.5.23.jar: 8.5.23] sur org.apache.catalina.core.containerBase $ startChild.Call (ContenerBase.java:1409) [Tomcat-Embed-core-8.5.23.jar: 8.5.23] à java.util.concurrent.futuredask.run $$ capture (Futuretask.java:266) [?: 1.8.0_144] à java.util.concurrent.futuretask.run (futuretask.java) [?: 1.8 java.util.concurrent.threadpoolexecutor.runworker (threadpoolexecutor.java:1149) [?: 1.8.0_144] sur java.util.concurrent.threadpoolExecutor.runworker (threadpoolexecutor.java:1149) [? java.util.concurrent.threadpoolExecutor $ worker.run (threadpoolexecutor.java:624) [?: 1.8.0_144] à java.lang.thread.run (thread.java:748) [?: 1.8.0_144]
En effet Tout ce que nous devons faire est d'ouvrir la dépendance Maven et de la supprimer
<c exclusion> <ArtefactId> Servlet-API </ artifactid> <proupId> javax.servlet </rompgroud> </cchusion>
Puis redémarrez.
Il est possible de signaler pendant le démarrage:
La copie de code est la suivante:
org.apache.storm.utils.nimbusleadernotfoundException: n'a pas pu trouver le leader Nimbus des hôtes de graines [localhost]. Avez-vous spécifié une liste valide d'hôtes Nimbus pour config nimbus.seeds? At org.apache.storm.utils.nimbusclient.getConfiguredClientas (nimbusclient.java:90
J'ai réfléchi à ce problème pendant longtemps et j'ai constaté que les explications en ligne étaient toutes causées par le problème de configuration de la tempête, mais ma tempête est déployée sur le serveur. Il n'y a pas de configuration pertinente. En théorie, nous devons également lire la configuration pertinente sur le serveur, mais le résultat n'est pas le cas. Enfin, j'ai essayé plusieurs méthodes et j'ai découvert que c'était mal. Ici, j'ai trouvé que lors de la construction du cluster, Storm a fourni le cluster local correspondant.
LocalCluster cluster = new localCluster ();
Effectuer des tests locaux. Si vous testez localement, utilisez-le pour des tests de déploiement. Si vous êtes déployé sur le serveur, vous devez:
Cluster.Submittopology ("Kafka-Spout", config, topologybuilder.createTopology ()); // fixé à: StormSubmitteur.Submittopology ("Kafka-Spout", config, topologyBuilder.CreateTopology ());Effectuer une soumission de tâche;
Ce qui précède résout les problèmes ci-dessus 1-3
Question 4: J'utilise l'instance de bean pertinente dans Bolt, j'ai trouvé que je ne peux pas obtenir l'instance si je le mets à Spring en utilisant @Component: je suppose que lorsque nous construisons le Commit Topolgy, ce sera dans:
La copie de code est la suivante:
topologybuilder.setbolt ("alarm-boult", new AlarmBolt (), 1) .setnumtasks (2) .shufflegrouping ("kafka-spout");
Boulon d'exécution lié:
@Override public void prépare (map StormConf, TopologyContext Context, OutputCollector Collector) {this.collector = Collector; Stormlauncher Stormlauncher = Stormlauncher.getStormlauncher (); dataRepositorys = (AlarmDataRepositorys) Stormlauncher.getBean ("AlarmDataRepositorys"); }Sans instancier le boulon, les filetages sont différents et le ressort ne peut pas être obtenu. (Je ne le comprends pas beaucoup ici, si un grand gars le sait, vous pouvez le partager)
La signification de l'utilisation de Spring Boot est que ces objets compliqués sont obtenus. Ce problème m'a troublé depuis longtemps. Enfin, j'ai pensé que nous pouvons obtenir des instances dans le contexte Getbean et je ne sais pas si cela peut fonctionner, alors j'ai commencé à définir:
Par exemple, j'ai besoin d'utiliser un service dans Bolt:
/ ** * @Author Leezer * @Date 2017/12/27 * Opération de stockage Temps de défaillance ** / @ Service ("AlarmDataRepositorys") Classe publique AlarmDataRepositorys étend Redisbase implémente ialarmDataRepositorys {private static final String erro = "erro"; / ** * @param Type Type * @param Valeur clé de clé * @return Nombre d'erreurs ** / @Override public String getErrnumFromredis (String Type, String key) {if (type == null || key == null) {return null; } else {valueOperations <String, String> valueOper = primaireStringRedistEmplate.opSforValue (); return ValueOper.get (String.format ("% s:% s:% s", erro, type, key)); }} / ** * @param Type d'erreur Type * @Param Key Key Valeur * @param Valeur Valeur stockée ** / @Override public void seterRnumToredis (String Type, String Key, String Value) {try {valueperations <String, string> valueOper = primaireStringRedistEmplate.OpsForValue (); ValueOper.Set (String.Format ("% S:% S:% S", Erro, Type, Key), Value, Dictionaries.APIKEYDAYOFLIFECYCLE, TimeUnit.seconds); } catch (exception e) {logger.info (dictionnaires.redis_error_prefix + string.format ("La clé n'a pas réussi à stocker redis dans% s", key)); }}Ici, je spécifie le nom du haricot, et lorsque Bolt s'exécute Préparez-vous: utilisez la méthode GetBean pour obtenir le bean pertinent et terminer l'opération correspondante.
Ensuite, le sujet de Kafka est envoyé à mon boulon pour un traitement connexe. La méthode de GetBean ici est de démarrer la définition de la fonction bootmain:
@ SpringbootApplication @ InteableTransActionManagement @ ComponentsCan ({"Service", "Storm"}) @ pertiablemongorepositries (basepackages = {"Storm"}) @ PropertySource (Value = {"ClassPath: Service.Properties", "classpath:application.properties","classpath:storm.properties"})@ImportResource(locations = { "classpath:/configs/spring-hadoop.xml", "classpath:/configs/spring-hbase.xml"})public class StormLauncher extends SpringBootServletInitializer { //Set the secure thread launcher instance Stormlauncher statique volatile privé Stormlauncher; // Définir le contexte de contexte d'application privée de contexte; public static void main (String [] args) {SpringApplicationBuilder Application = new SpringApplicationBuilder (Stormlauncher.class); // application.web (false) .run (args); Cette méthode est que Spring Boot ne démarre pas Application.Run (args); Stormlauncher S = new Stormlauncher (); S.SetApplicationContext (application.context ()); SetStormlauncher (s); } private static void setStormlauncher (Stormlauncher Stormlauncher) {Stormlauncher.Stormlauncher = Stormlauncher; } public static Stormlauncher getStormlauncher () {return Stormlauncher; } @Override Protected SpringApplicationBuilder Configure (application SpringApplicationBuilder) {return application.sources (Stormlauncher.class); } / ** * Obtenez le contexte * * @return le contexte d'application * / public applicationContext getApplicationContext () {return context; } / ** * Définissez le contexte. * * @param AppContext Context * / private void setApplicationContext (applicationContext AppContext) {this.context = appContext; } / ** * Obtenez le bean d'instance via un nom personnalisé. * * @param name le nom * @return le bean * / objet public getBean (name de chaîne) {return context.getBean (name); } / ** * Obtenez le bean via la classe. * * @param <T> Le paramètre de type * @param Clazz The Clazz * @return the Bean * / public <T> t getbean (class <t> Clazz) {return context.getBean (Clazz); } / ** * Renvoyez le bean spécifié par son nom, et Clazz * * @param <T> Le paramètre Type * @param Nom du nom * @param Clazz The Clazz * @return the Bean * / public <T> T Getbean (Nom de la chaîne, classe <T> Clazz) {return Context.getBean (nom, Clazz); }L'intégration de Storm et Kafka à Spring Boot a pris fin. Je mettrai Kafka et d'autres configurations apparentées dans GitHub
Soit dit en passant, il y a aussi une fosse de kafkaclient ici:
Loop asynchrone est mort! java.lang.nosuchMethoDerror: org.apache.kafka.common.network.networksend.
Le projet rapportera un problème client Kafka, car dans Storm-Kafka, Kafka utilise la version 0.8, tandis que Networksend est la version 0.9 ou plus. L'intégration ici doit être cohérente avec la version liée à Kafka que vous intégrez.
Bien que l'intégration soit relativement simple, il y a peu de références. De plus, je viens de commencer à entrer en contact avec Storm, donc je pense beaucoup. Je vais également l'enregistrer ici.
Adresse du projet - GitHub
Ce qui précède est tout le contenu de cet article. J'espère que cela sera utile à l'apprentissage de tous et j'espère que tout le monde soutiendra davantage Wulin.com.