I wrote a small Kafka Java demo and am recording it here along the way, for reference only.
1. Create the project
The directory structure is as follows:
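The directory screenshot is not reproduced here; based on the package names used in the code below, a typical Maven layout for this demo would be:

kafka-maven/
├── pom.xml
└── src/main/
    ├── java/com/lijie/
    │   ├── producer/KafkaProduce.java
    │   ├── consumer/KafkaConsume.java
    │   ├── pojo/User.java
    │   └── utils/JsonUtils.java
    └── resources/
        └── kafka.properties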
2. The POM file:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>kafka-maven</groupId>
    <artifactId>kafka-maven</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <!-- version not legible in the original post; 4.5.2 is a stand-in -->
            <version>4.5.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
3. The Kafka producer, KafkaProduce:

package com.lijie.producer;

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProduce {

    private static Properties properties;

    // load kafka.properties from the classpath root once, when the class is loaded
    static {
        properties = new Properties();
        String path = KafkaProduce.class.getResource("/").getFile().toString()
                + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Send a message to Kafka.
     *
     * @param topic the topic to send to
     * @param key   the message key
     * @param value the message value
     */
    public void sendMsg(String topic, byte[] key, byte[] value) {
        // create the producer
        KafkaProducer<byte[], byte[]> kp = new KafkaProducer<byte[], byte[]>(properties);
        // wrap the message in a record
        ProducerRecord<byte[], byte[]> pr = new ProducerRecord<byte[], byte[]>(topic, key, value);
        // send the record asynchronously
        kp.send(pr, new Callback() {
            // callback invoked once the send has completed
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (null != exception) {
                    System.out.println("send failed: " + exception.getMessage());
                }
            }
        });
        // close the producer (flushes any pending sends)
        kp.close();
    }
}
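For completeness, here is one way to exercise the producer. This runner is not part of the original post: the class name ProducerDemo is made up, the topic "lijietest" is borrowed from the consumer below, and the JSON payload assumes the User POJO has a name field.

package com.lijie.producer;

// Hypothetical runner for the KafkaProduce class above.
public class ProducerDemo {
    public static void main(String[] args) {
        KafkaProduce producer = new KafkaProduce();
        // the topic matches the one hard-coded in KafkaConsume; the payload shape is a guess
        producer.sendMsg("lijietest", "key1".getBytes(), "{\"name\":\"lijie\"}".getBytes());
    }
}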
4. The Kafka consumer, KafkaConsume:

package com.lijie.consumer;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsume {

    private final static String topic = "lijietest";

    private static Properties properties;

    // load kafka.properties from the classpath root once, when the class is loaded
    static {
        properties = new Properties();
        String path = KafkaConsume.class.getResource("/").getFile().toString()
                + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Consume messages from the topic and print each decoded User.
     *
     * @throws Exception
     */
    public void getMsg() throws Exception {
        ConsumerConfig config = new ConsumerConfig(properties);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        // request a single stream for our topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        // block waiting for messages; decode each JSON payload into a User
        while (it.hasNext()) {
            String json = it.next().message();
            User user = (User) JsonUtils.jsonToObj(json, User.class);
            System.out.println(user);
        }
    }
}
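The com.lijie.pojo.User and com.lijie.utils.JsonUtils classes imported above are not shown in the original post. A minimal sketch of what they might look like, assuming Jackson for the JSON binding (the original import list references a shaded Jackson ObjectMapper) and a single name field on User:

package com.lijie.pojo;

// Hypothetical POJO; the original post does not show its fields.
public class User {

    private String name;

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return "User [name=" + name + "]";
    }
}

and a JsonUtils wrapper around ObjectMapper:

package com.lijie.utils;

import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper matching the JsonUtils.jsonToObj(json, User.class) call above.
public class JsonUtils {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static Object jsonToObj(String json, Class<?> clazz) throws Exception {
        return MAPPER.readValue(json, clazz);
    }
}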
5. The kafka.properties file:

##produce
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

##consume
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.StringEncoder
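Because auto.offset.reset=smallest, a consumer group seen for the first time starts from the earliest available offset rather than only new messages. To run the consumer end of the demo, a minimal entry point (not part of the original post; the class name ConsumerDemo is made up here) could be:

package com.lijie.consumer;

// Hypothetical runner: getMsg() blocks and prints every User it decodes.
public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        new KafkaConsume().getMsg();
    }
}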
That is all for this article. I hope it is helpful for everyone's study, and I hope everyone will continue to support 武林網.