Prefacio
En el artículo anterior, hablamos sobre cómo construir un clúster Kafka, y este artículo habla sobre cómo usar simplemente Kafka. Sin embargo, cuando usa Kafka, aún debe comprender brevemente a Kafka.
Introducción a Kafka
Kafka es un sistema de mensajería de suscripción de publicación de alto rendimiento que maneja todos los datos de flujo de acción en un sitio web a escala del consumidor.
Kafka tiene las siguientes características:
Términos de Kafka
API Core Kafka
Kafka tiene cuatro API de núcleo
El diagrama de ejemplo es el siguiente:
escenarios de aplicación Kafka
Para la introducción anterior, consulte el documento oficial de Kafka.
Preparación para el desarrollo
Si tuviéramos que desarrollar un programa Kafka, ¿qué debemos hacer?
En primer lugar, después de construir un entorno de Kafka, debemos considerar si somos un productor o un consumidor, es decir, el remitente o el destinatario del mensaje.
Sin embargo, en este artículo, tanto los productores como los consumidores desarrollarán y explicarán.
Después de una comprensión aproximada de Kafka, desarrollaremos el primer programa.
El lenguaje de desarrollo utilizado aquí es Java, la herramienta de construcción Maven.
Las dependencias de Maven son las siguientes:
<Spendency> <MoupRoD> org.apache.kafka </groupid> <artifactid> kafka_2.12 </artifactid> <verserse> 1.0.0 </versión> <cope> proporcionó </scope> </pendency> <pendency> <grupoD> org.apache.kafka </groupid> <artifactid> kafka-clients </artifactid> <versión> 1.0.0 </versión> </pendency> <pendency> <grupoD> org.apache.kafka </proupid> <artifactid> kafka-streams </artifactid> <versión> 1.0.0 </versión> </pendency>
Productor de kafka
Durante el desarrollo y la producción, presentemos brevemente las diversas instrucciones de configuración de Kafka:
...
Hay más configuraciones, puede verificar la documentación oficial, que no se explicará aquí.
Entonces nuestra configuración del productor de Kafka es la siguiente:
Propiedades Props = New Properties (); Props.put ("Bootstrap.Servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); propssput ("acks", "todos"); propssput ("reintentos", 0); props.put ("batch.size", 16384); props.put ("key.serializer", stringserializer.class.getName ()); props.put ("value.serializer", stringserializer.class.getName ()); KAFKAPRODUCER <String, String> Producer = new Kafkaproducer <String, String> (Props);Después de agregar la configuración de Kafka, comenzamos a producir datos. El código de datos de producción solo debe ser el siguiente:
Producer.send (nuevo ProducerRecord <String, String> (Topic, Key, Value));
Después de escribir el programa de productores, ¡comencemos a producir primero!
El mensaje que envié aquí es:
String Messagestr = "Hola, este es el"+Messageno+"Data";
Y solo se envían 1,000 mensajes y los resultados son los siguientes:
Puede ver que la información se ha impreso con éxito.
Si no desea usar el programa para verificar si el programa se envía correctamente y la precisión del envío del mensaje, puede usar el comando para verlo en el servidor Kafka.
Consumidor de kafka
El consumo de Kafka debe ser el punto clave, después de todo, la mayoría de las veces, utilizamos principalmente el consumo de datos.
La configuración del consumo de Kafka es la siguiente:
Entonces nuestra configuración de consumo de Kafka es la siguiente:
Propiedades Props = New Properties (); Props.put ("Bootstrap.Servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); props.put ("group.id", groupId); propssput ("enable.auto.commit", "verdadero"); props.put ("auto.commit.interval.ms", "1000"); propssput ("session.timeout.ms", "30000"); props.put ("max.poll.records", 1000); propssput ("autofoffset.reset", "más temprano"); props.put ("key.deserializer", stringDeserializer.class.getName ()); props.put ("value.deserializer", stringDeserializer.class.getName ()); Kafkaconsumer <string, string> consumer = new KafkAconsumer <String, String> (Props); Como estoy configurando el envío automático, el código de consumo es el siguiente:
Primero debemos suscribirnos a un tema, es decir, especificar qué tema consumir.
Consumer.subscribe (arrays.aslist (tema));
Después de suscribirse, extraemos datos de Kafka:
ConsumerRecords <String, String> msglist = Consumer.Poll (1000);
En términos generales, el monitoreo se usa cuando se lleva a cabo el consumo. ¡Aquí usamos para (;;) para monitorear y establecer el consumo de 1,000 artículos y salir!
Los resultados son los siguientes:
Se puede ver que hemos consumido con éxito datos de producción aquí.
Código
Entonces los códigos para productores y consumidores son los siguientes:
Productor:
import java.util.properties; import org.apache.kafka.clients.producer.kafkaproducer; importar org.apache.kafka.clients.producer.producerRecord; import org.apache.kafka.common.serialization.stringserializer;/** * * * title: kafkapRoducerTaDUTUX: Demo* Versión: 1.0.0* @author pancm* @Date 26 de enero de 2018*/public class kafkaproDucTest implements runnable {private final kafkaproducer <string, string> productor; Tema privado de cadena final; public kafkaproDucTest (string topicName) {Properties props = new Properties (); Props.put ("Bootstrap.Servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); propssput ("acks", "todos"); propssput ("reintentos", 0); props.put ("batch.size", 16384); props.put ("key.serializer", stringserializer.class.getName ()); props.put ("value.serializer", stringserializer.class.getName ()); this.producer = new KafkaproDucer <String, String> (Props); this.topic = topicName; } @Override public void run () {int messageno = 1; Pruebe {for (;;) {String MessageStr = "Hola, esta es la"+Messageno+"Bar of Data"; Producer.send (nuevo ProducerRecord <String, String> (tema, "Mensaje", Messagestr)); // Si se producen 100 elementos, if (MessageNo%100 == 0) {System.out.println ("enviado mensaje:" + Messagestr); } // Si se producen 1000 elementos, if (MessageNo%1000 == 0) {System.out.println ("enviado correctamente"+MessageNo+"Bar"); romper; } Messageno ++; }} catch (Exception e) {E.PrintStackTrace (); } finalmente {producer.close (); }} public static void main (string args []) {kafkaproDucertest test = new KafkaproDucTest ("kafka_test"); Hilo de hilo = nuevo hilo (prueba); Thread.Start (); }}consumidor:
import java.util.arrays; import java.util.properties; import org.apache.kafka.clients.consumer.consumerRecord; import org.apache.kafka.clients.consumer.consumerrecord; import org.apache.kafka.clients.consumer.consumerrords; import; import; import; importar; importar; import org.apache.kafka.clients.consumer.kafkaconsumer; import org.apache.kafka.common.serialization.stringdeserializer;/**** Título: kafkaconsumertest* Descripción:* Kafka Consumer Demo* Versión: 1.0.0* @Author Pancm* @date enero, 26 de 2018*/público Kafka. Runnable {private final kafkaconsumer <string, string> consumidor; Private ConsumerRecords <String, String> msglist; Tema privado de cadena final; Private static final String groupId = "groupa"; public kafkaconsumertest (string topicName) {Properties props = new Properties (); Props.put ("Bootstrap.Servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); props.put ("group.id", groupId); propssput ("enable.auto.commit", "verdadero"); props.put ("auto.commit.interval.ms", "1000"); propssput ("session.timeout.ms", "30000"); propssput ("autofoffset.reset", "más temprano"); props.put ("key.deserializer", stringDeserializer.class.getName ()); props.put ("value.deserializer", stringDeserializer.class.getName ()); this.consumer = new KafkAconsumer <String, String> (Props); this.topic = topicName; this.consumer.subscribe (arrays.aslist (tema)); } @Override public void run () {int messageno = 1; System.out.println ("----------------------------------------"); intente {for (;;) {msglist = consumer.poll (1000); if (null! = msglist && msglist.count ()> 0) {for (consumerRecord <string, string> registro: msglist) {// imprime 100 elementos cuando se consume, pero los datos impresos no son necesariamente la regla if (messageNo%100 == 0) {System.out.Println (Messageno + "========= recibe: KEY =" KEY ") registro.value ()+"offset ==="+registro.offset ()); } // Una vez que se consumen 1000 elementos, salga if (MessageNo%1000 == 0) {break; } Messageno ++; }} else {thread.sleep (1000); }}} Catch (InterruptedException e) {E.PrintStackTrace (); } Finalmente {Consumer.close (); }} public static void main (string args []) {kafkaconsumertest test1 = new KafkAconsumertest ("kafka_test"); Thread Thread1 = nuevo hilo (test1); Thread1.Start (); }}Nota: Master, Slave1, Slave2 se debe a que he hecho el mapeo de relaciones en mi propio entorno, que puede reemplazarse con la IP del servidor.
Por supuesto, puse el proyecto en GitHub, y si está interesado, puede echar un vistazo. https://github.com/xuwujing/kafka (descarga local)
Resumir
El desarrollo simple de un programa Kafka requiere los siguientes pasos:
Introducción de Kafka Consulte el documento oficial: http://kafka.apache.org/intro
Resumir
Lo anterior es todo el contenido de este artículo. Espero que el contenido de este artículo tenga cierto valor de referencia para el estudio o el trabajo de todos. Si tiene alguna pregunta, puede dejar un mensaje para comunicarse. Gracias por su apoyo a Wulin.com.