新建春季靴子項目
這裡使用Intellij Idea
添加kafka集成
<? xml版本=“ 1.0” conding =“ utf-8”? > <project xmlns =“ http://maven.apache.org/pom/4.0.0.0” xmlns:xsi =“ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>demo</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.8.RELEASE</version> <relativePath/> <!--來自存儲庫的查找父 - > </parth> <properies> <project.build.sourceencoding> utf-8 </project.build.build.sourceencoding> <project.reporting.outputenconcoding.outputencodencoding> utf-8 </project.reporting.reporting.outputence.outputencecodenciie> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId> org.springframework.boot </groupId> <artifactid> spring-boot-starter-test-test </artifactid> <scope> <scope> test </scope> </deverency> </distionency> </distionencies> </disporencies> <plugins> <plugins> <plugin> <plugin> <gropsId> <groupId> <groupId> org.springfringfringfregringfrage.boot> <Artifactid> spring-boot-maven-plugin </artifactid> </plugin> </plugins> </build> </project> </project>
項目中application.properties添加
spring.kafka.bootstrap-servers = VM208:9092,VM:9092,VM50:9092SPRING.KAFKA.CONSUMER.AUTO offset-reset-reset-reset-reset-reset-reset-reset-reset = exatssspring.kafka.consumer.consumer.group.group-id = l ocal_testspring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.stringdeserializerspring.kafka.consumer.value-deseri alizer = org.apache.kafka.common.serialization.stringDeserializerspring.kafka.producer.key-serialializer = org.apache.kafkace.kafka.common.serialization .stringserializerspring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.stringserializerspring.kafka.producer.acks.acks = 1
新建kafkaconsumer消費類
包com.example.demo.consumer; import org.apache.kafka.clients.consumer.consumerrecord; import org.slf4j.logger; import org.slf4j.loggerfactory; import org org.springframework.stereotype.component; @componentPublic類kafkaconsumer {private logger logger = loggerfactory.getlogger(this.getClass()); @kafkalistener(topics = {“ test”})public void listing(computerRecord <?,?> record){system.out.out.printf(“ offset =%d,key =%s,value =%s/n”,record.offse.offset(),record.key(),record.key(),record.value(value()); }}}spring-boot程序,在kafka集群,模擬發送主題,檢驗接收
複製代碼代碼如下:bin/kafka-console-producer.sh-經紀列表VM208:9092,VM210:9092,VM50:9092-主題測試
編寫生產商代碼
軟件包com.example.demo.producer; import org.apache.kafka.clients.producer.producercord; import org.springframework.beans.beans.beans.factory.annotatory.antowired.autowired; import org.springFringframeWork.kafka.kafka.kafka.kafka.kafkafkafkatemplate; org.springframework.stereotype.component; @componentPublic類kafkaproducer {@autowired private kafkatemplate kafkatemplate kafkatemplate;字符串主題=“ test”; public void sendmessage(字符串鍵,字符串數據){kafkatemplate.send(new ProducerErcord(topic,key,data)); }}}建立一個RESTFUL模擬發送(//http://localhost:8080/kafka/send.do?key=2&data=allen-test-message)
軟件包com.example.demo.controller; import com.example.demo.producer.kafkaproducer; import org.springframework.beans.beans.factory.annotation.Antotation.Autowired; import org.springframework.springframework.web.bind.annot.annot.bind.bind.bind.requestation.requestmappation.requestmappation; emptmapping; import.requestmapping; import; import; org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class ProducerController { @Autowired private KafkaProducer kafkaproducer; @requestmapping(value =“/kafka/send.do”,method = requestMethod.get)public string sendmessage(@requestparam( @requestParam(value =“ key”)字符串鍵,@requestParam( @requestParam(value =“ data data”)字符串數據){返回“成功”; }}}spring-kafka大大減少了代碼工作量。
官方文檔:https://docs.spring.io/spring-kafka/docs/1.2.2.2.2.release/reference/html/
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持武林網。 ,也希望大家多多支持武林網。