Este artículo presenta cómo integrar los mensajes de envío y recepción de Kafka en el proyecto SpringBoot.
Kafka es un sistema de mensajes de suscripción de subscribe de alto rendimiento de alto rendimiento, con las siguientes características: proporcionar persistencia de mensajes a través de la estructura de datos de disco O (1), que puede mantener un rendimiento estable durante mucho tiempo incluso si el almacenamiento de mensajes es terabytes. Alto rendimiento: incluso el hardware muy ordinario Kafka puede admitir millones de mensajes por segundo. Admite la partición de mensajes a través de servidores Kafka y grupos de consumo. Apoya la carga de datos paralelos de Hadoop.
Instalar kafka
Debido a que la instalación de Kafka requiere el soporte de Zookeeper, al instalar Windows, primero debe instalar Zookeeper y luego instalar Kafka. A continuación le daré los pasos para instalar Mac y los puntos a los que prestar atención. La configuración de Windows casi no es diferente, excepto las diferentes ubicaciones.
Brew instalar kafka
Sí, es así de simple. Puede manejarlo con un comando en la Mac. Este proceso de instalación puede requerir un tiempo, y debe estar relacionado con el estado de la red. Puede haber un mensaje de error en el mensaje de solicitud de instalación, como "Error: no se pudo vincular:/usr/local/share/doc/homebrew". Esto no importa, se ignorará automáticamente. Finalmente, tuvimos éxito cuando vimos lo que estaba debajo.
==> Resumen ðÿ º/usr/local/celella/kafka/1.1.0: 157 archivos, 47.8mb
La ubicación del archivo de configuración de instalación es la siguiente, simplemente modifique el número de puerto de acuerdo con sus necesidades.
Ubicación de Zoopeeper y Kafka instalada/USR/Local/Cellar/
Archivo de configuración /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties
Iniciar Zookeeper
Copie el código de la siguiente manera: ./ bin/Zookeeper-Server-start /usr/local/etc/kafka/zookeeper.properties &
Comienza Kafka
./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
Crea un tema para Kafka. El tema se llama prueba. Puede configurarlo al nombre que desee. Regrese y configúrelo en el código correctamente.
Copie el código de la siguiente manera: ./ bin/kafka-topics --create-zookeeper localhost: 2181--Repplication-Factor 1-Partitions 1-Tope Test
1. Resuelve las dependencias primero
No mencionaremos las dependencias relacionadas con SpringBoot. Las dependencias relacionadas con Kafka solo se basan en un paquete de integración Spring-Kafka.
<Spendency> <MoupRoMID> org.springframework.kafka </groupid> <artifactid> spring-kafka </arfactid> <versions> 1.1.1.release </versión> </pendency>
Aquí mostraremos el archivo de configuración primero
#=================== Kafka =========================================== kafka.consumer.servers = 10.93.21.21: 2181kafka.consumer.enable.auto.commit = truekafka.consumer.session.timeout = 6000kafka.consumer.auto.commit.interval = 100kafka.consumer.auto.offset .reset = ortatkafka.consumer.topic = testkafka.consumer.group.id = testkafka.consumer.concurrency = 10kafka.producer.servers = 10.9 3.21.21: 9092kafka.producer.retries = 0kafka.producer.batch.size = 4096kafka.producer.linger = 1kafka.producer.buffer.memory = 40960
2. Configuración: Productor de Kafka
1) Declare la capacidad de configuración y abra Kafkatemplate a través de @Configuration y @enablekafka.
2) Inyecte la configuración de Kafka en el archivo de configuración Application.Properties a través de @Value.
3) Generar frijoles, @bean
paquete com.kangaroo.sentinel.collect.configuration; import java.util.hashmap; import java.util.map; importar org.apache.kafka.clients.producer.producerConfig; import org.apache.kafka.common.serialization.stringserializer; import; import; org.springframework.beans.factory.annotation.value; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import og.springframework.kafka.annotation.enablekafka; import org.springframework.kafka.core.defaultkafkaproducFactory; import org.springframework.kafka.core.kafkatemplate; import org.springframework.kafka.core.producerfactory;@configuration@enablekafkapublic de clase kafkaproducconfig { @Value ("$ {kafka.producer.servers}") servidores de cadena privada; @Value ("$ {kafka.producer.retries}") private int reintentos; @Value ("$ {kafka.producer.batch.size}") private int batchSize; @Value ("$ {kafka.producer.linger}") private int linger; @Value ("$ {kafka.producer.buffer.memory}") private int tufferMemory; MAP Public <String, Object> ProducerConfigs () {MAP <String, Object> Props = new HashMap <> (); props.put (producerConfig.bootstrap_servers_config, servidores); props.put (producerConfig.retries_config, reintis); props.put (producerConfig.batch_size_config, batchSize); props.put (producerConfig.linger_ms_config, linger); props.put (producerConfig.buffer_memory_config, bufferMemory); props.put (producerConfig.key_serializer_class_config, stringserializer.class); props.put (producerConfig.value_serializer_class_config, stringserializer.class); devolver accesorios; } public ProducerFactory <String, String> ProducerFactory () {return New DefaultkAfKAproDucFactory <> (ProducerConFigs ()); } @Bean public kafkatemplate <string, string> kafkatemplate () {return nuevo kafkatemplate <string, string> (producerFactory ()); }}Experimente a nuestro productor y escriba un controlador. Want Topic = test, Key = Key, Enviar mensaje
paquete com.kangaroo.sentinel.collect.controller; import com.kangaroo.sentinel.common.response.esponse; import org.springframework.beans.factory.annotation.aUtowired; import org.springframework.kafka.core.kafkatemplate; import org.springframework.web.bind.annotation.*; import javax.servlet.http.httpservletrequest; import javax.servlet.http.httpservletResponse;@RestController@requestmapping ("/kafka") public class CollectController {protegido final logger = loggerFactory.getLogger (this.getClasssS ()); @AUTOWIREDIREDIRD KAFKATEMPLATE KAFKATEMPLATE; @RequestMapping (valor = "/send", método = requestmethod.get) respuesta pública sendkafka (httpservletRequest solicitud, respuesta httpServletResponse) {try {String Message = request.getParameter ("mensaje"); logger.info ("kafka mensaje = {}", mensaje); kafkatemplate.send ("prueba", "clave", mensaje); logger.info ("Enviar kafka con éxito"); devolver una nueva respuesta (resultCode.Success, "Enviar kafka con éxito", nulo); } catch (excepción e) {logger.error ("Enviar kafka fallido", e); devolver una nueva respuesta (resultCode.Exception, "Enviar kafka fallido", nulo); }}}3. Configuración: Kafka Consumer
1) Declare la capacidad de configuración y abra Kafkatemplate a través de @Configuration y @enablekafka.
2) Inyecte la configuración de Kafka en el archivo de configuración Application.Properties a través de @Value.
3) Generar frijoles, @bean
paquete com.kangaroo.sentinel.collect.configuration; import org.apache.kafka.clients.consumer.consumerconfig; import org.apache.kafka.common.serialization.StringDeserializer; import og.springframework.beans.factory.annotation org. org.springframework.kafka.config.kafkalistenerContainerFactory; import org.springframework.kafka.core.consumerfactory; import org.springframework.kafka.core.defaultkaconsumerory; importación; org.springframework.kafka.listener.concurrentMessageListenerContainer; import java.util.hashmap; import java.util.map;@configuration@habildablekafkapublic class kafkaconsumerconfig {@value ("$ {kafka.consumer.servers})") @Value ("$ {kafka.consumer.enable.auto.commit}") privado booleano habilitaneutOcommit; @Value ("$ {kafka.consumer.session.timeout}") privado sessiontimeout; @Value ("$ {kafka.consumer.auto.commit.interval}") cadena privada autocommitinterval; @Value ("$ {kafka.consumer.group.id}") private string groupId; @Value ("$ {kafka.consumer.auto.offset.reset}") cadena privada autooffsetReset; @Value ("$ {kafka.consumer.concurrency}") private int concurrencia; @Bean public kafkalistenerContainerFactory <concurrentMessageListenerContainer <String, String >> kafkalistenerContainerFactory () {concurrentkafkalistenerContainerFactory <string, string> factory = new concurrentkafkalistenerContainerFactory <> (); factory.setConsumerFactory (ConsumerFactory ()); fábrica.setConcurrency (concurrencia); factory.getContainerProperties (). SetPollTimeOut (1500); Return Factory; } public ConsumerFactory <String, String> ConsumerFactory () {return New DefaultkAfKAconsumerFactory <> (consultorConfigs ()); } public map <string, object> consumerConfigs () {map <string, object> propSmap = new HashMap <> (); propsmap.put (consumerConfig.bootstrap_servers_config, servidores); propsmap.put (consumerConfig.enable_auto_commit_config, EnableautOcommit); propsmap.put (consumerConfig.auto_commit_interval_ms_config, autocommitterval); propsmap.put (consumerConfig.session_timeout_ms_config, sessionTimeOut); propSmap.put (consumerConfig.key_deserializer_class_config, stringDeserializer.class); propsmap.put (consumerConfig.value_deserializer_class_config, stringDeserializer.class); propsmap.put (consumerConfig.group_id_config, groupId); propsmap.put (consumerConfig.auto_offset_reset_config, autooffsetReset); devolver propsmap; } @Bean Public Listener Listener () {return New Listener (); }}New Listener () genera un frijol para procesar datos leídos de Kafka. La demostración de implementación simple del oyente es la siguiente: simplemente lea e imprima los valores de clave y mensajes
El atributo de temas de @Kafkalistener se usa para especificar el nombre del tema de Kafka. El nombre del tema es especificado por el productor de mensajes, es decir, es especificado por Kafkatemplate al enviar un mensaje.
paquete com.kangaroo.sentinel.collect.configuration; import org.apache.kafka.clients.consumer.consumerRecord; import org.slf4j.logger; import org.slf4j.loggerFactory; import org.spingframework.kafka.annotation.kafkalistener; public class oyearer; LoggerFactory.getLogger (this.getClass ()); @Kafkalistener (topics = {"test"}) public void Listen (ConsumerRecord <?,?> Record) {logger.info ("Kafka's Key:" + Record.key ()); logger.info ("El valor de Kafka:" + registro.value (). toString ()); }}Consejos:
1) No presenté cómo instalar y configurar Kafka. Es mejor usar una IP de red totalmente vinculada al configurar kafka, en lugar de localhost o 127.0.0.1
2) Es mejor no usar el propio Zlookeeper de Kafka para implementar Kafka, ya que puede causar acceso a la inaccesibilidad.
3) Teóricamente, el consumidor debería leer Kafka a través de Zookeeper, pero aquí estamos usando la dirección de Kafkaserver, ¿por qué no entramos en profundidad?
4) Al definir la configuración del mensaje de monitoreo, el valor del elemento de configuración group_id_config se utiliza para especificar el nombre del grupo de consumo. Si hay múltiples objetos de escucha en el mismo grupo, solo un objeto de oyente puede recibir el mensaje.
Lo anterior es todo el contenido de este artículo. Espero que sea útil para el aprendizaje de todos y espero que todos apoyen más a Wulin.com.