Предисловие
В предыдущей статье мы говорим о том, как построить кластер Kafka, и в этой статье рассказывается о том, как просто использовать Kafka. Однако при использовании Kafka вы все равно должны кратко понять Кафку.
Введение в Кафку
Kafka-это высокопроизводительная система распределенной подписки, которая обрабатывает все данные о потоке действий на веб-сайте потребительского масштаба.
Кафка имеет следующие характеристики:
Кафка термины
KAFKA CORE API
Кафка имеет четыре основных API
Пример диаграмма заключается в следующем:
Кафка сценарии приложения
Для приведенного выше введения см. В официальном документе Кафка.
Подготовка к развитию
Если бы мы разработали программу Кафки, что мы должны делать?
Прежде всего, после создания среды Kafka, мы должны рассмотреть, являемся ли мы производителем или потребителем, то есть отправителем или получателем сообщения.
Однако в этой статье как производители, так и потребители будут разрабатывать и объяснять.
После грубого понимания Кафки мы разработаем первую программу.
Используемый здесь язык развития - Java, инструмент строительства Maven.
Зависимости Мэвена следующие:
<dependency> <groupid> org.apache.kafka </groupid> <artifactid> kafka_2.12 </artifactid> <sersive> 1.0.0 </version> <cracpe> <sersion> 1.0.0 </version> </dependency> <DeyEdency> <groupId> org.apache.kafka </GroupId> <artifactid> kafka-streams </artifactid> <sersion> 1.0.0 </version> </vehys>
Кафка -продюсер
Во время разработки и производства давайте кратко представим различные инструкции по конфигурации Кафки:
...
Есть больше конфигураций, вы можете проверить официальную документацию, которая не будет объяснена здесь.
Тогда наша конфигурация производителя Kafka заключается в следующем:
Свойства реквизит = новые свойства (); props.put ("bootstrap.servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); props.put ("Acks", "All"); props.put ("retries", 0); props.put ("batch.size", 16384); props.put ("key.serializer", Stringserializer.class.getName ()); props.put ("value.serializer", Stringserializer.class.getName ()); Kafkaproducer <string, string> producter = new kafkaproducer <string, string> (ops);После добавления конфигурации KAFKA мы начинаем создавать данные. Код производственных данных должен быть только следующим образом:
Производитель.send (New ProducterRecord <String, String> (тема, ключ, значение));
После написания программы продюсера давайте начнем производить первым!
Сообщение, которое я отправил здесь:
String messageStr = "Привет, это данные"+Messageno+";
И отправлено только 1000 сообщений, и результаты следующие:
Вы можете видеть, что информация была успешно напечатана.
Если вы не хотите использовать программу, чтобы проверить, отправляется ли программа успешно и точность отправки сообщения, вы можете использовать команду для просмотра ее на сервере KAFKA.
Кафка потребитель
Потребление кафки должно быть ключевым моментом, в конце концов, большую часть времени мы в основном используем потребление данных.
Конфигурация потребления кафки заключается в следующем:
Тогда наша конфигурация потребителя KAFKA выглядит следующим образом:
Свойства реквизит = новые свойства (); props.put ("bootstrap.servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); props.put ("Group.id", GroupId); props.put ("enable.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("max.poll.records", 1000); props.put ("auto.offset.reset", «Самый ранний»); props.put ("key.deserializer", stringdeserializer.class.getName ()); props.put ("value.deserializer", stringDeserializer.class.getName ()); Kafkaconsumer <string, string> consumer = new kafkaconsumer <string, string> (props); Поскольку я настраиваю автоматическое представление, код потребления выглядит следующим образом:
Нам нужно сначала подписаться на тему, то есть указать, какую тему потреблять.
Consumer.SubScribe (arrays.aslist (тема));
После подписки мы получаем данные из Кафки:
ConsumerErcords <String, String> msglist = consumer.poll (1000);
Вообще говоря, мониторинг используется при выполнении потребления. Здесь мы используем для (;;) для мониторинга и настроить потребление 1000 предметов и выходить!
Результаты следующие:
Можно видеть, что мы успешно потребляли данные о производстве здесь.
Код
Тогда коды для производителей и потребителей заключаются в следующем:
Продюсер:
Импорт java.util.properties; import org.apache.kafka.clients.producer.kafkaproducer; import org.apache.kafka.clients.producer.producerRecord; импорт org.apache.kafka.common.serialization.stringserialize;/** * Демо -продюсер* Версия: 1.0.0* @author pancm* @date 26 января 2018 г.*/открытый класс kafkaproducertest реализует runnable {private final kafkaproducer <String, String> Produceer; частная финальная строка тема; public kafkaproducertest (String topicname) {Свойства props = new Properties (); props.put ("bootstrap.servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); props.put ("Acks", "All"); props.put ("retries", 0); props.put ("batch.size", 16384); props.put ("key.serializer", Stringserializer.class.getName ()); props.put ("value.serializer", Stringserializer.class.getName ()); this.producer = new kafkaproducer <string, string> (ops); this.topic = topicname; } @Override public void run () {int messageno = 1; try {for (;;) {string messageStr = "Привет, это панель"+messageno+"данных"; Producter.send (New ProducterRecord <String, String> (тема, "Message", MessageStr)); // Если производится 100 элементов, if (messageno%100 == 0) {System.out.println ("отправлено сообщение:" + messageStr); } // Если произведены 1000 элементов, if (messageno%1000 == 0) {System.out.println ("Успешно отправлен"+messageno+"bar"); перерыв; } messageno ++; }} catch (Exception e) {e.printstackTrace (); } наконец {ufference.close (); }} public static void main (string args []) {kafkaproducertest test = new kafkaproducertest ("kafka_test"); Потоковая потока = новый поток (тест); Thread.Start (); }}потребитель:
импортировать java.util.arrays; импортировать java.util.properties; импорт org.apache.kafka.clients.consumer.consumerRecord; импорт org.apache.kafka.clients.consumer.consumerRecord; импорт org.apache.kafka.consumer.consumercord; org.apache.kafka.clients.consumer.kafkaconsumer; import org.apache.kafka.common.serialization.stringdeserializer;/**** Название: kafkaconsumertest* Описание:* kafka consumer demo* ression: 1.0.0* @author pancmer* @date 26, 2018, 2018, 2018, 2018, 2018. реализует runnable {private final kafkaconsumer <string, string> consumer; Private ConsumerRecords <String, String> MSGlist; частная финальная строка тема; частная статическая конечная строка GroupId = "Groupa"; public kafkaconsumertest (String topicname) {Свойства props = new Properties (); props.put ("bootstrap.servers", "Master: 9092, Slave1: 9092, Slave2: 9092"); props.put ("Group.id", GroupId); props.put ("enable.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("auto.offset.reset", «Самый ранний»); props.put ("key.deserializer", stringdeserializer.class.getName ()); props.put ("value.deserializer", stringDeserializer.class.getName ()); this.consumer = new kafkaconsumer <string, string> (ops); this.topic = topicname; this.consumer.subscribe (arrays.aslist (тема)); } @Override public void run () {int messageno = 1; System.out.println ("-----------------------------------------"); try {for (;;) {msglist = consumer.poll (1000); if (null! = msglist && msglist.count ()> 0) {for (conmerceercord <String, String> recoverd: msglist) {// print 100 пунктов при использовании, но печатные данные не обязательно являются правилом, если (Messageno%100 == 0) {System.out.println (Messageno + "======= record.value ()+"offset ==="+record.offset ()); } // Однажды 1000 элементов потребляются, выйдите, if (messageno%1000 == 0) {break; } messageno ++; }} else {thread.sleep (1000); }}} catch (прерывание Exception e) {e.printstackTrace (); } наконец {consumer.close (); }} public static void main (string args []) {kafkaconsumertest test1 = new kafkaconsumertest ("kafka_test"); Thread Thread1 = новый поток (Test1); Thread1.start (); }}Примечание: Master, Slav1, Slave2 - это потому, что я сделал сопоставление отношений в своей собственной среде, которая может быть заменена на IP -адрес сервера.
Конечно, я поместил проект на GitHub, и если вы заинтересованы, вы можете посмотреть. https://github.com/xuwujing/kafka (локальная загрузка)
Суммировать
Простая разработка программы кафки требует следующих шагов:
Введение Кафки См. Официальный документ: http://kafka.apache.org/intro
Суммировать
Вышеуказанное - все содержание этой статьи. Я надеюсь, что содержание этой статьи имеет определенную справочную ценность для каждого обучения или работы. Если у вас есть какие -либо вопросы, вы можете оставить сообщение для общения. Спасибо за поддержку Wulin.com.