本文写了一个 Kafka 生产者和消费者的完整示例。
1. 这是一个 Maven 项目，目录结构如下：
2. POM 文件：
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>kafka-maven</groupId>
    <artifactId>kafka-maven</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.1</version>
        </dependency>
        <!-- NOTE(review): the original listed hadoop-hdfs twice; the duplicate was removed -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.0.3</version>
        </dependency>
        <!-- tools.jar is needed at compile time by the hadoop/hbase stack on JDK 7 -->
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
3. Kafka 生产者 KafkaProduce：
パッケージcom.lijie.producer; Import java.io.file; Import java.io.fileinputStream; Import java.util.properties; Import org.apache.kafka.clients.producer.callback; import org.apache.kafka.clients.producer.kafducer; apache.kafka.clients.producer.producerrecord; import org.apache.kafka.clients.producer.recordmetadata; Import org.slf4j.logger; Import org.slf4j.loggeractory; public class kafkaproduce {prive stistic propicties; static {properties = new Properties(); string path = kafkaproducer.class.getResource( "/")。getFile()。toString() + "kafka.properties"; try {fileInputStream fis = new fileInputStream(new file(path)); properties.load(fis); } catch(Exception e){e.printstacktrace(); }} / ** * * * @param Topic * @param key * @param value * / public void sendmsg(string topic、byte [] key、byte [] value){// //消息封装プロデューサーコード<byte []、byte []> pr = new producerecord <byte []、byte []>(topic、key、value); //发送数据kp.send(pr、new callback(){// }); // }} 4.Kafka消费者Kafkaconsume:
パッケージcom.lijie.consumer;インポートjava.io.file; import java.io.fileinputStream; Import java.util.hashmap; Import java.util.list; Import java.util.map; import java.util.porties; import org.apache.htrace.fatrace.fatrace.fatrace.fatrace.fatrace.fatrace.fatrace.fatrace.fasterxml.jackson. com.lijie.pojo.user; import com.lijie.utils.jsonutils; Import kafka.consumer.consumerconfig; Import kafka.consumer.consumeritorator; Import kafka.consumer.kafkastream; Import kafka.javaapi.consumer.consumercector kafka.serializer.stringdecoder; import kafka.utils.veriableProperties; public class kafkaconsume {private final static string topic = "lijietest";プライベート静的プロパティプロパティ。 static {properties = new Properties(); string path = kafkaconsume.class.getResource( "/")。getFile()。toString() + "kafka.properties"; try {fileInputStream fis = new fileInputStream(new file(path)); properties.load(fis); } catch(Exception e){e.printstacktrace(); }} / ** * * * * @Throws例外 * / public void getMSG()スロー例外{consumerConfig config = new ConsumerConfig(Properties); ConsumerConnector Consumer = Kafka.Consumer.Consumer .CreateJavaconsumerConnector(config); map <string、integer> topiccountmap = new Hashmap <String、integer>(); topiccountmap.put(topic、new Integer(1)); stringdecoder keydecoder = new StringDecoder(new verifiableProperties()); StringDecoder ValueDeCoder = new StringDecoder(new verifiableProperties()); Map <string、list <kafkastream <string、string >>> consumermap = consumer .createmessagestreams(topiccountmap、keydecoder、valuedeCoder); kafkastream <string、string> stream = consumermap.get(トピック).get(0); consumeriterator <string、string> it = stream.iterator(); while(it.hasnext()){string json = it.next()。message();ユーザーユーザー=(ユーザー)jsonutils.jsontoobj(json、user.class); System.out.println(user); }}} 5.Kafka.Properties文件
## produce
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

## consume
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
# NOTE(review): the original text fused the last entries; they were most
# likely these two keys -- confirm against the original article
auto.offset.reset=smallest
serializer.class=kafka.serializer.StringEncoder
以上就是本文的全部内容，希望对大家的学习有所帮助，也希望大家多多支持武林网。