A Java demo for Kafka, written down in passing; for reference only.
1. Creating the Maven project
The project directory is as follows:
2. The POM:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>kafka-maven</groupId>
    <artifactId>kafka-maven</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.6</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3. The Kafka producer, KafkaProduce:
package com.lijie.producer;

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProduce {

    private static Properties properties;

    // load kafka.properties from the classpath once
    static {
        properties = new Properties();
        String path = KafkaProduce.class.getResource("/").getFile().toString()
                + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Send a message to the given topic
     *
     * @param topic
     * @param key
     * @param value
     */
    public void sendMsg(String topic, byte[] key, byte[] value) {
        // instantiate the producer
        KafkaProducer<byte[], byte[]> kp = new KafkaProducer<byte[], byte[]>(properties);
        // wrap key and value in a record for the topic
        ProducerRecord<byte[], byte[]> pr = new ProducerRecord<byte[], byte[]>(topic, key, value);
        // send the data; the callback fires once the broker has answered
        kp.send(pr, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (null != exception) {
                    System.out.println(exception.getMessage());
                } else {
                    System.out.println("record offset: " + metadata.offset());
                }
            }
        });
        // close the producer
        kp.close();
    }
}
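For quick testing, a runner along the following lines could drive the producer. This class is not part of the original post; the topic name simply matches the one the consumer below subscribes to, and the JSON payload is a stand-in for whatever the User pojo serializes to:

package com.lijie.producer;

// Hypothetical runner (not in the original post): sends one JSON message to
// the "lijietest" topic that KafkaConsume reads from in the next section.
public class ProducerDemo {

    public static void main(String[] args) {
        KafkaProduce produce = new KafkaProduce();
        // the consumer deserializes JSON into a User pojo, so send JSON bytes
        String json = "{\"name\":\"lijie\"}";
        produce.sendMsg("lijietest", "key1".getBytes(), json.getBytes());
    }
}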
4. The Kafka consumer, KafkaConsume:

package com.lijie.consumer;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsume {

    private final static String TOPIC = "lijietest";

    private static Properties properties;

    // load kafka.properties from the classpath once
    static {
        properties = new Properties();
        String path = KafkaConsume.class.getResource("/").getFile().toString()
                + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Consume messages from the topic
     *
     * @throws Exception
     */
    public void getMsg() throws Exception {
        ConsumerConfig config = new ConsumerConfig(properties);
        ConsumerConnector consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(config);
        // one stream for our single topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
                .createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        // block on the iterator and turn each JSON message into a User
        while (it.hasNext()) {
            String json = it.next().message();
            User user = (User) JsonUtils.jsonToObj(json, User.class);
            System.out.println(user);
        }
    }
}
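The User pojo and the JsonUtils helper referenced above are not shown in the post. A minimal sketch of the helper, assuming Jackson's databind is on the classpath (the hadoop/hbase dependencies pull a copy in transitively), might look like this; User would then just be a plain pojo whose fields match the JSON the producer sends:

package com.lijie.utils;

import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper, assumed by KafkaConsume above; the original post does
// not show its source. Deserializes a JSON string into the given class.
public class JsonUtils {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static Object jsonToObj(String json, Class<?> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}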
5. The kafka.properties file

##producer
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

##consumer
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.StringEncoder
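If you would rather not ship a kafka.properties file, the producer half of this configuration can also be built in code. A minimal sketch, using only the keys the 0.10.x producer API actually reads and the same broker address as the file above:

package com.lijie.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch (not in the original post): inline producer configuration
// equivalent to the producer section of kafka.properties.
public class InlineConfigDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        // broker list, same address as in kafka.properties
        props.put("bootstrap.servers", "192.168.80.123:9092");
        // byte[] serializers matching KafkaProduce's <byte[], byte[]> types
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // wait for the leader's acknowledgement, as request.required.acks=1 did
        props.put("acks", "1");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        producer.send(new ProducerRecord<byte[], byte[]>("lijietest",
                "key1".getBytes(), "value1".getBytes()));
        producer.close();
    }
}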
That is all for this article. I hope it helps with your study, and I hope you will keep supporting 武林网.