写了个 Kafka 的 Java Demo 顺便记录下 , 仅供参考
1. 创建 maven 项目
: :
2.POM : :
<project xmlns = "http://maven.apache.org/pom/4.0.0" http://maven.apache.org/xsd/maven-4.0.0.xsd "> <Dodeversion> 4.0.0 </modelversion> <roupiD> kafka-maven </rougiD> <StifactId> kafka-maven </ursifactid> <roughid> org.apache.kafka </rougiD> <StifactId> kafka_2.11 </shintifactid> <الإصدار> 0.10.1.1 </version> </repreadency> <reperency> <rougid> org.apache <roughid> org.apache.hadoop </rougiD> <StifactId> hadoop-hdfs </artifactid> <الإصدار> 2.2.0 </version> </respency> <sependent> <roupiD> org.apache <roughid> org.apache.hbase </rougiD> <StifactId> hbase-client </shintifactid> <الإصدار> 1.0.3 </version> </sependency> <reperation> <roupiD> org.pache.hbase </groupiD> <rougiD> org.apache.hadoop </rougiD> <StifactId> hadoop-hdfs </shintifactid> <الإصدار> 2.2.0 </version> </sependency> <sependency> </jdk.tools </groupid> <SystemPath> $ {java_home} /lib/tools.jar </systempath> </sependency> <sependency> <roupled> org.apache.httpcomponents </rougeid> </attifactid> httpclient </stifactid> <Groper> org.apache.maven.plugins </groupId> <StifactId> maven-compiler-plugin </artifactid> <configuration> <source> 1.7 </source> <target> 1.7 </target> </configuration> </sultin> </plugins> </build> </project> 3.Kafka 生产者 kafkaproduce:
package com.lijie.producer ؛ import java.io.file ؛ import java.io.fileinputStream ؛ import java.util.properties ؛ import org.apache.kafka.clients.producer.callback ؛ import org.kafka.clients.producer.kafkaper. org.apache.kafka.clients.producer.producerRecord ؛ استيراد org.apache.kafka.clients.producer.recordMetAdata ؛ استيراد org.slf4j.logger ؛ استيراد org.slf4j.loggerfactory ؛ public kafkaproduce ثابت {properties = New Properties () ؛ string path = kafkaproducer.class.getResource ("/"). getFile (). toString () + "kafka.properties" ؛ حاول {fileInputStream fis = جديد fileInputStream (ملف جديد (مسار)) ؛ Properties.Load (FIS) ؛ } catch (استثناء e) {E.PrintStackTrace () ؛ }} / ** * 发送消息 * * param topic * param key * param value * / public void sendmsg (سلسلة السلسلة ، مفتاح البايت [] ، بايت [] القيمة) {// 实例化 إنتاج kafkaproducer <byte [] ، byte []> kfkaproducer <byte [] // 消息封装 producerRecord <byte [] ، byte []> pr = new producerrecord <byte [] ، byte []> (الموضوع ، المفتاح ، القيمة) ؛ // 发送数据 kp.send (pr ، new callback () {// 回调函数 override public void onCompletion (RecordMetAdata metadata ، استثناء استثناء) {if (null! = issustice) {system.out.println (" }) ؛ // 关闭 product kp.close () ؛ }} 4.Kafka 消费者 kafkaconsume :
package com.lijie.consumer ؛ import java.io.file ؛ import java.io.fileInputStream ؛ استيراد java.util.hashmap ؛ استيراد java.Util.list ؛ import java.util.map ؛ imporg com.lijie.pojo.user ؛ استيراد com.lijie.utils.jsonutils ؛ استيراد kafka.consumer.consumerconfig ؛ استيراد kafka.consumer.consumeriterator ؛ استيراد kafka.consumer.kafkastream kafka.Serializer.StringDecoder ؛ استيراد kafka.Utils.EverifiableProperties ؛ الطبقة العامة kafkaconsume {private final static string = "lijietest" ؛ خصائص الخصائص الثابتة الخاصة ؛ ثابت {properties = New Properties () ؛ string path = kafkaconsume.class.getResource ("/"). getFile (). toString () + "kafka.properties" ؛ حاول {fileInputStream fis = جديد fileInputStream (ملف جديد (مسار)) ؛ Properties.Load (FIS) ؛ } catch (استثناء e) {E.PrintStackTrace () ؛ }} / ** * 获取消息 * * Throws استثناء * / public void getMsg () يلقي الاستثناء {conflerconfig config = new exhinperconfig (خصائص) ؛ المستهلك المستهلك = kafka.consumer.consumer .CreateJavAconsumerConnector (config) ؛ خريطة <string ، integer> topicCountMap = new hashmap <string ، integer> () ؛ TopicCountMap.put (الموضوع ، عدد صحيح جديد (1)) ؛ stringDecoder keydecoder = new StringDecoder (جديد التحقق من properties ()) ؛ stringDecoder valuedEcoder = new StringDecoder (New DepivibleProperties ()) ؛ خريطة <string ، قائمة <kafkastream <string ، string >>> consumer = consumer .CreateMessageStreams (TopicCountMap ، keyDecoder ، valuedEcoder) ؛ kafkastream <string ، string> Stream = consumermap.get (topic) .get (0) ؛ ConsumerItatorator <string ، string> it = dream.iterator () ؛ بينما (it.hasnext ()) {string json = it.next (). message () ؛ مستخدم المستخدم = (المستخدم) jsonutils.jsontoObj (json ، user.class) ؛ System.out.println (user) ؛ }}} 5.Kafka.Properties 文件
## prodebootstrap.servers = 192.168.80.123: 9092Producer.type = syncrequest.required.acks = 1serializer.class = kafka.serializer.defaultenco derkey.serializer = org.apache.kafka.common.serialization.bytearrayserializervalue.serializer = org.apache.kafka.common.serialization.byt earrayserializerbak.partitioner.class = kafka.producer.defaultPartitionerBak.key.serializer = org.apache.kafka.common.serialization.stri ngserializerbak.value.serializer = org.apache.kafka.common.Serialization.StringSerializer ## streezookeeper.connect = 192.168.80.123: 2181 group.id = lijiegroup zookeeper.session.timeout.ms = 4000 zookeeper.sync.mt.ms = 200 Auto.Commit.Interval.ms = 1000 auto.offset.reset = أصغر serializer.class = kafka.serializer.stringencoderer
以上就是本文的全部内容 , 希望对大家的学习有所帮助 , 也希望大家多多支持武林网。