In diesem Artikel wird vorgestellt, wie Kafka -Senden und Empfangen von Nachrichten in das Springboot -Projekt integriert werden.
KAFKA ist ein hochdurchsatz verteiltes Veröffentlichungs-Subscribe-Nachrichtensystem mit den folgenden Eigenschaften: Bieten Sie eine Nachrichtenpersistenz durch die Datenträgerdatenstruktur von O (1) an, wodurch eine stabile Leistung für eine lange Zeit aufrechterhalten kann, selbst wenn Nachrichtenspeicher Terabyte sind. Hoher Durchsatz: Selbst sehr gewöhnliche Hardware -Kafka kann Millionen von Nachrichten pro Sekunde unterstützen. Unterstützt die Aufteilung von Nachrichten über Kafka -Server und Verbrauchercluster. Unterstützen Sie Hadoop -Paralleldatenladen.
Installieren Sie Kafka
Da die Installation von KAFKA die Unterstützung von Zookeeper bei der Installation von Windows erfordert, müssen Sie zuerst Zookeeper installieren und dann Kafka installieren. Im Folgenden werde ich Ihnen die Schritte zur Installation von Mac und die Punkte geben, um darauf zu achten. Die Konfiguration von Windows unterscheidet sich fast nicht für die verschiedenen Standorte.
Brauen installieren Sie Kafka
Ja, es ist so einfach. Sie können es mit einem Befehl auf dem Mac verarbeiten. Dieser Installationsprozess erfordert möglicherweise eine Weile und sollte mit dem Netzwerkstatus zusammenhängen. In der Installationsaufforderung kann eine Fehlermeldung vorhanden sein, z. B. "Fehler: Darf nicht verknüpfen:/usr/local/share/doc/homebrew". Dies spielt keine Rolle, es wird automatisch ignoriert. Schließlich gelang es uns, als wir sahen, was unten war.
==> Zusammenfassung ð¼ º/usr/local/cellar/kafka/1.1.0: 157 Dateien, 47,8 MB
Die Installationskonfigurationsdateisposition ist wie folgt. Ändern Sie einfach die Portnummer entsprechend Ihren Anforderungen.
Installierte Zoopeeper- und Kafka -Standort/USR/Lokal/Keller/
Konfigurationsdatei /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties
Starten Sie Zookeeper
Kopieren Sie den Code wie folgt: ./ Bin/Zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
Starten Sie Kafka
./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
Erstellen Sie ein Thema für Kafka. Das Thema heißt Test. Sie können es auf den gewünschten Namen konfigurieren. Gehen Sie zurück und konfigurieren Sie ihn im Code richtig.
Kopieren Sie den Code wie folgt: ./ Bin/Kafka-topics-Create-Zookeper localhost: 2181-Replikationsfaktor 1-Partitionen 1-Topic Test
1. Lösen Sie zuerst Abhängigkeiten
Wir werden die Abhängigkeiten im Zusammenhang mit Springboot nicht erwähnen. Die Abhängigkeiten im Zusammenhang mit Kafka beruhen nur auf ein Spring-Kafka-Integrationspaket.
<De vorhöhe> <gruppe> org.springframework.kafka </Groupid> <artifactId> Spring-Kafka </artifactid> <version> 1.1.1.Release </Version> </abhängig>
Hier zeigen wir zuerst die Konfigurationsdatei an
#============================================================================================================================================================================================================================================================================================== kafka.consumer.servers = 10.93.21.21: 2181kafka.consumer.enable.auto.commit = trukafka.consumer.Session.timeout = 6000Kafka.consumer.auto.commit.interval = 100Kafka.Consumer.Ado.Offset .Reset = letzteskafka.consumer.topic = testkafka.consumer.group.id = testkafka.consumer.Concurrency = 10kafka.Producer.Servers = 10.9 3.21.21: 9092Kafka.Producer.Retries = 0Kafka.Producer.batch.size = 4096Kafka.Producer.linger = 1Kafka.Producer.Buffer.Memory = 40960
2. Konfiguration: Kafka -Produzent
1) Konfiguration deklarieren und Kafkatemplate -Funktionen über @Configuration und @EnableKafka öffnen.
2) Injize Kafka -Konfiguration in die Konfigurationsdatei von application.Properties über @Value.
3) Bean erzeugen, @Bean
Paket com.kangaroo.entinel.collect.configuration; import Java.util.hashMap; Import Java.util.map; Imory org.apache.kafka.clients.Producer.Producerconfig; Import org.apache.kafka.common.Sserialization.Stringsly. org.springframework.beans.factory.annotation.value; import org.springframework.context.annotation.bean; import org.springframework.context.annotation org.springframework.kafka.core.defaultkafkaproducerFactory; @Value ("$ {kafka.Producer.Servers}") private String -Server; @Value ("$ {Kafka.Producer.Retries}") private int retries; @Value ("$ {kafka.Producer.batch.size}") private int batchsize; @Value ("$ {kafka.Producer.linger}") Private int Desser; @Value ("$ {Kafka.Producer.Buffer.Memory}") private int buffMemory; public map <String, Objekt> produzierenConfigs () {map <String, Objekt> props = new HashMap <> (); props.put (ProducerConfig.bootstrap_Servers_config, Server); props.put (ProducerConfig.Retries_Config, Retries); props.put (producerConfig.batch_size_config, batchSize); props.put (producerConfig.linger_ms_config, Dinger); props.put (ProducerConfig.Buffer_Memory_Config, BufferMemory); props.put (ProducerConfig.key_Serializer_class_config, Stringserializer.class); props.put (ProducerConfig.Value_Serializer_class_config, StringSerializer.class); Return Requisiten; } public proctreserFactory <String, String> processerFactory () {return New DefaultKafkaproducerFactory <> (ProducerConfigs ()); } @Bean public Kafkatemplate <String, String> kafkatemplate () {return New Kafkatemplate <String, String> (processerFactory ()); }}Experimentieren Sie unseren Produzenten und schreiben Sie einen Controller. Want topic = test, key = key, meldung senden
Paket com.kangaroo.sentinel.collect.controller; import com.kangaroo.sentinel org.springframework.beans.factory.annotation.autowired; import org.springframework.kafka.core.kafkatemplate; import org.springframework.web.bind.annotation. javax.servlet.http.httpServletResponse;@rastController@requestmapPing ("/kafka") public class CollectController {Protected Final Logger logger = loggerfactory.getLogger (this.getClass ()); @Autowired Private Kafkatemplate Kafkatemplate; @RequestMapping (value = "/send", method = requestMethod.get) öffentliche Antwort sendkafka (httpServletRequest -Anforderung, httpServletResponse -Antwort) {try {string message = request.getParameter ("message"); logger.info ("kafka message = {}", message); kafkatemplate.send ("test", "key", meldung); logger.info ("Kafka erfolgreich senden."); Neue Antwort zurückgeben (resultcode.success, "Kafka erfolgreich senden", null); } catch (Ausnahme e) {logger.Error ("Kafka senden", e); Neue Antwort zurückgeben (resultcode.exception, "send kafka fehlgeschlagen", null); }}}3. Konfiguration: Kafka -Verbraucher
1) Konfiguration deklarieren und Kafkatemplate -Funktionen über @Configuration und @EnableKafka öffnen.
2) Injize Kafka -Konfiguration in die Konfigurationsdatei von application.Properties über @Value.
3) Bean erzeugen, @Bean
Paket com.kangaroo.entinel.collect.configuration; import org.apache.kafka.clients.consumer.consumerconfig; import org.apache.kafka.common.Serialization.StringDeSerializer; Import org. org.springframework.context.annotation.bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation org.springframework.kafka.config.kafkalistenercontainerFactory; import org.springframework.kafka.core.consumerFactory; import org.springframework.kafka.core.defaultkafkaconumerFactory; org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;import java.util.Map;@Configuration@EnableKafkapublic class KafkaConsumerConfig { @Value("${kafka.consumer.servers}") private String servers; @Value ("$ {kafka.consumer.enable.auto.commit}") Private boolean EnableAutoCommit; @Value ("$ {kafka.consumer.Session.timeout}") private String Sessiontimeout; @Value ("$ {kafka.consumer.auto.commit.interval}") private String AutoCommitInterval; @Value ("$ {kafka.consumer.group.id}") Private String GroupID; @Value ("$ {kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value ("$ {kafka.consumer.Concurrency}") Private Int Concurrency; @Bean Public KafkalistenerContainerFactory <ConcurrentMessagelistenerContainer <String, String >> KafkalistenerContainerFactory () {ConcurrentKafkalistenContainerFactory <String, String> Factory = New ConcurentKafkalistenContainerFactory <> () (); factory.setConsumerFactory (ConsumerFactory ()); factory.setConcurrency (Parallelität); factory.getContainerProperties (). setpolltimeout (1500); Rückgabefabrik; } public ConsumerFactory <String, String> ConsumerFactory () {Neue DefaultKafkaconsumerFactory <> (ConsumerConfigs ()); } public map <String, Objekt> ConsumerConfigs () {Map <String, Object> propsmap = new HashMap <> (); propsmap.put (ConsumerConfig.bootstrap_Servers_config, Server); propSmap.put (ConsumerConfig.Enable_auto_Commit_Config, EnableAutoCommit); propsmap.put (ConsumerConfig.Auto_Commit_Interval_ms_config, autocommitinterval); propsmap.put (ConsumerConfig.session_Timeout_ms_config, SessiontimeOut); propsmap.put (ConsumerConfig.key_Deserializer_class_config, StringDeserializer.class); propSmap.put (ConsumerConfig.Value_Deserializer_Class_Config, StringDeserializer.class); propSmap.put (ConsumerConfig.group_id_config, GroupID); propSmap.put (ConsumerConfig.Auto_Offset_Reset_Config, AutoOffsetReset); propsmap zurückgeben; } @Bean public louser louser () {return New Listener (); }}New Listener () generiert eine Bean, um Daten aus Kafka zu verarbeiten. Die einfache Implementierungsdemo des Hörers lautet wie folgt: Lesen Sie einfach einfach die Schlüssel- und Nachrichtenwerte
@Kafkalisteners Themenattribut werden verwendet, um den Kafka -Themennamen anzugeben. Der Themename wird vom Nachrichtenproduzenten angegeben, dh von Kafkatemplate beim Senden einer Nachricht angegeben.
Paket com.kangaroo.entinel.collect.configuration; import org.apache.kafka.clients.consumer.consumerrecord; Logger logger = loggerFactory.getLogger (this.getClass ()); @Kafkalistener (topics = {"test"}) public void listen (conspeterRecord <?,?> Record) {logger.info ("Kafkas Schlüssel:" + record.key ()); logger.info ("Kafkas Wert:" + record.Value (). toString ()); }}Tipps:
1) Ich habe nicht vorgestellt, wie Kafka installiert und konfiguriert wird. Bei der Konfiguration von KAFKA anstelle von Localhost oder 127.0.0.1 ist es am besten am besten, ein vollständig gebundenes Netzwerk -IP zu verwenden.
2) Es ist am besten, Kafkas eigenes Zookeeper nicht zur Bereitstellung von Kafka zu verwenden, da dies zu Zugang zu Unzugänglichkeit führen kann.
3) Theoretisch sollte der Verbraucher Kafka durch Zookeeper lesen, aber hier verwenden wir die Adresse von Kafkaserver. Warum haben wir nicht detailliert in die Tiefe eingegangen?
4) Beim Definieren der Konfiguration der Überwachungsmeldung wird der Wert des Konfigurationselements gruppe_id_config verwendet, um den Namen der Verbrauchergruppe anzugeben. Wenn in derselben Gruppe mehrere Listener -Objekte vorhanden sind, kann nur ein Listener -Objekt die Nachricht empfangen.
Das obige ist der gesamte Inhalt dieses Artikels. Ich hoffe, es wird für das Lernen aller hilfreich sein und ich hoffe, jeder wird Wulin.com mehr unterstützen.