This article shows how to integrate Kafka into a Spring Boot project to send and receive messages.
Kafka is a high-throughput distributed publish-subscribe messaging system with the following characteristics:
1) It persists messages in an O(1) disk data structure, which maintains stable performance even with terabytes of stored messages.
2) High throughput: even on very modest hardware, Kafka can support millions of messages per second.
3) Messages can be partitioned across Kafka servers, with consumption distributed over a cluster of consumer machines.
4) It supports parallel data loading into Hadoop.
Install Kafka
Kafka requires ZooKeeper, so on Windows you need to install ZooKeeper first and then Kafka. Below are the installation steps for Mac and the points to watch out for; the Windows setup is almost identical apart from the file locations.
brew install kafka
Yes, it's that simple: a single command on the Mac. The installation may take a while, depending on network conditions. The installer may print an error such as "Error: Could not link: /usr/local/share/doc/homebrew"; this does not matter and can be safely ignored. The installation has succeeded when you finally see output like the following:
==> Summary
🍺  /usr/local/Cellar/kafka/1.1.0: 157 files, 47.8MB
The configuration files are installed at the locations below; just change the port numbers to suit your needs.
ZooKeeper and Kafka are installed under: /usr/local/Cellar/
Configuration files:
/usr/local/etc/kafka/server.properties
/usr/local/etc/kafka/zookeeper.properties
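For reference, the port settings live in entries like the following. This is a sketch of the relevant lines from the stock config files; the exact contents of your copies may differ slightly:

# /usr/local/etc/kafka/server.properties -- broker listener port
listeners=PLAINTEXT://:9092

# /usr/local/etc/kafka/zookeeper.properties -- ZooKeeper client port
clientPort=2181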
Start zookeeper
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
Start kafka
./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
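Alternatively, if you would rather not manage the processes by hand, Homebrew can run both as background services. This is an optional substitute for the two commands above:

brew services start zookeeper
brew services start kafka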
Create a Kafka topic named test. You can name it whatever you like, as long as you configure the same name in the code later.
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
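To verify that the topic exists and run a quick end-to-end smoke test, the console tools that ship with Kafka can be used (assuming the defaults above: ZooKeeper on localhost:2181, the broker on localhost:9092):

# List topics; "test" should appear
./bin/kafka-topics --list --zookeeper localhost:2181
# In one terminal, type messages to the topic
./bin/kafka-console-producer --broker-list localhost:9092 --topic test
# In another terminal, read them back
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

Lines typed into the producer terminal should appear in the consumer terminal.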
1. Resolve the dependencies
Leaving aside the usual Spring Boot dependencies, the only Kafka-related dependency needed is the spring-kafka integration package.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>
Let's look at the configuration file first:
#================== kafka ==================
# Broker address. Note: the new consumer connects to the broker (port 9092),
# not to ZooKeeper on 2181 -- see tip 3 at the end of this article.
kafka.consumer.servers=10.93.21.21:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

kafka.producer.servers=10.93.21.21:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
2. Configuration: Kafka producer
1) Declare the config class and enable KafkaTemplate support through @Configuration and @EnableKafka.
2) Inject the Kafka settings from the application.properties configuration file through @Value.
3) Create the beans with @Bean.
package com.kangaroo.sentinel.collect.configuration;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

To try out our producer, we write a controller that sends a message with topic=test and key=key:
package com.kangaroo.sentinel.collect.controller;

import com.kangaroo.sentinel.common.response.Response;
import com.kangaroo.sentinel.common.response.ResultCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@RestController
@RequestMapping("/kafka")
public class CollectController {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public Response sendKafka(HttpServletRequest request, HttpServletResponse response) {
        try {
            String message = request.getParameter("message");
            logger.info("kafka message={}", message);
            kafkaTemplate.send("test", "key", message);
            logger.info("Send kafka successfully.");
            return new Response(ResultCode.SUCCESS, "Send kafka successfully", null);
        } catch (Exception e) {
            logger.error("Send kafka failed", e);
            return new Response(ResultCode.EXCEPTION, "Send kafka failed", null);
        }
    }
}
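One caveat worth noting: kafkaTemplate.send() is asynchronous, so the try/catch above only catches errors thrown while handing the record to the producer, not actual delivery failures. As a minimal sketch (not part of the original code), the ListenableFuture returned by send() can be used to observe the real result:

// Additional imports needed for this sketch:
// import org.springframework.kafka.support.SendResult;
// import org.springframework.util.concurrent.ListenableFuture;

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", "key", message);
future.addCallback(
        result -> logger.info("delivered to partition {} at offset {}",
                result.getRecordMetadata().partition(), result.getRecordMetadata().offset()),
        ex -> logger.error("delivery failed", ex));

With the application running, the endpoint can be exercised with, for example, curl "http://localhost:8080/kafka/send?message=hello" (assuming the default server port 8080).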
3. Configuration: Kafka consumer
1) Declare the config class and enable listener support through @Configuration and @EnableKafka.
2) Inject the Kafka settings from the application.properties configuration file through @Value.
3) Create the beans with @Bean.
package com.kangaroo.sentinel.collect.configuration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}

The listener() bean processes the data read from Kafka. A simple demo implementation of Listener follows: it just reads the key and value and logs them.
The topics attribute of @KafkaListener specifies the Kafka topic to listen to. It must match the topic the producer sends to, i.e. the one passed to kafkaTemplate.send().
package com.kangaroo.sentinel.collect.configuration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class Listener {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("kafka's key: " + record.key());
        logger.info("kafka's value: " + record.value().toString());
    }
}

Tips:
1) Production-grade Kafka installation and configuration is not covered here. When configuring Kafka, it is best to bind it to the machine's real network IP rather than localhost or 127.0.0.1.
2) It is best not to run Kafka against the ZooKeeper instance bundled with the Kafka distribution, as this can lead to connectivity problems.
3) In theory the consumer should discover Kafka through ZooKeeper, but here we point it directly at the broker address. The reason is that since Kafka 0.9 the new consumer API connects to the brokers via bootstrap.servers and no longer goes through ZooKeeper, which is why kafka.consumer.servers uses port 9092 rather than 2181.
4) When defining the listener configuration, the GROUP_ID_CONFIG item sets the consumer group name. If several listener objects share the same group, each message is delivered to only one of them, as the sketch after these tips illustrates.
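To illustrate point 4, here is a minimal sketch. The class name SecondListener is hypothetical, not from the original project; it assumes the same container factory and consumer factory as above, so it joins the same group (group.id=test):

package com.kangaroo.sentinel.collect.configuration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

// Hypothetical second listener. Registered as a bean (like listener() above),
// it joins the SAME consumer group (group.id=test from the shared consumer
// factory), so each message on "test" reaches either Listener or this class,
// never both. With a different group.id it would receive every message as well.
public class SecondListener {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("SecondListener got key={}, value={}", record.key(), record.value());
    }
}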
That is all the content of this article. I hope it is helpful for your study, and I hope you will continue to support Wulin.com.