Vorwort
Im vorherigen Artikel sprechen wir darüber, wie man einen Kafka -Cluster erstellt, und in diesem Artikel wird darüber gesprochen, wie man einfach Kafka benutzt. Bei der Verwendung von Kafka sollten Sie jedoch Kafka immer noch kurz verstehen.
Einführung in Kafka
Kafka ist ein hochdurchsatz verteiltes Abonnement-Abonnement-Messaging-System, das alle Aktionsflussdaten in einer Website im Verbraucherbereich übernimmt.
Kafka hat die folgenden Eigenschaften:
Kafka -Begriffe
Kafka Core API
Kafka hat vier Kern -APIs
Das Beispieldiagramm lautet wie folgt:
Kafka -Anwendungsszenarien
Weitere Informationen finden Sie im offiziellen Kafka -Dokument.
Entwicklungsvorbereitung
Was sollen wir tun, wenn wir ein Kafka -Programm entwickeln würden?
Erstens müssen wir nach dem Bau einer Kafka -Umgebung überlegen, ob wir ein Hersteller oder ein Verbraucher sind, dh der Absender oder Empfänger der Nachricht.
In diesem Artikel werden sich jedoch sowohl Produzenten als auch Verbraucher entwickeln und erklären.
Nach einem groben Verständnis von Kafka werden wir das erste Programm entwickeln.
Die hier verwendete Entwicklungssprache ist Java, das Bauwerkzeug Maven.
Die Abhängigkeiten von Maven sind wie folgt:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>1.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version> 1.0.0 </version> </abhängig> <depeopcy> <gruppe> org.apache.kafka </Groupid> <artifactid> kafka-streams </artifactid> <version> 1.0.0 </Version> </abhängig>
Kafka -Produzent
Während der Entwicklung und Produktion stellen wir kurz die verschiedenen Konfigurationsanweisungen von Kafka vor:
...
Es gibt mehr Konfigurationen, Sie können die offizielle Dokumentation überprüfen, die hier nicht erläutert wird.
Dann lautet unsere Kafka -Produzentenkonfiguration wie folgt:
Eigenschaften props = neue Eigenschaften (); props.put ("Bootstrap.Servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); props.put ("acks", "alles"); props.put ("returns", 0); props.put ("batch.size", 16384); props.put ("key.serializer", stringserializer.class.getName ()); props.put ("value.Serializer", Stringserializer.class.getName ()); KafkaproDucer <String, String> produzierer = new KafkaproDucer <String, String> (Requisiten);Nachdem wir die KAFKA -Konfiguration hinzugefügt haben, beginnen wir mit der Erstellung von Daten. Der Produktionsdatencode muss nur wie folgt sein:
Produzent.Send (neue procoderEnRecord <String, String> (Thema, Schlüssel, Wert));
Nach dem Schreiben des Produzentenprogramms beginnen wir zuerst zu produzieren!
Die Nachricht, die ich hier gesendet habe, lautet:
String messAgr = "Hallo, das ist die"+Messageno+"Daten";
Und es werden nur 1.000 Nachrichten gesendet und die Ergebnisse sind wie folgt:
Sie können sehen, dass die Informationen erfolgreich gedruckt wurden.
Wenn Sie das Programm nicht verwenden möchten, um zu überprüfen, ob das Programm erfolgreich gesendet wird und die Genauigkeit des Sendens der Nachricht, können Sie den Befehl verwenden, um es auf dem Kafka -Server anzuzeigen.
Kafka -Verbraucher
Der Kafka -Verbrauch sollte schließlich der entscheidende Punkt sein, der die meiste Zeit hauptsächlich den Datenverbrauch verwenden.
Die Konfiguration des Kafka -Verbrauchs lautet wie folgt:
Dann lautet unsere Kafka -Verbraucherkonfiguration wie folgt:
Eigenschaften props = neue Eigenschaften (); props.put ("Bootstrap.Servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); props.put ("Group.id", GroupID); props.put ("enable.auto.commit", "wahr"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("max.poll.records", 1000); props.put ("auto.offset.reset", "frühestens"); props.put ("Key.Deserializer", StringDeserializer.class.getName ()); props.put ("value.deserializer", stringDeserializer.class.getName ()); Kafkaconsumer <String, String> Consumer = new Kafkaconsumer <String, String> (Requisiten); Da ich die automatische Einreichung eingerichte, lautet der Verbrauchscode wie folgt:
Wir müssen zuerst ein Thema abonnieren, dh angeben, welches Thema zu konsumieren ist.
Consumer.Subscribe (Arrays.aSlist (Thema));
Nach dem Abonnieren ziehen wir Daten von Kafka:
CENSTEMERRECORDS <string, String> msglist = Consumer.Poll (1000);
Im Allgemeinen wird die Überwachung verwendet, wenn der Konsum durchgeführt wird. Hier verwenden wir (;;), um den Verbrauch von 1.000 Elementen zu überwachen und zu beenden!
Die Ergebnisse sind wie folgt:
Es ist ersichtlich, dass wir hier erfolgreich Produktionsdaten konsumiert haben.
Code
Dann sind die Codes für Hersteller und Verbraucher wie folgt:
Produzent:
importieren java.util.properties; import org.apache.kafka.clients.producer KAFKA -Produzent Demo* Version: 1.0.0* @Author Pancm* @date 26. Januar 2018*/Public Class KafkaproDucertest implementiert Runnable {private endgültige KafkaproDucer <String, String> Produzent; privates endgültiges String -Thema; public kafkaproDucertest (String topicname) {Properties props = new Properties (); props.put ("Bootstrap.Servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); props.put ("acks", "alles"); props.put ("returns", 0); props.put ("batch.size", 16384); props.put ("key.serializer", stringserializer.class.getName ()); props.put ("value.Serializer", Stringserializer.class.getName ()); this.Producer = new KafkaproDucer <String, String> (Requisiten); this.topic = topicname; } @Override public void run () {int Messageno = 1; Versuchen Sie {für (;;) {String messASTEST = "Hallo, dies ist die"+Messageno+"Balkenleiste"; Produzent.Send (neue ProducTErrecord <String, String> (Thema, "Nachricht", MessASTESTR)); // Wenn 100 Elemente erzeugt werden, if (Messageno%100 == 0) {System.out.println ("gesendete Nachricht:" + MessASTESTR); } // Wenn 1000 Elemente erzeugt werden, if (Messageno%1000 == 0) {System.out.println ("erfolgreich gesendet"+Messageno+"Bar"); brechen; } Messageno ++; }} catch (Ausnahme e) {e.printstacktrace (); } endlich {Produzent.CLOSE (); }} public static void main (String args []) {kafkaproducertest test = new Kafkaproducertest ("kafka_test"); Thread Thread = neuer Thread (Test); Thread.Start (); }}Verbraucher:
Import Java.util.Arrays; Import Java.util.Properties; import org.apache.kafka.clients.consumer.consumerrecord; import org.apache.kafka.clients.consumer.consumerrecord; org.apache.kafka.clients.consumer.kafkaconsumer; import org.apache.kafka.common.Serialization.stringDeserializer;/**** Titel: Kafkaconsumertest* Beschreibung:* Kafka Consumer -Verbraucher. KafkaconSumertest implementiert Runnable {private endgültige Kafkaconsumer <String, String> Consumer; Private ConsumerRecords <String, String> msglist; privates endgültiges String -Thema; private statische endgültige String GroupID = "Groupa"; public KafkaconSumertest (String topicname) {Properties props = new Properties (); props.put ("Bootstrap.Servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); props.put ("Group.id", GroupID); props.put ("enable.auto.commit", "wahr"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("auto.offset.reset", "frühestens"); props.put ("Key.Deserializer", StringDeserializer.class.getName ()); props.put ("value.deserializer", stringDeserializer.class.getName ()); this.consumer = new Kafkaconsumer <String, String> (Requisiten); this.topic = topicname; this.consumer.subscribe (arrays.aslist (Thema)); } @Override public void run () {int Messageno = 1; System.out.println ("------------------------------------"); try {for (;;) {msglist = Consumer.poll (1000); if (null! record.Value ()+"offset ==="+record.Offset ()); } // Sobald 1000 Elemente konsumiert sind, beenden Sie, wenn (Messageno%1000 == 0) {Break; } Messageno ++; }} else {thread.sleep (1000); }}} catch (interruptedException e) {e.printstacktrace (); } endlich {Consumer.CLOSE (); }} public static void main (String args []) {KafkaconSumertest Test1 = new Kafkaconsumertest ("kafka_test"); Thread Thread1 = neuer Thread (test1); Thread1.Start (); }}HINWEIS: Master, SLAVE1, SLAVE2 liegt daran, dass ich in meiner eigenen Umgebung Beziehungszuordnungen gemacht habe, die durch die IP des Servers ersetzt werden können.
Natürlich habe ich das Projekt auf GitHub gesetzt, und wenn Sie interessiert sind, können Sie einen Blick darauf werfen. https://github.com/xuwujing/kafka (lokaler Download)
Zusammenfassen
Die einfache Entwicklung eines Kafka -Programms erfordert die folgenden Schritte:
Kafka Einführung Siehe offizielles Dokument: http://kafka.apache.org/Intro
Zusammenfassen
Das obige ist der gesamte Inhalt dieses Artikels. Ich hoffe, dass der Inhalt dieses Artikels einen gewissen Referenzwert für das Studium oder die Arbeit eines jeden hat. Wenn Sie Fragen haben, können Sie eine Nachricht zur Kommunikation überlassen. Vielen Dank für Ihre Unterstützung bei Wulin.com.