이 기사에서는 SpringBoot 프로젝트에서 Kafka 송금 및 수신 메시지를 통합하는 방법을 소개합니다.
Kafka는 다음과 같은 특성을 갖는 고 처리량 분산 게시 된 게시 구독 메시지 시스템입니다. O (1) 디스크 데이터 구조를 통한 메시지 지속성 제공 메시지 저장소가 테라 바이트 인 경우에도 오랫동안 안정적인 성능을 유지할 수 있습니다. 높은 처리량 : 매우 일반적인 하드웨어 Kafka조차도 초당 수백만 개의 메시지를 지원할 수 있습니다. Kafka 서버 및 소비자 클러스터를 통한 메시지 분할을 지원합니다. Hadoop 병렬 데이터로드를 지원합니다.
Kafka를 설치하십시오
Kafka를 설치하려면 Zookeeper의 지원이 필요하기 때문에 Windows를 설치할 때 Zookeeper를 먼저 설치 한 다음 Kafka를 설치해야합니다. 아래는 Mac을 설치하기위한 단계와주의를 기울일 포인트를 제공합니다. Windows 구성은 다른 위치를 제외하고는 거의 다르지 않습니다.
브루는 카프카를 설치합니다
예, 그렇게 간단합니다. Mac에서 명령으로 처리 할 수 있습니다. 이 설치 프로세스에는 시간이 걸릴 수 있으며 네트워크 상태와 관련이 있어야합니다. 설치 프롬프트 메시지에 "오류 :/usr/local/share/doc/homebrew"와 같은 오류 메시지가있을 수 있습니다. 이것은 중요하지 않으며 자동으로 무시됩니다. 마지막으로, 우리는 아래의 것을 보았을 때 성공했습니다.
==> 요약 ðÿ º/usr/local/cellar/kafka/1.1.0 : 157 파일, 47.8MB
설치 구성 파일 위치는 다음과 같습니다. 필요에 따라 포트 번호를 수정하십시오.
Zoopeeper 및 Kafka 위치 설치/USR/Local/Cellar/
구성 파일 /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties
동물원 키퍼를 시작하십시오
다음과 같이 코드를 복사하십시오 ./ bin/Zookeeper-Server-start /usr/local/etc/kafka/zookeeper.properties &
카프카를 시작하십시오
./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
Kafka를위한 주제를 만듭니다. 주제의 이름은 테스트입니다. 원하는 이름으로 구성 할 수 있습니다. 돌아가서 코드에서 올바르게 구성하십시오.
코드를 다음과 같이 복사하십시오. ./ bin/kafka-topics -create -zookeeper localhost : 2181-replication factor 1-파티션 1-주제 테스트
1. 종속성을 먼저 해결하십시오
SpringBoot와 관련된 종속성은 언급하지 않습니다. Kafka와 관련된 종속성은 Spring-Kafka 통합 패키지에만 의존합니다.
<pectionency> <groupid> org.springframework.kafka </groupid> <artifactid> Spring-Kafka </artifactid> <bersion> 1.1.1. Release </version> </fectionency>
여기에 구성 파일을 먼저 표시합니다
#================== Kafka ================================================= kafka.consumer.servers = 10.93.21.21 : 2181kafka.consumer.enable.auto.commit = truekafka.consumer.session.timeout = 6000kafka.consumer.auto.commit.interval = 100kafka.consumer.auto.offset .RESET = lickeskafka.consumer.topic = testKafka.consumer.group.id = testKafka.consumer.concurrency = 10kafka.producer.servers = 10.9 3.21.21 : 9092kafka.producer.retries = 0kafka.producer.batch.size = 4096kafka.producer.linger = 1kafka.producer.buffer.memory = 40960
2. 구성 : Kafka 프로듀서
1) @configuration 및 @enablekafka를 통해 구성을 선언하고 KafkaTemplate 기능을 열십시오.
2) @Value를 통해 Application.Properties 구성 파일에 KAFKA 구성을 주입합니다.
3) Bean, @bean을 생성하십시오
package com.kangaroo.sentinel.collect.configuration; import java.util.hashmap; import java.util.map; import org.apache.kafka.clients.producer.producerconfig; import org.apache.kafka.common.serialization.stringserialize; import org.springframework.bean.beans.annotation.value; import org.springframework.context.annotation.bean; import org.spramframework.context.annotation.configuration; import org.spramework.kafka.annotation.enablekafka; import org.springframework.kafka.core.defaultkafkaproducerfactory; import org.springframework.kafka.core.kafkatemplate; import org.springframework.kafka.core.produceRation@enablekafkapublic class kafkaproconfig {enablekafkapublic class kafkaproconfig. @Value ( "$ {kafka.producer.servers}") 개인 문자열 서버; @Value ( "$ {kafka.producer.retries}") 개인 int 검색; @Value ( "$ {kafka.producer.batch.size}") private int batchsize; @Value ( "$ {kafka.producer.linger}") private int linger; @Value ( "$ {kafka.producer.buffer.memory}") private int buffermemory; public map <string, object> produceRconfigs () {map <string, object> props = new Hashmap <> (); props.put (produceRconfig.bootstrap_servers_config, 서버); props.put (produceRconfig.retries_config, Retries); props.put (produceRconfig.batch_size_config, batchsize); props.put (produceRconfig.linger_ms_config, linger); props.put (produceRconfig.buffer_memory_config, buffermemory); props.put (produceRconfig.key_serializer_class_config, stringserializer.class); props.put (produceRconfig.value_serializer_class_config, stringserializer.class); 리턴 소품; } public producerFactory <string, String> producerFactory () {return new defaultKafkApRoducerFactory <> (ProduceRConfigs ()); } @bean public kafkatemplate <string, String> kafkatemplate () {return new KafKatemplate <String, String> (producerFactory ()); }}생산자를 실험하고 컨트롤러를 작성하십시오. 주제 = 테스트, 키 = 키, 메시지 보내기를 원합니다
package com.kangaroo.sentinel.collect.controller; import com.kangaroo.sentinel.common.response.response; import com.kangaroo.sentinel.common.response.resultcode; import org.slf4j.logger; import org.slf4j.loggerfactory; org.springframework.bean.bean.bean.annotation.autowired; import org.springframework.kafka.core.kafkatemplate; import org.springframework.web.bind.annotation javax.servlet.http.httpervletresponse;@restcontroller@restontroller@"/kafka") public class CollectController {protected final logger = loggerfactory.getLogger (this.getClass ()); @autowired 개인 kafkatemplate kafkatemplate; @requestmapping (value = "/send", method = requestmethod.get) 공개 응답 sendkafka (httpservletrequest request, httpservletreponse responsk) {try {string message = request.getParameter ( "message"); logger.info ( "kafka message = {}", message); kafkatemplate.send ( "테스트", "키", 메시지); logger.info ( "Kafka를 성공적으로 보내십시오."); 새로운 응답을 반환합니다 (resultCode.Success, "Kafka를 성공적으로 보내십시오", NULL); } catch (예외 e) {logger.error ( "Kafka 실패 보내기", e); 새로운 응답을 반환합니다 (resultCode.Exception, "Kafka 실패 보내기", NULL); }}}3. 구성 : Kafka 소비자
1) @configuration 및 @enablekafka를 통해 구성을 선언하고 KafkaTemplate 기능을 열십시오.
2) @Value를 통해 Application.Properties 구성 파일에 KAFKA 구성을 주입합니다.
3) Bean, @bean을 생성하십시오
package com.kangaroo.sentinel.collect.configuration; import org.apache.kafka.clients.consumer.consumerconfig; import org.apache.kafka.common.serialization.stringdeserializer; import org.springframework.beans.annot.value; import; org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.sprameframework.kafka.annotation.enablekafka; import org.spramework.kafka.config.config.concurrentkalistenercontainer importory; org.springframework.kafka.config.kafkalistenercontainerfactory; import org.springframework.kafka.core.consumerfactory; import org.sprameframework.kafka.core.defaultkafkaconsumerfactory; import org.springframework.kafka.listener.concurrentmessagelistenercontainer; import java.util.hashmap; import java.util.map;@configuration@enablekafkapublic 클래스 kafkaconsumerconfig {@value ( "$ {kafka.consmer.server}; @Value ( "$ {kafka.consumer.enable.auto.commit}") private boolean enableautocommit; @Value ( "$ {kafka.consumer.session.timeout}") 개인 문자열 세션 타임 아웃; @Value ( "$ {kafka.consumer.auto.auto.commit.interval}") private String autocommitinterval; @Value ( "$ {kafka.consumer.group.id}") private String groupId; @Value ( "$ {kafka.consumer.auto.offset.reset}") private String autooffsetreset; @Value ( "$ {kafka.consumer.concurrency}") 개인 int 동시성; @bean public kafkalistenercontainerfactory <concurrentMessagelistenerContainer <string, String >> kafkalistenercontainerfactory () {consurentKafkalistenerContainerFactory <string, string = new ConcurrentKafkalistenerContainerFactory <(); Factory.setConsumerFactory (socialFactory ()); Factory.SetConcurrency (동시성); Factory.getContainerProperties (). SetPollTimeout (1500); 반품 공장; } public consumerfactory <string, string> consumerFactory () {return new defaultKafkAconsumerFactory <> (CopbilerConfigs ()); } public map <string, object> considerConfigs () {map <string, object> propsmap = new Hashmap <> (); propsmap.put (ConsiderConfig.bootstrap_servers_config, 서버); propsmap.put (ConsiderConfig.enable_auto_commit_config, enableautocommit); propsmap.put (ConsiderConfig.auto_commit_interval_ms_config, autocommitinterval); propsmap.put (ConsiderConfig.session_timeout_ms_config, sessiontimeout); propsmap.put (ConsiderConfig.key_deserializer_class_config, StringDeserializer.class); propsmap.put (ConsiderConfig.value_deserializer_class_config, StringDeserializer.class); propsmap.put (ConsiderConfig.group_id_config, groupId); propsmap.put (ConsiderConfig.auto_offset_reset_config, autooffsetreset); propsmap을 반환합니다. } @bean public listener lister () {return new Learer (); }}New Learger ()는 Kafka에서 읽은 데이터를 처리하기 위해 Bean을 생성합니다. 리스너의 간단한 구현 데모는 다음과 같습니다. 키 및 메시지 값을 간단히 읽고 인쇄합니다.
@kafkalistener의 주제 속성은 Kafka 주제 이름을 지정하는 데 사용됩니다. 주제 이름은 메시지 제작자에 의해 지정됩니다. 즉, 메시지를 보낼 때 kafkatemplate에 의해 지정됩니다.
package com.kangaroo.sentinel.collect.configuration; import org.apache.kafka.clients.consumer.consumerRecord; import org.slf4j.logger; import org.slf4j.logger actory; import org.springframework.kafka.annotation.kafkalistener; probliceTer {properteder“probuter gener retcated glister retcation glister retcient gistector grister glistor loggerfactory.getLogger (this.getClass ()); @kafkalistener (주제 = { "test"}) public void Listen (CoperyErecord <?,?> record) {logger.info ( "Kafka 's key :" + record.key ()); logger.info ( "Kafka 's value :" + record.value (). tostring ()); }}팁 :
1) Kafka를 설치하고 구성하는 방법을 소개하지 않았습니다. Localhost 또는 127.0.0.1 대신 Kafka를 구성 할 때 완전히 바인딩 된 네트워크 IP를 사용하는 것이 가장 좋습니다.
2) Kafka의 자체 동물원 키퍼를 사용하여 Kafka를 배치하지 않는 것이 가장 좋습니다.
3) 이론적으로 소비자는 Zookeeper를 통해 Kafka를 읽어야하지만 여기서 우리는 Kafkaserver의 주소를 사용하고 있습니다. 왜 우리가 깊이 들어 가지 않았습니까?
4) 모니터링 메시지 구성을 정의 할 때 Group_ID_CONFIG 구성 항목의 값은 소비자 그룹의 이름을 지정하는 데 사용됩니다. 같은 그룹에 여러 개의 리스너 객체가있는 경우 하나의 리스너 객체 만 메시지를받을 수 있습니다.
위는이 기사의 모든 내용입니다. 모든 사람의 학습에 도움이되기를 바랍니다. 모든 사람이 wulin.com을 더 지원하기를 바랍니다.