新建 Federschuh 项目
这里使用 Intellij Idee
添加 Kafka 集成 Maven
<? XSI: Schemalocation = "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion> 4.0.0 </modelversion> <gruppe> com.example; <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>demo</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.8.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <depecials> <Deponcy> <gruppe> org.springFramework <gruppe> org.springFramework.boot </GroupId> <artifactId> Spring-Boot-Starter-Test </artifactID> <Schops> Test </Scope> </abhängig> </abhängig <artifactid> Spring-Boot-Maven-Plugin </artifactId> </plugin> </plugins> </build> </project>
项目中 application.Properties 添加
spring.kafka.bootstrap-servers = vm208: 9092, vm: 9092, vm50: 9092spring.kafka.consumer.auto-offset-reset = lunsterspring.kafka.consumer.group-id = l ocal_testspring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.stringdeserializerspring.kafka.consumer.Value-Deseri alizer = org.apache.kafka.common.serialization.stringdeserializerspring.kafka.Producer.Key-serializer = org.apache.kafka.common.Serialization .Stringserializerspring.kafka.Producer.Value-serializer = org.apache.kafka.common.Serialization.Stringserializerspring.kafka.Producer.acks = 1
新建 Kafkaconsumer 消费类
paket com.example.demo.consumer; import org.apache.kafka.clients.consumer.consumerrecord; import org.slf4j.logger; import org.slf4j.loggerfactory; Imporation org. org.springframework.stereotype.comPonent; @ComponentPublic -Klasse Kafkaconsumer {private logger logger = loggerfactory.getLogger (this.getClass ()); @Kafkalistener (topics = {"test"}) public void listen (condentReRecord <?,?> Record) {System.out.printf ("Offset =%d, key =%s, value =%s/n", record.offset (), record.key (), record.Value (); }}启动 Springboot 程序, 在 Kafka 集群, 模拟发送 Thema, 检验接收
复制代码代码如下: bin/kafka-console-produz.sh-Brokerliste VM208: 9092, VM210: 9092, VM50: 9092--topische Test
编写 Produzent 代码
Paket com.example.demo.Producer; import org.apache.kafka.clients.producer.Producerrecord; import org.springframework.bean.factory.annotation org.springframework.stereotype.comPonent; @ComponentPublic Class KafkaproDucer {@autowired Private Kafkatemplate Kafkatemplate; String topic = "test"; public void sendMessage (String -Schlüssel, String -Daten) {kafkatemplate.send (neue protozialeRrecord (Thema, Schlüssel, Daten)); }}建立一个 Restful 模拟发送 (//http://localhost:8080/kafka/send.do?key=2&data=allen-test-message)
Paket com.example.demo.controller; import com.example.demo.producer.kafkaproducer; import org.springframework.beans.factory.annotation org. Kafkaproducer; @RequestMapping (value = "/kafka/send.do", method = requestMethod.get) public String sendMessage (@RequestParam (value = "key") String -Taste, @RequestParam (value = "Data") String -Daten) {kafkaProcer.sendMessage (Schlüssel, Daten, Daten); zurück "Sucess"; }}可以发现 Spring-Kafka 大大减少了代码工作量.
官方文档: https://docs.spring.io/spring-kafka/docs/1.2.2.release/reference/html/
以上就是本文的全部内容 , 希望对大家的学习有所帮助 , 也希望大家多多支持武林网。 也希望大家多多支持武林网。