新建春季靴子项目
这里使用Intellij Idea
添加kafka集成
<?xml版本=“ 1.0” conding =“ utf-8”?> <project xmlns =“ http://maven.apache.org/pom/4.0.0.0” xmlns:xsi =“ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>demo</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.8.RELEASE</version> <relativePath/> <!--来自存储库的查找父 - > </parth> <properies> <project.build.sourceencoding> utf-8 </project.build.build.sourceencoding> <project.reporting.outputenconcoding.outputencodencoding> utf-8 </project.reporting.reporting.outputence.outputencecodenciie> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId> org.springframework.boot </groupId> <artifactid> spring-boot-starter-test-test </artifactid> <scope> <scope> test </scope> </deverency> </distionency> </distionencies> </disporencies> <plugins> <plugins> <plugin> <plugin> <gropsId> <groupId> <groupId> org.springfringfringfregringfrage.boot> <Artifactid> spring-boot-maven-plugin </artifactid> </plugin> </plugins> </build> </project> </project>
项目中application.properties添加
spring.kafka.bootstrap-servers = VM208:9092,VM:9092,VM50:9092SPRING.KAFKA.CONSUMER.AUTO offset-reset-reset-reset-reset-reset-reset-reset-reset = exatssspring.kafka.consumer.consumer.group.group-id = l ocal_testspring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.stringdeserializerspring.kafka.consumer.value-deseri alizer = org.apache.kafka.common.serialization.stringDeserializerspring.kafka.producer.key-serialializer = org.apache.kafkace.kafka.common.serialization .stringserializerspring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.stringserializerspring.kafka.producer.acks.acks = 1
新建kafkaconsumer消费类
包com.example.demo.consumer; import org.apache.kafka.clients.consumer.consumerrecord; import org.slf4j.logger; import org.slf4j.loggerfactory; import org org.springframework.stereotype.component; @componentPublic类kafkaconsumer {private logger logger = loggerfactory.getlogger(this.getClass()); @kafkalistener(topics = {“ test”})public void listing(computerRecord <?,?> record){system.out.out.printf(“ offset =%d,key =%s,value =%s/n”,record.offse.offset(),record.key(),record.key(),record.value(value()); }}}spring-boot程序,在kafka集群,模拟发送主题,检验接收
复制代码代码如下:bin/kafka-console-producer.sh-经纪列表VM208:9092,VM210:9092,VM50:9092-主题测试
编写生产商代码
软件包com.example.demo.producer; import org.apache.kafka.clients.producer.producercord; import org.springframework.beans.beans.beans.factory.annotatory.antowired.autowired; import org.springFringframeWork.kafka.kafka.kafka.kafka.kafkafkafkatemplate; org.springframework.stereotype.component; @componentPublic类kafkaproducer {@autowired private kafkatemplate kafkatemplate kafkatemplate;字符串主题=“ test”; public void sendmessage(字符串键,字符串数据){kafkatemplate.send(new ProducerErcord(topic,key,data)); }}}建立一个RESTFUL模拟发送(//http://localhost:8080/kafka/send.do?key=2&data=allen-test-message)
软件包com.example.demo.controller; import com.example.demo.producer.kafkaproducer; import org.springframework.beans.beans.factory.annotation.Antotation.Autowired; import org.springframework.springframework.web.bind.annot.annot.bind.bind.bind.requestation.requestmappation.requestmappation; emptmapping; import.requestmapping; import; import; org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class ProducerController { @Autowired private KafkaProducer kafkaproducer; @requestmapping(value =“/kafka/send.do”,method = requestMethod.get)public string sendmessage(@requestparam( @requestParam(value =“ key”)字符串键,@requestParam( @requestParam(value =“ data data”)字符串数据){返回“成功”; }}}spring-kafka大大减少了代码工作量。
官方文档:https://docs.spring.io/spring-kafka/docs/1.2.2.2.2.release/reference/html/
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。,也希望大家多多支持武林网。