I put together a small Kafka demo in Java and am writing it up here for the record. For reference only.
1. Create a Maven project:

2. POM file:
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>Kafka-Maven</groupId>
    <artifactId>Kafka-Maven</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
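Two quick notes on this POM, based on my reading of it: the Hadoop and HBase dependencies are not required by the Kafka demo itself and appear to serve the author's surrounding HDFS/HBase code, and the system-scoped jdk.tools entry only resolves when JAVA_HOME points at a JDK (a plain JRE has no lib/tools.jar).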
3. Kafka producer, KafkaProduce:

```java
package com.lijie.producer;

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProduce {

    private static Properties properties;

    static {
        // Load kafka.properties from the classpath root
        properties = new Properties();
        String path = KafkaProduce.class.getResource("/").getFile().toString()
                + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Send a message to the given topic.
     *
     * @param topic
     * @param key
     * @param value
     */
    public void sendMsg(String topic, byte[] key, byte[] value) {
        // Create the producer
        KafkaProducer<byte[], byte[]> kp =
                new KafkaProducer<byte[], byte[]>(properties);

        // Wrap the message into a record
        ProducerRecord<byte[], byte[]> pr =
                new ProducerRecord<byte[], byte[]>(topic, key, value);

        // Send the record asynchronously
        kp.send(pr, new Callback() {
            // Callback invoked when the send completes
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (null == exception) {
                    // On success, print where the record was written
                    System.out.println("Record offset: " + metadata.offset());
                } else {
                    System.out.println(exception.getMessage());
                }
            }
        });

        // Close the producer
        kp.close();
    }
}
```
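The article does not show a caller for sendMsg. Below is a minimal, hypothetical driver; the ProducerDemo class name, the topic name lijietest, and the JSON payload are my assumptions for illustration, not part of the original:

```java
package com.lijie.producer;

// Hypothetical driver for KafkaProduce (not in the original article).
public class ProducerDemo {
    public static void main(String[] args) {
        KafkaProduce producer = new KafkaProduce();
        // Assumed JSON shape matching the User POJO consumed in step 4
        String json = "{\"name\":\"lijie\",\"age\":25}";
        // Key and value are raw bytes, matching the ByteArraySerializer
        // configured in kafka.properties
        producer.sendMsg("lijietest", "key1".getBytes(), json.getBytes());
    }
}
```

Note that sendMsg creates and closes a new KafkaProducer on every call, which keeps the demo simple; a real application would normally create one producer and reuse it across sends.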
4. Kafka consumer, KafkaConsume:

```java
package com.lijie.consumer;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsume {

    private final static String topic = "lijietest";

    private static Properties properties;

    static {
        // Load kafka.properties from the classpath root
        properties = new Properties();
        String path = KafkaConsume.class.getResource("/").getFile().toString()
                + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Fetch messages.
     *
     * @throws Exception
     */
    public void getMsg() throws Exception {
        ConsumerConfig config = new ConsumerConfig(properties);
        ConsumerConnector consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(config);
        // Ask for one stream for our topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
                .createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        // Block on the stream and deserialize each JSON message into a User
        while (it.hasNext()) {
            String json = it.next().message();
            User user = (User) JsonUtils.jsonToObj(json, User.class);
            System.out.println(user);
        }
    }
}
```
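The consumer references com.lijie.pojo.User and com.lijie.utils.JsonUtils, neither of which appears in the article. Here are minimal sketches of what they might look like, assuming JsonUtils wraps a Jackson ObjectMapper (the original import list pulls in a Jackson databind class via the copy shaded into htrace); the User fields are invented for illustration:

```java
package com.lijie.utils;

import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper assumed by the consumer code; the original article
// does not show it. It simply wraps Jackson's ObjectMapper.
public class JsonUtils {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Deserialize a JSON string into an instance of the given class.
    public static Object jsonToObj(String json, Class<?> clazz) throws Exception {
        return MAPPER.readValue(json, clazz);
    }
}
```

```java
package com.lijie.pojo;

// Hypothetical POJO assumed by the consumer code; the fields are
// illustrative and must match whatever JSON the producer actually sends.
public class User {
    private String name;
    private int age;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "User [name=" + name + ", age=" + age + "]";
    }
}
```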
5. The kafka.properties file:

```properties
##producer
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

##consumer
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.StringEncoder
```
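One caveat about this file: it mixes configuration for two generations of clients. The new Java producer used in step 3 only reads keys such as bootstrap.servers, key.serializer, and value.serializer; producer.type, request.required.acks, and serializer.class belong to the legacy Scala producer, while the zookeeper.*, group.id, and auto.offset.reset entries are consumed by the old high-level consumer used in step 4. If you prefer not to ship a properties file at all, the producer side can also be configured in code. A minimal sketch, assuming the same broker address as above:

```java
package com.lijie.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

// Sketch: build the new-producer configuration programmatically instead of
// loading kafka.properties from the classpath. The broker address is the
// one used in the article's config; adjust it for your environment.
public class InlineConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.123:9092");
        // "acks" is the new producer's counterpart of request.required.acks
        props.put("acks", "1");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<byte[], byte[]>(props);
        producer.close();
    }
}
```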
That's all for this article. I hope it is helpful for your study, and I hope you will keep supporting 武林网.