Este artigo apresenta como integrar as mensagens de envio e recebimento do Kafka no projeto Springboot.
O KAFKA é um sistema de mensagens Publish-Scribe distribuído de alto rendimento, com as seguintes características: Forneça persistência da mensagem através da estrutura de dados do disco O (1), que pode manter o desempenho estável por um longo tempo, mesmo que o armazenamento de mensagens seja terabytes. Alta taxa de transferência: Mesmo o hardware muito comum Kafka pode suportar milhões de mensagens por segundo. Suporta a partição de mensagens através de servidores Kafka e clusters de consumidores. Suporte o carregamento de dados paralelos do Hadoop.
Instale Kafka
Como a instalação do Kafka requer o suporte do Zookeeper, ao instalar o Windows, você precisa instalar o Zookeeper primeiro e depois instalar o Kafka. Abaixo, darei as etapas para a instalação do Mac e os pontos para prestar atenção. A configuração do Windows quase não é diferente, exceto para os diferentes locais.
Brew Install Kafka
Sim, é tão simples. Você pode lidar com um comando no Mac. Este processo de instalação pode exigir um tempo e deve estar relacionado ao status da rede. Pode haver uma mensagem de erro na mensagem de prompt de instalação, como "Erro: não foi possível vincular:/usr/local/share/doc/homebrew". Isso não importa, será automaticamente ignorado. Finalmente, conseguimos quando vimos o que estava abaixo.
==> Resumo ðÿ º/usr/local/adega/kafka/1.1.0: 157 arquivos, 47,8 MB
O local do arquivo de configuração de instalação é o seguinte, basta modificar o número da porta de acordo com suas necessidades.
Instalado zoopeeper e kafka location/usr/local/adega/
File de configuração /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties
Comece o Zookeeper
Copie o código da seguinte forma: ./ bin/zookeeper-erver-start /usr/local/etc/kafka/zookeeper.properties &
Comece Kafka
./bin/kafka-sherver-start /usr/local/etc/kafka/server.properties &
Crie um tópico para Kafka. O tópico é nomeado teste. Você pode configurá -lo para o nome que deseja. Volte e configure -o no código corretamente.
Copie o código da seguinte
1. Resolva dependências primeiro
Não mencionaremos as dependências relacionadas ao Springboot. As dependências relacionadas ao Kafka dependem apenas de um pacote de integração Spring-Kafka.
<Depencency> <PuerpId> org.springframework.kafka </frugiD> <TROTIFACTID> spring-kafka </stifactId> <versão> 1.1.1.
Aqui vamos mostrar o arquivo de configuração primeiro
#================== KAFKA ========================================= kafka.consumer.servers = 10.93.21.21: 2181kafka.consumer.enable.auto.commit = truekafka.consumer.session.timeout = 6000kafka.consumer.auto.Commit.interval = 100kafka.consumer.aUso .reset = lastkafka.consumer.topic = testkafka.consumer.group.id = testkafka.consumer.concurrency = 10kafka.producer.servers = 10.9 3.21.21: 9092kafka.producer.retries = 0kafka.producer.batch.size = 4096kafka.producer.linger = 1kafka.producer.buffer.memory = 40960
2. Configuração: Produtor Kafka
1) Declare a capacidade de configuração e abra a capacidade Kafkatemplate através do @Configuration e @enablekafka.
2) Injete a configuração do kafka no arquivo de configuração do aplicativo.Properties através do @Value.
3) Gere Bean, @bean
pacote com.kangaroo.sentinel.collect.configuration; importar java.util.hashmap; importar java.util.map; importar org.apache.kafka.clients.producer.produccerconfig; import orgache.kafka.common.serialization.stringernizer; org.springframework.beans.factory.annotation.value; importar org.springframework.context.annotation.bean; importar org.springframework.context.annotation.configuration; import org.springframework.kafka.annotation.configuration; importância; org.springframework.kafka.core.defaultkafkaproducerFactory; importar org.springframework.kafka.core.kafkatemplate; importar org.springframework.kafka.core.proderFactory;@configuration@EnabilkAfkkkapublic. @Value ("$ {kafka.producer.servers}") Servidores de String Private; @Value ("$ {kafka.producer.retries}") private Int Botties; @Value ("$ {kafka.producer.batch.size}") private int batchSize; @Value ("$ {kafka.producer.linger}") private int linger; @Value ("$ {kafka.producer.buffer.memory}") private int buffermemory; mapa público <string, object> producmerConfigs () {map <string, object> props = new hashmap <> (); ProPs.put (produterConfig.bootstrap_servers_config, servidores); props.put (produterConfig.retries_config, tentativas); ProPs.put (produterConfig.batch_size_config, batchsize); ProPs.put (produterConfig.linger_ms_config, linger); ProPs.put (produterConfig.buffer_memory_config, buffermemory); props.put (produterConfig.key_serializer_class_config, stringseializer.class); props.put (producherConfig.value_serializer_class_config, stringseializer.class); retornar adereços; } public ProderFactory <String, String> ProderFactory () {Return New DefaultKafkaproducerFactory <> (ProducterConfigs ()); } @Bean public kafkatemplate <string, string> kafkatemplate () {return kafkatemplate <string, string> (ProderFactory ()); }}Experimente nosso produtor e escreva um controlador. Quer tópico = teste, chave = chave, envie mensagem
pacote com.kangaroo.sentinel.Collect.Controller; importar com.kangaroo.sentinel.common.Response.Response; importar com.kangaroo.sentinel.common.reposnse.resultcode; import org.slf4j.Logger; importação ou org.slf4j.slf.l.l.l; org.springframework.beans.factory.annotation.autowired; importar org.springframework.kafka.core.kafkatemplate; importar org.springframework.web.bind.annotation. javax.servlet.http.httpServletResponse;@RestController@requestmapping ("/kafka") classe pública colecionController {Protected Final Logger = LoggerFactory.getLogger (this.getclass ()); @Autowired Private Kafkatemplate kafkatemplate; @RequestMapping (Value = "/Send", Method = requestMethod.get) Resposta pública sendkafka (httpServletRequest Request, httpServletResponse resposta) {try {string message = request.getParameter ("mensagem"); logger.info ("kafka message = {}", mensagem); kafkatemplate.send ("teste", "chave", mensagem); Logger.info ("Send Kafka com sucesso."); retornar nova resposta (resultadocode.success, "Enviar Kafka com sucesso", nulo); } catch (Exceção e) {Logger.error ("Send Kafka falhou", e); retornar nova resposta (resultadocode.exception, "Send Kafka falhou", nulo); }}}3. Configuração: consumidor de kafka
1) Declare a capacidade de configuração e abra a capacidade Kafkatemplate através do @Configuration e @enablekafka.
2) Injete a configuração do kafka no arquivo de configuração do aplicativo.Properties através do @Value.
3) Gere Bean, @bean
pacote com.kangaroo.sentinel.collect.configuration; importar org.apache.kafka.clients.consumer.consumerConfig; import org.apache.kafka.common.serialization.stringderializer; org.springframework.context.annotation.bean; importar org.springframework.context.annotation.configuration; importar org.springframework.kafka.annotation.enablekafka; import org.springframenwork.kafkka.conofig.ConCrUntkafka; org.springframework.kafka.config.kafkalisterContainerFactory; importar org.springframework.kafka.core.consumerFactory; importar org.springframework.kafka.core.defaultkAfkaconsumerFactory; importação org.springframework.kafka.listener.concurrentMessaGelistEnerContainer; importar java.util.hashmap; importar java.util.map;@configuration@enablekafkapublic class KafkaconsumerConfig {@Value ("$ {Kafka @Value ("$ {kafka.consumer.enable.auto.commit}") Private Boolean EnableAutocomit; @Value ("$ {kafka.consumer.session.timeout}") private string sessionTimeout; @Value ("$ {kafka.consumer.auto.commit.interval}") private string autocomitinterval autocomitinterval; @Value ("$ {kafka.consumer.group.id}") Private String Groupid; @Value ("$ {kafka.consumer.auto.offset.reset}") private string autooffsetreset; @Value ("$ {kafka.consumer.concurrency}") private int concorrência; @Bean public kafkalistEnerContainerFactory <ConcurrentMessAgelistEnerContainer <String, String >> KafkalistEnerContainerFactory () {ConcurrentkAfKalistEnerContAinnerFactor fActory.SetConsumerFactory (ConsumerFactory ()); Factory.setConcurrency (concorrência); Factory.getContainerProperties (). SetpollTimeout (1500); fábrica de retorno; } public ConsumerFactory <String, String> ConsumerFactory () {Return New DefaultKAfkaconsumerFactory <> (consumerConfigs ()); } mapa public <string, object> consumerConfigs () {map <string, object> propsmap = new hashmap <> (); propsmap.put (consumerConfig.bootstrap_servers_config, servidores); propsmap.put (consumerConfig.enable_auto_commit_config, enabautocommit); propsmap.put (consumerConfig.auto_commit_interval_ms_config, autoCommitinterval); propsmap.put (consumerConfig.session_timeout_ms_config, sessionTimeout); propsmap.put (consumerConfig.key_deserializer_class_config, stringDeserializer.class); propsmap.put (consumerConfig.value_deserializer_class_config, stringDeserializer.class); propsmap.put (consumerConfig.group_id_config, groupID); propsmap.put (consumerConfig.auto_offset_reset_config, autooffsetreset); Retornar PropsMap; } @Bean Public ouvinte ouvinte () {return new ouvinte (); }}o novo ouvinte () gera um feijão para processar dados lidos em Kafka. A demonstração de implementação simples do ouvinte é a seguinte: basta ler e imprimir os valores da chave e da mensagem
O atributo de tópicos do @Kafkalistener é usado para especificar o nome do tópico Kafka. O nome do tópico é especificado pelo produtor de mensagens, ou seja, é especificado pelo Kafkatemplate ao enviar uma mensagem.
pacote com.kangaroo.sentinel.collect.configuration; importar org.apache.kafka.clients.consumer.consumerRecord; importar org.slf4j.logger; importação {slf4j.loggerFactory; importer org.springFramework.kafka.annotation.kantation.kafkafter; LoggerFactory.getLogger (this.getClass ()); @Kafkalistener (tópicos = {"test"}) public void ouça (ConsumerRecord <?,?> Registro) {logger.info ("KAFKA's Key:" + Record.key ()); Logger.info ("Valor de Kafka:" + Record.value (). ToString ()); }}Pontas:
1) Não introduzi como instalar e configurar o kafka. É melhor usar um IP de rede totalmente ligado ao configurar Kafka, em vez de localhost ou 127.0.0.1
2) É melhor não usar o próprio Zookeeper de Kafka para implantar Kafka, pois isso pode causar acesso à inacessibilidade.
3) Teoricamente, o consumidor deve ler Kafka através do Zookeeper, mas aqui estamos usando o endereço do Kafkaserver, por que não entramos em profundidade?
4) Ao definir a configuração da mensagem de monitoramento, o valor do item de configuração do grupo_ID_CONFIG é usado para especificar o nome do grupo de consumidores. Se houver vários objetos do ouvinte no mesmo grupo, apenas um objeto ouvinte poderá receber a mensagem.
O exposto acima é todo o conteúdo deste artigo. Espero que seja útil para o aprendizado de todos e espero que todos apoiem mais o wulin.com.