I put together a Java demo for Kafka and am writing it down here. For reference only.
1. Create a Maven project
The project directory structure is as follows:
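The original post shows the layout only as a screenshot. A typical layout for this demo, assuming standard Maven conventions and the package names used in the code below, would be:

Kafka-Maven
├─ pom.xml
└─ src
   └─ main
      ├─ java
      │  └─ com
      │     └─ lijie
      │        ├─ producer
      │        │  └─ KafkaProduce.java
      │        ├─ consumer
      │        │  └─ KafkaConsume.java
      │        ├─ pojo
      │        │  └─ User.java
      │        └─ utils
      │           └─ JsonUtils.java
      └─ resources
         └─ kafka.properties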
2. The POM file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>Kafka-Maven</groupId>
    <artifactId>Kafka-Maven</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.6</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
3. The Kafka producer, KafkaProduce:

package com.lijie.producer;

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProduce {

    private static Properties properties;

    static {
        properties = new Properties();
        String path = KafkaProduce.class.getResource("/").getFile().toString()
                + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Send a message
     *
     * @param topic
     * @param key
     * @param value
     */
    public void sendMsg(String topic, byte[] key, byte[] value) {
        // instantiate the producer
        KafkaProducer<byte[], byte[]> kp = new KafkaProducer<byte[], byte[]>(properties);
        // wrap the message in a record
        ProducerRecord<byte[], byte[]> pr = new ProducerRecord<byte[], byte[]>(topic, key, value);
        // send the data
        kp.send(pr, new Callback() {
            // callback invoked once the broker acknowledges (or rejects) the record
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (null != exception) {
                    System.out.println("send failed: " + exception.getMessage());
                } else {
                    System.out.println("record offset: " + metadata.offset());
                }
            }
        });
        // close the producer
        kp.close();
    }
}
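For a quick test of the producer, a small driver like the one below can be used. This is my own minimal sketch, not part of the original post; it assumes the topic lijietest that the consumer subscribes to:

package com.lijie.producer;

public class ProducerDemo {
    public static void main(String[] args) {
        KafkaProduce producer = new KafkaProduce();
        // keys and values are raw bytes, matching the ByteArraySerializer
        // settings in kafka.properties
        producer.sendMsg("lijietest", "key1".getBytes(), "hello kafka".getBytes());
    }
}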
4. The Kafka consumer, KafkaConsume:

package com.lijie.consumer;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsume {

    private final static String TOPIC = "lijietest";

    private static Properties properties;

    static {
        properties = new Properties();
        String path = KafkaConsume.class.getResource("/").getFile().toString()
                + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Fetch messages
     *
     * @throws Exception
     */
    public void getMsg() throws Exception {
        ConsumerConfig config = new ConsumerConfig(properties);
        ConsumerConnector consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(config);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
                .createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            String json = it.next().message();
            User user = (User) JsonUtils.jsonToObj(json, User.class);
            System.out.println(user);
        }
    }
}
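The consumer relies on two helper classes, com.lijie.pojo.User and com.lijie.utils.JsonUtils, that the post does not show. Below is a minimal sketch of what they might look like, assuming Jackson's ObjectMapper for the JSON binding (the mangled import in the original hints at a shaded Jackson copy from htrace; a plain com.fasterxml.jackson.core:jackson-databind dependency would otherwise need to be added to the POM):

package com.lijie.pojo;

// placeholder POJO; the real field set is not shown in the original post
public class User {
    private String name;
    private int age;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "User [name=" + name + ", age=" + age + "]";
    }
}

package com.lijie.utils;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonUtils {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // deserialize a JSON string into an instance of the given class,
    // or return null if parsing fails
    public static Object jsonToObj(String json, Class<?> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}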
5. The kafka.properties file:

## producer
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

## consumer
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.StringEncoder
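With kafka.properties placed under src/main/resources so both classes can load it from the classpath, the demo can be exercised end to end. The driver below is my own sketch rather than part of the original post, and the JSON field names are assumptions tied to the placeholder User class above:

package com.lijie;

import com.lijie.consumer.KafkaConsume;
import com.lijie.producer.KafkaProduce;

public class KafkaDemo {
    public static void main(String[] args) throws Exception {
        // produce one JSON-encoded user; KafkaConsume decodes it via JsonUtils
        String json = "{\"name\":\"lijie\",\"age\":25}";
        new KafkaProduce().sendMsg("lijietest", "key1".getBytes(), json.getBytes());

        // then block, printing each User read from the topic
        new KafkaConsume().getMsg();
    }
}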
That is all for this article. I hope it is helpful for your study, and please continue to support 武林网.