Préface
Dans l'article précédent, nous parlons de la façon de construire un cluster Kafka, et cet article explique comment utiliser simplement Kafka. Cependant, lorsque vous utilisez Kafka, vous devriez toujours comprendre brièvement Kafka.
Introduction à Kafka
Kafka est un système de messagerie d'abonnement publié à haut débit distribué qui gère toutes les données de flux d'action dans un site Web à l'échelle des consommateurs.
Kafka a les caractéristiques suivantes:
Termes kafka
API Kafka Core
Kafka a quatre API centrales
L'exemple de diagramme est le suivant:
Scénarios d'application Kafka
Pour l'introduction ci-dessus, reportez-vous au document officiel de Kafka.
Préparation au développement
Si nous devions développer un programme Kafka, que devons-nous faire?
Tout d'abord, après avoir construit un environnement Kafka, nous devons nous demander si nous sommes producteur ou consommateur, c'est-à-dire l'expéditeur ou le destinataire du message.
Cependant, dans cet article, les producteurs et les consommateurs se développeront et expliqueront.
Après une compréhension approximative de Kafka, nous développerons le premier programme.
Le langage de développement utilisé ici est Java, l'outil de construction Maven.
Les dépendances de Maven sont les suivantes:
<dependency> <proupId> org.apache.kafka </prôdId> <Ertifactid> kafka_2.12 </ artifactid> <version> 1.0.0 </-version> <ccope> fourni </cope> </Dependency> <Dependency> <proupId> org.apache.kafka </proupId> <ArtifActid> kafka-clienties </ artifActid> <version> 1.0.0 </ version> </ Dependency> <Dedency> <ProupId> org.apache.kafka </prôdId> <ArtefactId> Kafka-Streams </ Artifactive> <Dersion> 1.0.0 </preinte> </ Depentency>
Producteur de kafka
Pendant le développement et la production, introduisons brièvement les différentes instructions de configuration de Kafka:
...
Il y a plus de configurations, vous pouvez consulter la documentation officielle, qui ne sera pas expliquée ici.
Ensuite, notre configuration de producteur Kafka est la suivante:
Properties props = news properties (); Prophes.put ("bootstrap.servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); propuls.put ("acks", "all"); Prophes.put ("Retries", 0); Prophes.put ("Batch.Size", 16384); props.put ("key.serializer", stringserializer.class.getName ()); props.put ("value.serializer", stringserializer.class.getName ()); Kafkaproducer <string, string> producer = new kafkaproducer <string, string> (props);Après avoir ajouté la configuration de Kafka, nous commençons à produire des données. Le code de données de production ne doit être que les suivants:
producer.send (new productorRecord <string, string> (thème, key, valeur));
Après avoir écrit le programme des producteurs, commençons à produire en premier!
Le message que j'ai envoyé ici est:
String Messagestr = "Hello, c'est les données" + MessageNo + ";
Et seulement 1 000 messages sont envoyés et les résultats sont les suivants:
Vous pouvez voir que les informations ont été imprimées avec succès.
Si vous ne souhaitez pas utiliser le programme pour vérifier si le programme est envoyé avec succès et l'exactitude de l'envoi du message, vous pouvez utiliser la commande pour le visualiser sur le serveur Kafka.
Consommateur kafka
La consommation de Kafka devrait être le point clé, après tout, la plupart du temps, nous utilisons principalement la consommation de données.
La configuration de la consommation de Kafka est la suivante:
Ensuite, notre configuration de consommation Kafka est la suivante:
Properties props = news properties (); Prophes.put ("bootstrap.servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); Prophes.put ("Group.id", GroupID); props.put ("activer.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("Session.Timeout.ms", "30000"); propuls.put ("max.poll.records", 1000); props.put ("auto.offset.reset", "début"); props.put ("key.deserializer", stringdeserializer.class.getName ()); props.put ("value.deserializer", stringdeserializer.class.getName ()); Kafkaconsumer <string, string> Consumer = new Kafkaconsumer <String, String> (accessoires); Puisque je configure la soumission automatique, le code de consommation est le suivant:
Nous devons d'abord souscrire à un sujet, c'est-à-dire pour spécifier le sujet à consommer.
Consumer.Subscribe (arrays.aslist (sujet));
Après son abonnement, nous tirons des données de Kafka:
ConsumerCords <String, String> msglist = Consumer.Poll (1000);
D'une manière générale, la surveillance est utilisée lorsque la consommation est effectuée. Ici, nous utilisons pour (;;) pour surveiller et configurer la consommation de 1 000 articles et sortir!
Les résultats sont les suivants:
On peut voir que nous avons réussi à consommer des données de production ici.
Code
Ensuite, les codes pour les producteurs et les consommateurs sont les suivants:
Producteur:
Importer java.util.properties; import org.apache.kafka.clients.producer.kafkaproduner; import org.apache.kafka.clients.producer.producerrecord; import org.apache.kafka.common.serialization.stringSerializer; / ** * * Title: KafKaproDerizon * Description: * KafKaUr: * KafKink Demo * Version: 1.0.0 * @author pancm * @Date 26 janvier 2018 * / classe publique KafkaproducerTest implémente Runnable {private final kafkaproducer <String, String> producteur; Sujet de chaîne finale privée; public kafkaproducerTest (String topicName) {Properties props = new Properties (); Prophes.put ("bootstrap.servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); propuls.put ("acks", "all"); Prophes.put ("Retries", 0); Prophes.put ("Batch.Size", 16384); props.put ("key.serializer", stringserializer.class.getName ()); props.put ("value.serializer", stringserializer.class.getName ()); this.produner = new Kafkaproducer <String, String> (accessoires); this.topic = topicname; } @Override public void run () {int messageno = 1; essayez {pour (;;) {String Messagestr = "Hello, c'est la barre" + Messageno + "des données"; producer.send (new productorRecord <string, string> (thème, "message", messagestr)); // si 100 éléments sont produits, if (MessageNo% 100 == 0) {System.out.println ("Message envoyé:" + Messagestr); } // si 1000 éléments sont produits, if (MessageNo% 1000 == 0) {System.out.println ("Sendmy Send" + MessageNo + "Bar"); casser; } MessageNo ++; }} catch (exception e) {e.printStackTrace (); } enfin {producer.close (); }} public static void main (String args []) {kafkaproducerTest test = new kafkaproducerTest ("kafka_test"); Thread thread = nouveau thread (test); thread.start (); }}consommateur:
Importer java.util.arrays; import java.util.properties; import org.apache.kafka.clients.consumer.consumerCord; import org.apache.kafka.clients.consumer.consumerCord; org.apache.kafka.clients.consumer.kafkaconsumer; import org.apache.kafka.common.serialization.stringdeserializer; / ** * * Titre: KafkaconsumerTest * Description: * Kafka Consumer Demo * Version: 1.0.0 * @Author Pancm * @date janvier 26, 2018 * / public classe Kafkac Runnable {private final kafkaconsumer <string, string> consommateur; Private ConsumerCords <String, String> msglist; Sujet de chaîne finale privée; chaîne finale statique privée groupId = "groupa"; public kafkaconSumerTest (String topicName) {Properties props = new Properties (); Prophes.put ("bootstrap.servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); Prophes.put ("Group.id", GroupID); props.put ("activer.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("Session.Timeout.ms", "30000"); props.put ("auto.offset.reset", "début"); props.put ("key.deserializer", stringdeserializer.class.getName ()); props.put ("value.deserializer", stringdeserializer.class.getName ()); this.consumer = new kafkaconsumer <string, string> (accessoires); this.topic = topicname; this.consumer.subscribe (arrays.aslist (thème)); } @Override public void run () {int messageno = 1; System.out.println ("----------------------------------------"); try {for (;;) {msglist = consacter.poll (1000); if(null!=msgList&&msgList.count()>0){ for (ConsumerRecord<String, String> record : msgList) { //Print 100 items when consumed, but the printed data is not necessarily the rule if(messageNo%100==0){ System.out.println(messageNo+"========receive: key = " + record.key() + ", value = " + Record.Value () + "Offset ===" + enregistre.offset ()); } // Une fois que 1000 éléments sont consommés, sortez if (Messageno% 1000 == 0) {Break; } MessageNo ++; }} else {Thread.Sleep (1000); }}} catch (InterruptedException e) {e.printStackTrace (); } enfin {consumer.close (); }} public static void main (String args []) {kafkaconsumerTest test1 = new kafkaconsumerTest ("kafka_test"); Thread thread1 = nouveau thread (test1); thread1.start (); }}Remarque: Master, Slave1, Slave2, c'est parce que j'ai fait une cartographie relationnelle dans mon propre environnement, qui peut être remplacé par l'IP du serveur.
Bien sûr, j'ai mis le projet sur GitHub, et si vous êtes intéressé, vous pouvez jeter un œil. https://github.com/xuwujing/kafka (téléchargement local)
Résumer
Le développement simple d'un programme Kafka nécessite les étapes suivantes:
Kafka Introduction Reportez-vous au document officiel: http://kafka.apache.org/intro
Résumer
Ce qui précède est l'intégralité du contenu de cet article. J'espère que le contenu de cet article a une certaine valeur de référence pour l'étude ou le travail de chacun. Si vous avez des questions, vous pouvez laisser un message pour communiquer. Merci pour votre soutien à wulin.com.