I wrote a small Kafka demo in Java and am recording it here for reference.
1. Create the Maven project
The directory layout is as follows:
2. pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>kafka-maven</groupId>
    <artifactId>kafka-maven</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <!-- version garbled in the source; assumed to match hbase-client -->
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.6</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3. Kafka producer KafkaProduce:
package com.lijie.producer;

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProduce {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaProduce.class);

    private static Properties properties;

    // Load kafka.properties from the classpath root once, when the class is loaded
    static {
        properties = new Properties();
        String path = KafkaProduce.class.getResource("/").getFile().toString() + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            LOG.error("failed to load " + path, e);
        }
    }

    /**
     * Send a message.
     *
     * @param topic
     * @param key
     * @param value
     */
    public void sendMsg(String topic, byte[] key, byte[] value) {
        // Create the producer
        KafkaProducer<byte[], byte[]> kp = new KafkaProducer<byte[], byte[]>(properties);
        // Wrap the message
        ProducerRecord<byte[], byte[]> pr = new ProducerRecord<byte[], byte[]>(topic, key, value);
        // Send the data asynchronously with a completion callback
        kp.send(pr, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (null != exception) {
                    exception.printStackTrace();
                } else {
                    System.out.println("record offset: " + metadata.offset());
                }
            }
        });
        // Close the producer (blocks until outstanding sends complete)
        kp.close();
    }
}
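For completeness, here is a minimal usage sketch. It is not part of the original post: the class name, the key, and the JSON payload are assumptions, chosen so that the consumer in the next section can decode the value.

package com.lijie.producer;

// Hypothetical driver class showing one way to call sendMsg
public class ProducerDemo {

    public static void main(String[] args) {
        KafkaProduce producer = new KafkaProduce();
        // "lijietest" is the topic hard-coded in KafkaConsume below; the JSON shape
        // assumes the (not shown) User POJO has a "name" field
        String json = "{\"name\":\"lijie\"}";
        producer.sendMsg("lijietest", "key1".getBytes(), json.getBytes());
    }
}

4. Kafka consumer KafkaConsume: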
package com.lijie.consumer;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsume {

    private final static String TOPIC = "lijietest";

    private static Properties properties;

    // Load kafka.properties from the classpath root once, when the class is loaded
    static {
        properties = new Properties();
        String path = KafkaConsume.class.getResource("/").getFile().toString() + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Fetch and print messages.
     *
     * @throws Exception
     */
    public void getMsg() throws Exception {
        ConsumerConfig config = new ConsumerConfig(properties);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        // Request one stream for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        // Block and consume messages as they arrive
        while (it.hasNext()) {
            String json = it.next().message();
            User user = (User) JsonUtils.jsonToObj(json, User.class);
            System.out.println(user);
        }
    }
}
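The consumer references com.lijie.pojo.User and com.lijie.utils.JsonUtils, neither of which appears in the original post. Below is a hypothetical sketch of JsonUtils backed by Jackson's ObjectMapper; the method name and signature are inferred from the call site, and User is assumed to be a plain POJO with getters, setters, and a toString().

package com.lijie.utils;

import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical sketch of the JsonUtils helper used above; assumes Jackson
// (com.fasterxml.jackson.databind) is on the classpath
public class JsonUtils {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Deserialize a JSON string into an instance of the given class
    public static Object jsonToObj(String json, Class<?> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

5. The kafka.properties file: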
##produce
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

##consume
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.StringEncoder
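Note that this file mixes old-consumer keys (zookeeper.connect, group.id, auto.offset.reset) with 0.10 producer keys (bootstrap.servers, key.serializer, value.serializer); each client simply ignores, and may warn about, keys it does not recognize. As a minimal sketch (assumed, not from the original post), the producer's subset could equally be built in code instead of loaded from the file:

package com.lijie.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

// Assumed standalone example: builds the producer portion of kafka.properties in code
public class InlineConfigDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.123:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        producer.close();
    }
}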
That is all for this article. I hope it helps with your study, and I hope you will keep supporting 武林网.