В этой статье представлено, как интегрировать отправку и получение сообщений Kafka в проекте Springboot.
KAFKA-это высокопроизводительная система распределенных сообщений о публикации-подписке со следующими характеристиками: обеспечить постоянство сообщения через структуру данных O (1), которая может долго поддерживать стабильную производительность, даже если хранилище сообщений является терабайтами. Высокая пропускная способность: даже очень обычное оборудование Kafka может поддерживать миллионы сообщений в секунду. Поддерживает разделение сообщений через серверы Kafka и потребительские кластеры. Поддержка Hadoop Параллельная загрузка данных.
Установите Kafka
Поскольку установка Kafka требует поддержки Zookeeper, при установке Windows вам нужно сначала установить Zookeeper, а затем установить Kafka. Ниже я дам вам шаги для установки Mac и точек, на которые обратите внимание. Конфигурация Windows почти ничем не отличается, за исключением различных мест.
Вареть установить кафку
Да, это так просто. Вы можете справиться с этим с помощью команды на Mac. Этот процесс установки может потребоваться некоторое время, и он должен быть связан со статусом сети. В сообщении при приглашении на установку может быть сообщение об ошибке, например «Ошибка: не может связать:/usr/local/share/doc/homebrew». Это не имеет значения, это будет автоматически игнорироваться. Наконец, мы преуспели, когда увидели, что было ниже.
==> Сводка ð º/usr/local/cellar/kafka/1.1.0: 157 файлы, 47,8 МБ
Расположение файла конфигурации установки следующее, просто измените номер порта в соответствии с вашими потребностями.
Установлено Zoopeeper и Kafka location/usr/local/cellar/
Файл конфигурации/USR/Local/etc/kafka/server.properties/usr/local/etc/kafka/zookeeper.properties
Начните Zookeeper
Скопируйте код следующим образом: ./ bin/zookeeper-server-start/usr/local/etc/kafka/zookeeper.properties &
Начни Кафку
./bin/kafka-server-start/usr/local/etc/kafka/server.properties &
Создайте тему для Кафки. Тема называется тестом. Вы можете настроить его на имя, которое вы хотите. Вернитесь и настройте его в коде правильно.
Скопируйте код следующим образом: ./ bin/kafka-topics-create --zookeeper localhost: 2181-Фактор-репликация 1-Партия 1-Тест-Topic
1. Решите зависимости сначала
Мы не будем упомянуть зависимости, связанные с Springboot. Зависимости, связанные с Кафкой, полагаются только на пакет интеграции Spring-Kafka.
<depervice> <groupid> org.springframework.kafka </groupid> <artifactid> spring-kafka </artifactid> <sersive> 1.1.1.release </version> </sepect>
Здесь мы сначала покажем файл конфигурации
#================ Кафка ================================================ kafka.consumer.servers = 10.93.21.21: 2181kafka.consumer.enable.auto.commit = truekafka.consumer.session.timeout = 6000kafka.consumer.auto.commit.interval = 100kafka.consumer.auto.offset .reset = aldykafka.consumer.topic = testkafka.consumer.group.id = testkafka.consumer.concurrency = 10kafka.producer.servers = 10.9 3.21.21: 9092Kafka.Producer.retries = 0Kafka.Producer.Batch.Size = 4096Kafka.Producer.linger = 1Kafka.Producer.buffer.Memory = 40960
2. Конфигурация: производитель кафки
1) Объявите конфигурацию и откройте возможность kafkatemplate через @configuration и @enablekafka.
2) Введите конфигурацию KAFKA в файл конфигурации Application.properties через @Value.
3) генерировать бон, @bean
пакет com.kangaroo.sentinel.collect.configuration; import java.util.hashmap; import java.util.map; import org.apache.kafka.clients.producer.producerconfig; импорт org.apache.kafka.common.serialization.Stringseriatorize org.springframework.beans.factory.annotation.value; импорт org.springframework.context.annotation.bean; импорт org.springframework.context.annotation.configuration; импорт org.springframework.kafka.annotation.enablekafka; импорт org.springframework.kafka.annotation.enablekafka; org.springframework.kafka.core.defaultkafkaproducerfactory; import org.springframework.kafka.core.kafkatemplate; импорт org.springframework.kafka.core.producerfactor @Value ("$ {kafka.producer.servers}") частные строковые серверы; @Value ("$ {kafka.producer.retries}") private int retries; @Value ("$ {kafka.producer.batch.size}") private int pactorsize; @Value ("$ {kafka.producer.linger}") private int linger; @Value ("$ {kafka.producer.buffer.memory}") private int buffermemory; public map <string, object> preseerconfigs () {map <string, object> props = new hashmap <> (); props.put (producterconfig.bootstrap_servers_config, серверы); props.put (producterconfig.retries_config, retries); props.put (producterconfig.batch_size_config, patchsize); props.put (producterconfig.linger_ms_config, linger); props.put (producterconfig.buffer_memory_config, buffermemory); props.put (producterconfig.key_serializer_class_config, stringserializer.class); props.put (producterconfig.value_serializer_class_config, stringserializer.class); вернуть реквизит; } public productionFactory <string, string> preseermerFactory () {return new DefaultKafKaproDucerFactory <> (PresecterConfigs ()); } @Bean public kafkatemplate <string, string> kafkatemplate () {return new kafkatemplate <string, string> (provecterfactory ()); }}Экспериментируйте с нашим производителем и напишите контроллер. Хочу тему = тест, ключ = ключ, отправить сообщение
пакет com.kangaroo.sentinel.collect.controller; import com.kangaroo.sentinel.common.response.response; import com.kangaroo.sentinel.common.response.resultcode; импорт org.slf4j.logger; импорт org.slf4.loggerfactory; org.springframework.beans.factory.annotation.autowired; импорт org.springframework.kafka.core.kafkatemplate; импорт org.springframework.web.bind.annotation. javax.servlet.http.httpservletresponse;@restcontroller@requestmapping ("/kafka") public class collectontroller {защищенный окончательный логист logger = loggerfactory.getlogger (this.getClass ()); @Autowired частный kafkatemplate kafkatemplate; @Requestmapping (value = "/send", method = requestMethod.get) public response sendkafka (httpservlectrequest, httpservletresponse response) {try {string message = request.getParameter ("message"); logger.info ("kafka message = {}", message); kafkatemplate.send ("test", "key", сообщение); logger.info ("успешно отправить кафку."); вернуть новый ответ (resultcode.success, "Отправить кафку успешно", null); } catch (Exception e) {logger.error ("Отправить kafka не удалось", e); вернуть новый ответ (resultcode.exception, "отправить kafka не удалось", null); }}}3. Конфигурация: потребитель KAFKA
1) Объявите конфигурацию и откройте возможность kafkatemplate через @configuration и @enablekafka.
2) Введите конфигурацию KAFKA в файл конфигурации Application.properties через @Value.
3) генерировать бон, @bean
пакет com.kangaroo.sentinel.collect.configuration; import org.apache.kafka.colients.consumer.consumerconfig; импорт org.apache.kafka.common.serialization.stringdeserializ org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.kafka.annotation.enablekafka; import org.springframework.kafka.config.concurrentkafkalistorner org.springframework.kafka.config.kafkalistenercontainerfactory; импорт org.springframework.kafka.core.consumerfactory; импорт org.springframework.kafka.core.defaultkafkaconcefactory; импорт org.springframework.kafka.listener.concurrentmessagelistenercontainer; import java.util.hashmap; import java.util.map;@configuration@enyablekafkublic class kafkaconsumerconfig {@value ("$ {kafka.consumer.servers {@value (" $ {kafka.consumer. @Value ("$ {kafka.consumer.enable.auto.commit}") private boolean enableautocommit; @Value ("$ {kafka.consumer.session.timeout}") private String SessionTimeout; @Value ("$ {kafka.consumer.auto.commit.interval}") частная строка AutoCommitInterval; @Value ("$ {kafka.consumer.group.id}") private String GroupId; @Value ("$ {kafka.consumer.auto.offset.reset}") частная строка AutooffsetReset; @Value ("$ {kafka.consumer.concurrency}") частная параллелизация; @Bean public kafkalistenercontainerfactory <concurrentmessagelistenercontainer <string, string >> kafkalistenercontainerfactory () {comprountkafkalistenercontainerfactory <string, string> facory = new ComparrentKafkalistenerContainfactory <> (); factory.setConsumerFactory (ConsumerFactory ()); Factory.SetConcurrency (параллелизм); factory.getContainerProperties (). SetPollTimeOut (1500); вернуть фабрику; } public consumerFactory <string, string> consumerFactory () {return new DefaultKafkAconsumerFactory <> (ConsumerConfigs ()); } public map <string, object> consumerconfigs () {map <string, object> propsmap = new hashmap <> (); propsmap.put (consumerconfig.bootstrap_servers_config, servers); propsmap.put (consumerconfig.enable_auto_commit_config, eNableAutoCommit); propsmap.put (consumerconfig.auto_commit_interval_ms_config, autocommitinterval); propsmap.put (consumerconfig.session_timeout_ms_config, sessiontimeout); propsmap.put (consumerconfig.key_deserializer_class_config, stringdeserializer.class); propsmap.put (consumerconfig.value_deserializer_class_config, stringdeserializer.class); propsmap.put (consumerconfig.group_id_config, groupid); propsmap.put (consumerconfig.auto_offset_reset_config, autooffsetreset); вернуть propsmap; } @Bean Public Slireer Slireer () {return New Slieder (); }}Новый слушатель () генерирует компонент для обработки данных с чтением от Kafka. Простая демонстрация прослушивателя выглядит следующим образом: просто прочтите и распечатайте и распечатайте ключ и значения сообщений
@Атрибут «Темы Кафкалистенер» используется для указания названия темы Кафки. Имя темы указывается производителем сообщений, то есть оно указывается Kafkatemplate при отправке сообщения.
пакет com.kangaroo.sentinel.collect.configuration; import org.apache.kafka.colients.consumer.consumerRecord; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.spramework. kafka.annotation. kfaflistener; Loggerfactory.getlogger (this.getClass ()); @Kafkalistener (themics = {"test"}) public void слушание (ConsumerRecord <?,?> Record) {logger.info ("Kafka's Key:" + record.key ()); logger.info ("kafka's value:" + record.value (). toString ()); }}Советы:
1) Я не представил, как установить и настраивать Kafka. Лучше всего использовать IP полностью привязкой сети при настройке Kafka, а не Localhost или 127.0.0.1
2) Лучше всего не использовать собственный Zookeeper Kafka для развертывания Kafka, поскольку это может вызвать доступ к недоступности.
3) Теоретически, потребитель должен прочитать Кафку через Zookeeper, но здесь мы используем адрес Kafkaserver, почему мы не углубились в него?
4) При определении конфигурации сообщения мониторинга значение элемента конфигурации Group_id_config используется для указания имени группы потребителей. Если в одной группе есть несколько объектов слушателя, только один объект прослушивания может получить сообщение.
Выше всего содержание этой статьи. Я надеюсь, что это будет полезно для каждого обучения, и я надеюсь, что все будут поддерживать Wulin.com больше.