Prefácio
No artigo anterior, falamos sobre como construir um cluster Kafka, e este artigo fala sobre como simplesmente usar o Kafka. No entanto, ao usar Kafka, você ainda deve entender brevemente Kafka.
Introdução a Kafka
A Kafka é um sistema de mensagens de assinatura de publicação distribuída de alto rendimento que lida com todos os dados de fluxo de ação em um site em escala de consumidores.
Kafka tem as seguintes características:
Termos Kafka
Kafka Core API
Kafka tem quatro APIs principais
O diagrama de exemplo é o seguinte:
Cenários de aplicativos Kafka
Para a introdução acima, consulte o documento oficial Kafka.
Preparação de desenvolvimento
Se fôssemos desenvolver um programa Kafka, o que devemos fazer?
Primeiro de tudo, depois de construir um ambiente Kafka, precisamos considerar se somos um produtor ou um consumidor, ou seja, o remetente ou destinatário da mensagem.
No entanto, neste artigo, produtores e consumidores se desenvolverão e explicarão.
Após um entendimento difícil de Kafka, desenvolveremos o primeiro programa.
A linguagem de desenvolvimento usada aqui é Java, o Maven da ferramenta de construção.
As dependências do Maven são as seguintes:
<Depencency> <voundiD> org.apache.kafka </frupiD> <TROTIFACTID> kafka_2.12 </artifactId> <versão> 1.0.0 </sipers> <cope> fornecido </scope> </dependency> <pendency> <puperid> org.apache.kafka </grupo <artiftiD> <Versão> 1.0.0 </sisters> </dependency> <pendency> <voundid> org.apache.kafka </groupiD> <TRAFACTID> KAFKA-STREAMS </STIFACTID> <Versão> 1.0.0 </sipers> </pendence>
Produtor Kafka
Durante o desenvolvimento e a produção, vamos apresentar brevemente as várias instruções de configuração de Kafka:
...
Existem mais configurações, você pode verificar a documentação oficial, que não será explicada aqui.
Então nossa configuração de produtor Kafka é a seguinte:
Propriedades Props = new Properties (); ProPs.put ("bootstrap.servers", "mestre: 9092, escravo1: 9092, escravo2: 9092"); props.put ("acks", "all"); props.put ("tentativas", 0); props.put ("batch.size", 16384); props.put ("key.serializer", stringSerializer.class.getName ()); props.put ("value.Serializer", stringSerializer.class.getName ()); Kafkaproducedor <string, string> produtor = new kafkaproducedor <string, string> (props);Depois de adicionar a configuração Kafka, começamos a produzir dados. O código de dados de produção só precisa ser o seguinte:
Produner.Send (novo ProductErCord <String, String> (tópico, chave, valor));
Depois de escrever o programa de produtores, vamos começar a produzir primeiro!
A mensagem que enviei aqui é:
String messagest = "Olá, este é o" Messageno+"dados";
E apenas 1.000 mensagens são enviadas e os resultados são os seguintes:
Você pode ver que as informações foram impressas com sucesso.
Se você não deseja usar o programa para verificar se o programa foi enviado com sucesso e a precisão da mensagem enviando, você pode usar o comando para visualizá -lo no servidor Kafka.
Consumidor de kafka
O consumo de kafka deve ser o ponto -chave, afinal, na maioria das vezes, usamos principalmente o consumo de dados.
A configuração do consumo de Kafka é a seguinte:
Então nossa configuração de consumidor Kafka é a seguinte:
Propriedades Props = new Properties (); ProPs.put ("bootstrap.servers", "mestre: 9092, escravo1: 9092, escravo2: 9092"); props.put ("group.id", grupo); props.put ("enable.auto.comit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("max.poll.records", 1000); props.put ("auto.offset.reset", "mais antigo"); props.put ("key.deserializer", stringDeserializer.class.getName ()); props.put ("value.deserializer", stringDeserializer.class.getName ()); Kafkaconsumer <string, string> consumer = new kafkaconsumer <string, string> (adereços); Como estou configurando o envio automático, o código de consumo é o seguinte:
Precisamos assinar primeiro um tópico, ou seja, para especificar qual tópico consumir.
consumer.subscribe (Arrays.asList (tópico));
Após a assinatura, extraímos dados de Kafka:
ConsumerRecords <string, string> msglist = consumer.poll (1000);
De um modo geral, o monitoramento é usado quando o consumo é realizado. Aqui usamos (;;) para monitorar e configurar o consumo de 1.000 itens e sair!
Os resultados são os seguintes:
Pode -se observar que consumimos com sucesso dados de produção aqui.
Código
Em seguida, os códigos para produtores e consumidores são os seguintes:
Produtor:
importar java.util.properties; importar org.apache.kafka.clients.producer.kafkaproduces; importar org.apache.kafka.clients.producer.producerCordord; import orgache.kafka.common.erialization.stringSerializer; Demo* Versão: 1.0.0* @Author Pancm* @Date 26 de janeiro de 2018*/Public Classe Kafkaproducert mais implementa Runnable {Private final Kafkaproducer <String, String> Produtor; Tópico de string final privado; public kafkaproducertest (string tópiconame) {Propriedades props = new Properties (); ProPs.put ("bootstrap.servers", "mestre: 9092, escravo1: 9092, escravo2: 9092"); props.put ("acks", "all"); props.put ("tentativas", 0); props.put ("batch.size", 16384); props.put ("key.serializer", stringSerializer.class.getName ()); props.put ("value.Serializer", stringSerializer.class.getName ()); this.Producer = new Kafkaproduces <String, String> (Props); this.topic = tópicoName; } @Override public void run () {int messageno = 1; tente {for (;;) {string messageSt = "hello, este é o" bar de dados "+messageno+" dados "; Producer.send (novo ProductErCord <String, String> (Tópico, "Mensagem", Messagest)); // Se 100 itens forem produzidos, se (messageno%100 == 0) {System.out.println ("Mensagem enviou:" + messagest); } // Se 1000 itens forem produzidos, if (messageno%1000 == 0) {System.out.println ("enviado com sucesso"+messageno+"bar"); quebrar; } Messageno ++; }} catch (Exceção e) {e.printStackTrace (); } finalmente {produtor.close (); }} public static void main (string args []) {kafkaproducertest test = new kafkaproducertest ("kafka_test"); Thread Thread = novo thread (teste); thread.start (); }}consumidor:
importar java.util.arrays; importar java.util.properties; importar org.apache.kafka.clients.consumer.consumerrecord; importar org.apache.kafka.clients.consumer.consumerRrecord; import.apache.kafka.clients.consumer.consumer.Consumer. org.apache.kafka.clients.consumer.kafkaconsumer; importar org.apache.kafka.common.serialization.stringDeserializer;/**** título: kafkaconsumerTest* Descrição:* kafka consumer Demo* versão: 1.0.0* @Athor Pancor Pancm Runnable {private final kafkaconsumer <string, string> consumer; Private ConsumerRecords <String, String> MSGLIST; Tópico de string final privado; private estático final string groupid = "groupA"; public kafkaconsumertest (string tópiconame) {Propriedades props = new Properties (); ProPs.put ("bootstrap.servers", "mestre: 9092, escravo1: 9092, escravo2: 9092"); props.put ("group.id", grupo); props.put ("enable.auto.comit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("auto.offset.reset", "mais antigo"); props.put ("key.deserializer", stringDeserializer.class.getName ()); props.put ("value.deserializer", stringDeserializer.class.getName ()); this.Consumer = new Kafkaconsumer <String, String> (Props); this.topic = tópicoName; this.consumer.subscribe (Arrays.asList (tópico)); } @Override public void run () {int messageno = 1; System.out.println ("--------------------------------------"); tente {for (;;) {msglist = consumer.poll (1000); if (null! = msglist && msglist.count ()> 0) {for (consumerCord <string, string> registro: msglist) {// imprima 100 itens quando consumidos, mas os dados impressos não são necessariamente a regra (messageno%100 == 0) {System.out.println (Messageno + " registro.value ()+"offset ==="+registro.offset ()); } // Quando 1000 itens são consumidos, saia se (messageno%1000 == 0) {break; } Messageno ++; }} else {thread.sleep (1000); }}} catch (interruptEdException e) {e.printStackTrace (); } finalmente {consumer.close (); }} public static void main (string args []) {kafkaconsumertest test1 = new kafkaconsumertest ("kafka_test"); Thread Thread1 = novo thread (test1); Thread1.start (); }}Nota: Master, Slave1, Slave2 é porque eu fiz mapeamento de relacionamento em meu próprio ambiente, que pode ser substituído pelo IP do servidor.
Obviamente, coloquei o projeto no Github e, se você estiver interessado, pode dar uma olhada. https://github.com/xuwujing/kafka (download local)
Resumir
O desenvolvimento simples de um programa Kafka requer as seguintes etapas:
Kafka Introdução Consulte o documento oficial: http://kafka.apache.org/intro
Resumir
O acima é o conteúdo inteiro deste artigo. Espero que o conteúdo deste artigo tenha certo valor de referência para o estudo ou trabalho de todos. Se você tiver alguma dúvida, pode deixar uma mensagem para se comunicar. Obrigado pelo seu apoio ao wulin.com.