Create a Spring Boot project
Create it in IntelliJ IDEA.
Add the Kafka integration to the Maven build
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- groupId and artifactId are lost in the source; these values are assumed from the com.example.demo package -->
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>demo</name>
    <description>spring boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!-- The parent version is missing from the source; set it to the Spring Boot release you use -->
        <version>...</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Add the following to application.properties
spring.kafka.bootstrap-servers=vm208:9092,vm210:9092,vm50:9092
# The value is garbled in the source ("ribrot"); latest, Kafka's default, is assumed here
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.group-id=local_test
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1
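The bootstrap-servers value is one comma-separated string, so a stray non-ASCII separator or a missing port is easy to overlook until the client fails to connect. The following is a minimal sketch of a sanity check using only the JDK. The class and method names (BootstrapServersCheck, parseServers, serversFrom) are hypothetical and not part of Spring or Kafka, and the host pattern is a simplifying assumption.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Spring or Kafka.
public class BootstrapServersCheck {

    /** Splits on ASCII commas and checks that every entry looks like host:port. */
    static List<String> parseServers(String value) {
        List<String> servers = new ArrayList<>();
        for (String entry : value.split(",")) {
            String server = entry.trim();
            int colon = server.lastIndexOf(':');
            if (colon <= 0 || colon == server.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + server);
            }
            String host = server.substring(0, colon);
            // Assumption: plain host names or IPv4 addresses only.
            if (!host.matches("[A-Za-z0-9.-]+")) {
                throw new IllegalArgumentException("Suspicious host name: " + host);
            }
            int port;
            try {
                port = Integer.parseInt(server.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Port is not a number in: " + server, e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in: " + server);
            }
            servers.add(server);
        }
        return servers;
    }

    /** Reads spring.kafka.bootstrap-servers from properties-file text and validates it. */
    static List<String> serversFrom(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("spring.kafka.bootstrap-servers");
        if (value == null) {
            throw new IllegalArgumentException("spring.kafka.bootstrap-servers is not set");
        }
        return parseServers(value);
    }

    public static void main(String[] args) {
        String text = "spring.kafka.bootstrap-servers=vm208:9092,vm210:9092,vm50:9092\n";
        System.out.println(serversFrom(text));
    }
}
```

A list separated by a full-width comma is treated as a single entry and rejected by the host check, because only the ASCII comma splits the list.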
Create the KafkaConsumer consumer class
package com.example.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    // Invoked for every record that arrives on the "test" topic
    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
    }
}

Start the Spring Boot application, then send a test message to the topic from the Kafka cluster and check that it is received:
bin/kafka-console-producer.sh --broker-list vm208:9092,vm210:9092,vm50:9092 --topic test
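Messages typed into the console producer carry no key unless key parsing is enabled, so the listener prints key = null for them. The sketch below reproduces the listener's format string with plain values so the expected console line can be checked without a broker. RecordFormatDemo and describe are hypothetical names used only for illustration.

```java
// Hypothetical stand-in for the listener's printf call; no Kafka classes are needed.
public class RecordFormatDemo {

    /** Formats one record the same way the listener does, without the trailing newline. */
    static String describe(long offset, Object key, Object value) {
        return String.format("offset = %d, key = %s, value = %s", offset, key, value);
    }

    public static void main(String[] args) {
        // A record sent through the REST endpoint with key=2
        System.out.println(describe(42L, "2", "allen-test-message"));
        // A record typed into the console producer, which has a null key by default
        System.out.println(describe(0L, null, "hello"));
    }
}
```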
Write the producer code
package com.example.demo.producer;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    String topic = "test";

    // Sends one keyed message to the "test" topic
    public void sendMessage(String key, String data) {
        kafkaTemplate.send(new ProducerRecord<>(topic, key, data));
    }
}

Simulate sending through a RESTful endpoint, for example http://localhost:8080/kafka/send.do?key=2&data=allen-test-message:
package com.example.demo.controller;

import com.example.demo.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// The class name is garbled in the source; ProducerController is an assumed name
@RestController
public class ProducerController {
    @Autowired
    private KafkaProducer kafkaProducer;

    // GET /kafka/send.do?key=...&data=... forwards the message to Kafka
    @RequestMapping(value = "/kafka/send.do", method = RequestMethod.GET)
    public String sendMessage(@RequestParam(value = "key") String key, @RequestParam(value = "data") String data) {
        kafkaProducer.sendMessage(key, data);
        return "success";
    }
}

spring-kafka greatly reduces the amount of code you have to write.
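Because key and data travel as query parameters of /kafka/send.do, values containing spaces or characters such as & need URL encoding before the request is sent. The following is a minimal sketch that builds the request URL with the JDK's URLEncoder. SendUrlBuilder and buildSendUrl are hypothetical names, and the base URL http://localhost:8080 is taken from the example request in this article.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for building the test request URL.
public class SendUrlBuilder {

    /** Builds the /kafka/send.do URL with both query parameters URL-encoded as UTF-8. */
    static String buildSendUrl(String baseUrl, String key, String data) {
        try {
            return baseUrl + "/kafka/send.do"
                    + "?key=" + URLEncoder.encode(key, "UTF-8")
                    + "&data=" + URLEncoder.encode(data, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Every Java platform is required to support UTF-8.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildSendUrl("http://localhost:8080", "2", "allen-test-message"));
        System.out.println(buildSendUrl("http://localhost:8080", "2", "hello world&more"));
    }
}
```

The resulting URL can be opened in a browser or passed to any HTTP client. A response of "success" means the controller accepted the request and handed the message to the producer.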
Official documentation: https://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/