Vorwort
Aufgrund der geschäftlichen Bedürfnisse müssen Strom und Kafka in das Spring Boot -Projekt und andere Dienste -Ausgabemittel in das Kafka -Abonnement -Thema integriert werden. Storm behandelt das Thema in Echtzeit, um die Datenüberwachung und andere Datenstatistiken zu vervollständigen. Es gibt jedoch nur wenige Online -Tutorials. Was ich heute schreiben möchte, ist, wie man Storm+Kafka in Spring Boot integriert, und übrigens werde ich über die Fallstricks sprechen, die ich begegnet bin.
Verwendungswerkzeuge und Umgebungskonfiguration
1. Java Version JDK-1.8
2. Compilation Tool verwendet Idea-2017
3.. Maven als Projektmanagement
4.Spring Boot-1.5.8.Release
Forderung Manifestation
1. Warum müssen Sie sich in Spring Boot integrieren?
Um Spring Boot zu verwenden, um verschiedene Microservices einheitlich zu verwalten und gleichzeitig mehrere dezentrale Konfigurationen zu vermeiden
2. Spezifische Ideen und Gründe für die Integration
Verwenden Sie Spring Boot, um Bohnen zu verwalten, die von Kafka, Storm, Redis usw. benötigt werden, sammeln Sie sie über andere Serviceprotokolle bei Kafka und senden Sie Protokolle in Echtzeit an Sturm und führen Sie entsprechende Verarbeitungsvorgänge durch, wenn Strom Bolt ist
Probleme auftreten
1. Es gibt keinen relevanten Integrationssturm bei der Verwendung des Spring Boots
2. Ich weiß nicht, wie ich das Commit -Topolgy im Spring Boot auslösen soll
3.. Ich habe ein Problem mit Numbis, nicht Kunden localhost, bei der Einreichung einer Topologie gestoßen
V.
Lösung
Vor der Integration müssen wir die entsprechende Methode und Konfiguration des Spring -Boot -Starts kennen (wenn Sie diesen Artikel lesen, haben Sie standardmäßig Storm, Kafka und Spring Boot verwendet und verwendet).
Es gibt nur wenige Beispiele für die Integration von Storm in Spring Boot im Internet, aber aufgrund der entsprechenden Anforderungen müssen wir uns noch integrieren.
Importieren Sie zuerst das erforderliche JAR -Paket:
<Depepecy> <GroupId> org.apache.kafka </GroupId> <artifactId> Kafka-Clients </artifactId> <version> 0.10.1 </Version> </abhängig> <Depopact> <Gruppe-org. <Exclusion> <artifactId> zookeeper </artifactId> <gruppe> org.apache.zookeeper </Groupid> </exklusion> <xclusion> <artifactID> Spring-Boot-Actuator </artifactID> <gruppe> org.spingframework.boot </gruppen> </</</</</</</</</</</</Group> </</</</Group> </</</</</</Group> </</</</</</</</</</</Group> </</</</</</</</</</</</</</</</</</</</</</</</</</</</</</</</</</ <artifactId>kafka-clients</artifactId> <groupId>org.apache.kafka</groupId> </exclusion> </exclusion> </exclusion> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <exclusion> <artifactId>kafka-clients</artifactId> <groupId>org.apache.kafka</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-hadoop</artifactId> <version>2.5.0.RELEASE</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <artifactId>commons-logging</artifactId> <groupId>commons-logging</groupId> </exclusion> <exclusion> <artifactId>netty</artifactId> <SupructId> io.netty </GroupId> </exklusion> <xclusion> <artifactID> Jackson-core-asl </artifactId> <gruppe> org.codehehaus.jackson </Groupid> </exclusion> <exclusion> <artifactId> curator-client </artifactid> <gruppe> <gruppe> <gruppe> <gruppe> <gruppe> <gruppe> <gruppe> <gruppe> <gruppe> <gruppe> <gruppe> <Gruppen> </artifactid> <gruppe> gruppen> gruppen> rot.apache <artifactId>jettison</artifactId> <groupId>org.codehaus.jettison</groupId> </exclusion> <exclusion> <artifactId>jackson-mapper-asl</artifactId> <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion> <artifactId>jackson-jaxrs</artifactId> <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion> <artifactId>snappy-java</artifactId> <groupId>org.xerial.snappy</groupId> </exclusion> <exclusion> <artifactId>jackson-xc</artifactId> <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion> <artifactId>guava</artifactId> <groupId>com.google.guava</groupId> </exclusion> <exclusion> <artifactId>hadoop-mapreduce-client-core</artifactId> <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion> <artifactId>zookeeper</artifactId> <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion> <artifactId>servlet-api</artifactId> <groupId>javax.servlet</groupId> </exclusion> </exclusion> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusion> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.4</version> <exclusions> <exclusion> <artifactId>log4j</artifactId> <groupId>log4j</groupId> </exclusion> <exclusion> <artifactId>zookeeper</artifactId> <CruupId> org.apache.zookeeper </GroupID> </exklusion> <exclusion> <artifactId> netty </artifactId> <gruppe> io.netty </gruppen> </exklusion> <xclusion> <artifactid> hadoop-common </artifactid> <gruppen> org.apache <artifactId> Guava </artifactId> <gruppe> com.google.guava </Groupid> </exklusion> </exklusion> <exklusion> <artifactid> Hadoop-Annotationen </artifactId> <gruppe> org. <gruppeId> org.apache.hadoop </GroupId> </exklusion> <xclusion> <artifactId> SLF4J-log4j12 </artifactid> <gruppe> org.slf4j </GroupId> </exclusion> </exclusions> </abhängig> <abhängigkeit> <gruppId> org.apache <artifactId>hadoop-common</artifactId> <version>2.7.3</version> <exclusions> <exclusion> <artifactId>commons-logging</artifactId> <groupId>commons-logging</artifactId> <groupId>commons-logging</groupId> </exclusion> <exclusion> <artifactId>curator-client</artifactId> <groupId>org.apache.curator</groupId> </exclusion> <exclusion> <artifactId>jackson-mapper-asl</artifactId> <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion> <artifactId>jackson-core-asl</artifactId> <groupId>org.codehaus.jackson</groupId> </exclusion> <exclusion> <artifactId>log4j</artifactId> <groupId>log4j</groupId> </exclusion> <exclusion> <artifactId>snappy-java</artifactId> <groupId>org.xerial.snappy</groupId> </exclusion> <exclusion> <artifactId>zookeeper</artifactId> <gruppeId> org.apache.zookeeper </gruppeId> </exklusion> <exclusion> <artifactId> Guava </artifactId> <gruppe> com.google <artifactId> commons-Lang </artifactId> <gruppe> commons-Lang </Groupid> </exklusion> <xclusion> <artifactid> SLF4J-log4j12 </artifactId> <gruppe> org.slf4j </gruppen> </exclusion> <artifactid> servlet-api </artifactid> <groupId>javax.servlet</groupId> </exclusion> </exclusion> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-examples</artifactId> <version>2.7.3</version> <exclusions> <exclusion> <exclusion> <artifactId>commons-logging</artifactId> <gruppe> Commons-Logging </GroupId> </exklusion> <xclusion> <artifactId> netty </artifactId> <gruppe> io.netty </gruppen> </exklusion> <exclusion> <artifactID> Guava </artifactId> <gruppe> com.google.guava </gruppa </gruppa </gruppa </gruppa </gruppa </gruppa </gruppa </gruppa </gruppa </gruppa </gruppa <artifactId> log4j </artifactId> <gruppe> log4j </GroupID> </exklusion> <exclusion> <artifactId> servlet-api </artifactId> <gruppe> Javax.Servlet </GroupId> </exklusion> </exklusions> </abhängig> <! <artifactId>storm-core</artifactId> <version>${storm.version}</version> <scope>${provided.scope}</scope> <exclusions> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> </exclusion> <exclusion> <artifactId> servlet-api </artifactId> <gruppe> javax.servlet </GroupID> </exklusion> </exklusions> </abhängig> <abhängig> <gruppe> org.apache <artifactId> Kafka-Clients </artifactid> <gruppe> org.apache.kafka </Groupid> </exklusion> </exklusions> </abhängig>Das JAR -Paket wird entfernt, da es mehrere Abhängigkeiten zu Projektkonstruktionsabhängigkeiten gibt. Die Sturmversion ist 1.1.0 Abhängigkeiten mit Springstiefel bezogen
`` `Java
<!-Spring Boot-> <Depopentcy> <GroupId> org.springFramework.boot </GroupId> <artifactId> Spring-Boot-Starter </artifactId> <Exclusions> <exklusion> <GroupId> org.springFramework.boot </gruppen> <artifactid> Spring-boot-Starter-Logging </artifactid>; </abhängig> <depeping> <gruppe> org.springFramework <gruppeID> org.springFramework.boot </GroupId> <artifactId> Spring-Boot-Starter-Test </artifactID> <Schope> Test </scope> </abhängig> <Depopentcy> <GroupId> org.springFramework. <gruppeId> org.mybatis.spring.boot </GroupId> <artifactId> mybatis-pring-boot-starter </artifactid> <version> $ {mybatis-pring.version} </Version> </abhängig> <Diefency> <gruppe <gruppens org. <artifactid> Spring-Boot-Konfigurationsprozessor </artifactId> <option> true </optional> </abhängig>PS: Das JAR -Paket von Maven ist aufgrund der Projektnutzungsanforderungen nicht am optimiertesten. Es dient nur zur Referenz.
Projektstruktur:
Konfigurationskonfigurationsdateien in verschiedenen Umgebungen
Speicher Build Spring Boot verwandte Implementierungsklassen wie Build -Name
Beim Starten des Spring Boot werden wir finden
Vor Beginn der Integration wusste ich wenig über Storm, das zu Beginn nicht in Kontakt stand. Später stellte ich fest, dass ich nach der Integration in Spring Boot nicht die entsprechende Möglichkeit hatte, die Funktion des Topolgies nach dem Start des Spring Stiefel auszulösen. Deshalb dachte ich, dass ich nach dem Start von Spring Stiefel fertig war. Infolgedessen wartete ich eine halbe Stunde und nichts passierte, bevor ich feststellte, dass die Funktion des Auslösens des Commits nicht implementiert wurde.
Um dieses Problem zu lösen, ist meine Idee: Start Spring Boot-> Kafka-Hörsthema erstellen und topolgy starten, um das Startup zu vervollständigen. Für ein solches Problem wird Kafka jedoch wiederholt Topolgy auslösen, was offensichtlich nicht das ist, was wir wollen. Nachdem ich eine Weile gesehen hatte, stellte ich fest, dass der Frühling ein verwandtes Startup hat und nach Abschluss einer bestimmten Zeitmethode ausgeführt wurde. Dies ist ein Retter für mich. Die Idee, Topolgy auszulösen, ist also:
Start Spring Start -> Führen Sie die Triggermethode aus -> Vervollständigen Sie die entsprechende Auslöserbedingung
Die Konstruktionsmethode lautet:
/** * @Author Leezer * @date 2017/12/28 * automatisch Topologie nach dem Laden des Springs einreichen privates statisches String -Thema; privater statischer String -Host; privater statischer String -Port; public autoload (@Value ("$ {storm.brokrokerzStr}") String BrokerZKSTR, @Value ("$ {zookeeper.host}") String-Host, @Value ("$ {zookeeper.port}") String-Port, @value ($ {kafka.Default-topic} ") String thema) {{kafka.default-topic}") String thema) {{kafka.default-topic} ") String thema) {{kafka.default-topic}") String thema) {{kafka.default-topic} ") String thema) {{kaFka.Default. BrokerZKStr; Host = Host; Thema = Thema; Port = port; } @Override public void onapplicationEvent (contextreFreshedEvent Ereignis) {try {// Die TopologyBuilder -Klasse instanziiert. TopologyBuilder TopologyBuilder = New TopologyBuilder (); // Setzen Sie den Eruptionsknoten und weisen Sie die Parallelitätsnummer zu. Die Parallelitätsnummer steuert die Anzahl der Threads des Objekts im Cluster. BrokerHosts brakeHosts = neu Zkhosts (Brokerzkstr); // Konfigurieren Sie das Thema für das KAFKA -Abonnement sowie das Datenknotenverzeichnis und den Namen in Zookeeper Spoutconfig = New Spoutconfig (BrokerHosts, Thema, "/Storm", "S32"); ausgoutconfig.scheme = new Schemeasmultischeme (New StringScheme ()); ausgoutconfig.zkServers = collections.singletonList (Host); ausspoutconfig.zkport = integer.parseInt (port); // Spoutconfig.startoffsettime = offsetRequest.LatestTime (); Kafkaspout receiver = new kafkaspout (ausspoutconfig); TopologyBuilder.SetSpout ("Kafka-Spout", Empfänger, 1) .SetNumTasks (2); TopologyBuilder.SetBolt ("Alarm-Bolt", New Alarmbolt (), 1) .SetNumTasks (2) .ShuffleGrouping ("Kafka-Spout"); Config config = new config (); config.setDebug (false); /* Legen Sie die Anzahl der Ressourcen -Slots fest, die die Topologie im Sturmcluster einnehmen möchte. Ein Steckplatz entspricht einem Arbeiterprozess auf dem Vorgesetztenknoten. Wenn die Anzahl der von Ihnen zugewiesenen Flecken die Anzahl der Arbeitnehmer überschreitet, die Ihr physischer Knoten hat, kann die Einreichung nicht erfolgreich sein. Wenn Sie sich Ihrem Cluster anschließen, gibt es bereits einige Topologie, und es sind noch 2 Arbeiterressourcen übrig. Wenn Sie Ihrem Code 4 Topologie zuweisen, kann diese Topologie eingereicht werden, aber nach dem Verhalten werden Sie feststellen, dass sie nicht ausgeführt wird. Und wenn Sie einige Topologie töten und einige Slots veröffentlichen, wird Ihre Topologie den normalen Betrieb fortsetzen. */ config.setnumworkers (1); LocalCluster cluster = new localCluster (); cluster.submittopology ("kafka-spout", config, topologyBuilder.createTopology ()); } catch (Ausnahme e) {e.printstacktrace (); }}}Notiz:
Beim Starten des Projekts kann der folgende Fehler gemeldet werden, da ein eingebettetes Tomcat zum Start verwendet wird.
[Tomcat-StartStop-1] Fehler oacccontainerbase-Ein unterstiegser Container ist während startjava.util.concurrent ausgefallen. ExecutionException: org.apache.catalina java.util.concurrent.futuretask.report (futuretask.java:122) ~ [?: 1.8.0_144] unter java.util.concurrent.futuretask.get (futuretask.java:192) ~ [?: 1.8.0_144] org.apache.catalina.core.containerbase.startinternal (containerbase.java:939) [tomcat-embed-8.5.23.jar: 8.5.23] at org.apache.catalina.core.standardhost.startinternal (StandardHost.java:872)) [tomcat-embed-core-8.5.23.jar: 8.5.23] at org.apache.catalina org.apache.catalina.core.containerbase $ startchild.call (containerbase.java:1419) [tomcat-embed-core-8.5.23.jar: 8.5.23] at org.apache.catalina.core.containbase $ Startchild.call (content (contorbase [tomcat-embed-core-8.5.23.jar:8.5.23] at java.util.concurrent.FutureTask.run$$capture(FutureTask.java:266) [?:1.8.0_144] at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_144] at java.util.concurrent.threadpoolexecutor.runworker (threadpoolexecutor.java:1149) [?: 1.8.0_144] bei Java.util.Concurrent.Threadpoolexecutor.runworker (threadpoolexecutor.java:1149) [?: 1.8.0_0_144] ATREAL ATHAVA. java.util.concurrent.threadpoolexecutor $ Worker.run (threadpoolexecutor.java:624) [?: 1.8.0_144] bei java.lang.thread.run (thread.java:748) [?: 1.8.0_144]
Dies liegt daran, dass das entsprechende importierte JAR-Paket die Servlet-API-Version, die niedriger als die eingebettete Version ist, einführt. Alles was wir tun müssen, ist die Abhängigkeit von Maven zu öffnen und zu entfernen
<Ecclusion> <artifactId> servlet-api </artifactId> <gruppe> javax.servlet </Groupid> </exklusion>
Dann neu starten.
Es ist möglich, während des Startups zu melden:
Die Codekopie lautet wie folgt:
org.apache.storm.utils.nimbusleadernotFoundException: konnte den Führer Nimbus nicht von Saatguthosts finden [Localhost]. Haben Sie eine gültige Liste von Nimbus -Hosts für config nimbus.seeds angegeben?
Ich dachte lange über dieses Problem nach und stellte fest, dass die Online -Erklärungen durch das Problem der Sturmkonfiguration verursacht wurden, aber mein Sturm wird auf dem Server bereitgestellt. Es gibt keine relevante Konfiguration. Theoretisch sollten wir auch die relevante Konfiguration auf dem Server lesen, das Ergebnis ist jedoch nicht der Fall. Schließlich probierte ich mehrere Methoden aus und stellte fest, dass es falsch war. Hier stellte ich fest, dass Storm beim Bau des Clusters den entsprechenden lokalen Cluster zur Verfügung stellte.
LocalCluster cluster = new localCluster ();
Lokale Tests durchführen. Wenn Sie lokal testen, verwenden Sie es für Bereitstellungstests. Wenn Sie auf dem Server bereitgestellt werden, müssen Sie:
cluster.submittopology ("kafka-spout", config, topologyBuilder.createTopology ()); // fix an: stormsubmitter.submittopology ("kafka-spout", config, topologybuilder.createtopology ());Einreichung von Aufgaben durchführen;
Die oben genannten löst die obigen Probleme 1-3
Frage 4: Ich benutze die relevante Bean -Instanz in Bolt. Ich habe festgestellt, dass ich die Instanz nicht erhalten kann, wenn ich sie im Frühjahr mit @Component einsetze: Ich vermute, wenn wir das Commit -Topolgy aufbauen, wird es in:
Die Codekopie lautet wie folgt:
TopologyBuilder.SetBolt ("Alarm-Bolt", New Alarmbolt (), 1) .SetNumTasks (2) .ShuffleGrouping ("Kafka-Spout");
Ausführungsschraube verwandt:
@Override public void prepe (map stormconf, topologycontext context, outputCollector collector) {this.collector = collector; Stormlauncher Stormlauncher = Stormlauncher.getSmlauncher (); DataRepositorys = (alarmdatarepositorys) stormlauncher.getbean ("alarmdatarepositorys"); }Ohne die Bolzen zu instanziieren, sind die Gewinde unterschiedlich und die Feder kann nicht erhalten werden. (Ich verstehe es hier nicht sehr, wenn ein großer Kerl es weiß, können Sie es teilen)
Die Bedeutung der Verwendung des Federschuhs ist, dass diese komplizierten Objekte erhalten werden. Dieses Problem hat mich schon lange beunruhigt. Schließlich dachte ich, wir können Instanzen durch den Kontext -GetBean bekommen und nicht wissen, ob es funktionieren kann. Deshalb habe ich angefangen zu definieren:
Zum Beispiel muss ich einen Dienst in Bolt verwenden:
/*** @Author Leezer* @date 2017/12/27* Speicherbetriebsausfallzeit **/ @service ("alarmdatarepositorys") öffentliche Klasse alarmdatarepositorys erweitert Redisbase -Geräte IalarmDatArpositorys {private statische Schluss -String erro = "erro"; / *** @param Typtyp* @param Schlüsselschlüssel Wert* @return Anzahl der Fehler **/ @Override public String getRnumFromRedis (String -Typ, String -Schlüssel) {if (type == null || key == null) {return null; } else {valueOperations <string, string> valueOper = primaryStringRedistemplate.opsforValue (); return valueOper.get (string.format ("%s:%s:%s", erro, type, key)); }} / *** @param Typ Fehlertyp* @Param Schlüsselschlüssel Wert* @param Wert Storierter Wert ** / @Override public void setRnumtoredis (String -Typ, String -Taste, String -Wert) {try {valueOperations <String, String> valueOper = primaryStringReDemplate.opsforValue (); valueOper.set (string.format ("%s:%s:%s", erro, type, key), value, wörterbücher } catch (Ausnahme e) {logger.info (Dictionaries.redis_error_prefix+string.format ("Schlüssel konnte redis in %s", Schlüssel nicht speichern)); }}Hier geben ich den Namen der Bean an und wenn Bolt ausführt, Vorbereitung: Verwenden Sie die GetBean -Methode, um die entsprechende Bean zu erhalten und die entsprechende Operation abzuschließen.
Dann wird Kafka abonnieren, dass ein Thema für die damit verbundene Verarbeitung an meinen Bolzen gesendet wird. Die Methode der GetBean hier besteht darin, die BootMain -Funktionsdefinition zu starten:
@SpringBootApplication@EnableTransactionManagement@componentscan ({"service", "storm"})@enableMongorePositories (BasEpackages = {"Storm"})@PropertySoSource (value = {"classPath: service.Properties", "classpath: approperties", "classpath:" classpath: "path:" path: "path:" path: "path:" path: "path. "ClassPath: /configs/spring-hadoop.xml", "ClassPath: /configs/spring-hbase.xml"}) Der öffentliche Stormlauncher erweitert SpringbootServletializer {// Setzen Sie den Secure Thread Launcher Instance privat volatile statische Stormlaerer Stormlaercher; // Setzen Sie den Kontext private applicationContext -Kontext; public static void main (String [] args) {SpringApplicationBuilder application = new SpringApplicationBuilder (Stormlauncher.Class); // application.web (falsch) .run (args); Diese Methode ist, dass Spring Boot nicht mit der Anwendung startet. Run (Args); Stormlauncher S = neuer Stormlauncher (); S.SetApplicationContext (Application.Context ()); setStormlauncher (s); } Private statische Leere SetStormlauncher (Stormlauncher Stormlauncher) {Stormlauncher.stormlauncher = Stormlauncher; } public static stormlauncher getStormlauncher () {return stormlauncher; } @Override Protected SpringApplicationBuilder configure (SpringApplicationBuilder -Anwendung) {return application.sources (stormlauncher.class); } / ** * den Kontext erhalten * * @return den Anwendungskontext * / public ApplicationContext getApplicationContext () {return context; } /*** Setzen Sie den Kontext. * * @param AppContext Context */ private void setApplicationContext (ApplicationContext appContext) {this.context = AppContext; } /*** Holen Sie sich die Instanzbohne über einen benutzerdefinierten Namen. * * @param name den namen * @return die bean */ public Object getBean (String -Name) {return context.getBean (Name); } /*** Holen Sie sich Bohnen durch die Klasse. * * @param <t> Der Typ Parameter * @param clazz the clazz * @return the bean */ public <t> T getBean (Klasse <T> clazz) {return context.getBean (Clazz); } / ** * Geben Sie die angegebene Bean mit dem Namen zurück und Clazz * * @param <T> Der Typ -Parameter * @param name den Namen * @param clazz the clazz * @return the bean * / public <T> t getBean (String -Name, Klasse <T> clazz) {return context.getbean (name, clazz); }Die Integration von Storm und Kafka in Spring Boot ist beendet. Ich werde verwandte Kafka und andere Konfigurationen in GitHub einbringen
Übrigens gibt es hier auch eine Kafkaklient -Grube:
Async Loop starb! java.lang.nosuchMethoderror: org.apache.kafka.common.network.networksend.
Das Projekt meldet ein Kafka-Kundenproblem, da Kafka in Storm-Kafka Version 0.8 verwendet, während Networksend Version 0.9 oder höher ist. Die Integration hier muss mit der von Ihnen integrierten Kafka -Version übereinstimmen.
Obwohl die Integration relativ einfach ist, gibt es nur wenige Referenzen. Außerdem habe ich gerade angefangen, mit Sturm in Kontakt zu kommen, also denke ich viel. Ich werde es auch hier aufzeichnen.
Projektadresse - GitHub
Das obige ist der gesamte Inhalt dieses Artikels. Ich hoffe, es wird für das Lernen aller hilfreich sein und ich hoffe, jeder wird Wulin.com mehr unterstützen.