写了个 Kafka 的 Java Demo 顺便记录下, 仅供参考
1. 创建 maven 项目
目录如下:
2.pom 文件:
<Project xmlns = "http://maven.apache.org/pom/4.0.0" xmlns: xsi = "http://www.w3.org/2001/xmlschema-instance" xsi: schemalocation = "http:/MANCMMMMMMMMMMMMMMMMMMMMMMMMMMBM http://maven.apache.org/xsd/maven-4.0.0.xsd "> <moderVersion> 4.0.0 </modelversion> <moderversences> <s/modelversion> <RoupID> org.apache.kafka </groupId> <ratifactid> kafka_2.11 </artifactid> <version> 0.10.1.1 </version> </การพึ่งพา> <การพึ่งพา> <roupid> org.apache.hadoop </groupid> <RoupID> org.apache.hadoop </groupId> <ratifactid> hadoop-hdfs </artifactid> <version> 2.2.0 </เวอร์ชัน> </perdency> <pendency> <roupid> org.apache.hadoop </groupid> <RoupID> org.apache.hbase </groupId> <ratifactid> hbase-client </artifactid> <cersion> 1.0.3 </เวอร์ชัน> </การพึ่งพา> <การพึ่งพา> <roupid> org.apache.hbase </groupid> <ratifactid> hbase-server <RoupID> org.apache.hadoop </groupId> <ratifactid> hadoop-hdfs </artifactid> <version> 2.2.0 </เวอร์ชัน> </การพึ่งพา> <การพึ่งพา> <roupid> jdk.tools </groupid> <SystemPath> $ {java_home} /lib/tools.jar </systempath> </การพึ่งพา> <การพึ่งพา> <roupid> org.apache.httpComponents </groupId> <ArtifactId> httpClient </artifactid> <RoupID> org.apache.maven.plugins </groupid> <ratifactid> maven-compiler-plugin </artifactid> <การกำหนดค่า> <cornic 3.Kafka 生产者 Kafkaproduce:
แพ็คเกจ com.lijie.producer; นำเข้า java.io.file; นำเข้า java.io.fileinputstream; นำเข้า java.util.properties; นำเข้า org.apache.kafka.clients.producer.callback; นำเข้า org.apache.kafka.clients.producer.producerrecord; นำเข้า org.apache.kafka.clients.producer.recordmetadata; นำเข้า org.slf4j.logger; นำเข้า คงที่ {คุณสมบัติ = คุณสมบัติใหม่ (); String Path = Kafkaproducer.class.getResource ("/"). getFile (). toString () + "kafka.properties"; ลอง {FileInputStream FIS = ใหม่ FileInputStream (ไฟล์ใหม่ (พา ธ )); Properties.load (FIS); } catch (exception e) {e.printstacktrace (); }} / ** * 发送消息 * * @param หัวข้อ * @param key * @param value * / โมฆะสาธารณะ sendmsg (หัวข้อสตริง, byte [] key, byte [] value) {// 实例化ผลิต kafkaproducer <byte [], byte []> kp = new Kafkaproducer <byte // 消息封装 producerrecord <byte [], byte []> pr = producerrecord ใหม่ <byte [], byte []> (หัวข้อ, คีย์, ค่า); // 发送数据 kp.send (pr, การโทรกลับใหม่ () {// 回调函数 @Override โมฆะสาธารณะ oncompletion (RecordMetadata metadata, ข้อยกเว้น) {ถ้า (null! = ข้อยกเว้น) {system.out.println ("记录的 Offset 在:" + metadata. - // 关闭ผลิต kp.close (); - 4.Kafka 消费者 Kafkaconsume:
แพ็คเกจ com.lijie.consumer; นำเข้า java.io.file; นำเข้า java.io.fileinputstream; นำเข้า java.util.hashmap; นำเข้า java.util.list; นำเข้า java.util.map; นำเข้า java.util.properties; com.lijie.pojo.user; นำเข้า com.lijie.utils.jsonutils; นำเข้า kafka.consumer.consumerconfig; นำเข้า kafka.consumer.consumeritorator; นำเข้า kafka.consumer.kafkastream; kafka.serializer.stringdecoder; นำเข้า kafka.utils.verifiableproperties; คลาสสาธารณะ Kafkaconsume {ส่วนตัวสตริงคงสุดท้ายส่วนตัว = "lijietest"; คุณสมบัติคุณสมบัติคงที่ส่วนตัว; คงที่ {คุณสมบัติ = คุณสมบัติใหม่ (); String Path = kafkaconsume.class.getResource ("/"). getFile (). toString () + "kafka.properties"; ลอง {FileInputStream FIS = ใหม่ FileInputStream (ไฟล์ใหม่ (พา ธ )); Properties.load (FIS); } catch (exception e) {e.printstacktrace (); }} / ** * 获取消息 * * @throws Exception * / โมฆะสาธารณะ getMsg () พ่นข้อยกเว้น {consumerConfig config = ใหม่ consumerConfig (คุณสมบัติ); ผู้บริโภคผู้บริโภค = kafka.consumer.consumer .createJavaConsumerconnector (config); แผนที่ <สตริง, จำนวนเต็ม> topicCountMap = new hashmap <สตริง, จำนวนเต็ม> (); TopicCountMap.put (หัวข้อ, จำนวนเต็มใหม่ (1)); StringDecoder keyDecoder = new StringDecoder (ใหม่ VerifiableProperties ()); StringDecoder valuedecoder = new StringDecoder (ใหม่ VerifiableProperties ()); แผนที่ <string, รายการ <kafkastream <สตริง, สตริง >>> ผู้บริโภค = ผู้บริโภค kafkastream <string, string> stream = cumpermap.get (หัวข้อ) .get (0); ผู้ใช้งาน <สตริง, สตริง> it = stream.iterator (); ในขณะที่ (it.hasnext ()) {string json = it.next (). ข้อความ (); ผู้ใช้ user = (ผู้ใช้) jsonutils.jsontoobj (json, user.class); System.out.println (ผู้ใช้); - 5.Kafka.properties 文件
## ProduceBootstrap.Servers = 192.168.80.123: 9092Producer.type = syncrequest.Required.acks = 1Serializer.class = kafka.serializer.defaultenco derkey.serializer = org.apache.kafka.common.serialization.bytearrayserializervalue.serializer = org.apache.kafka.common.serialization.byt earrayserializerbak.partitioner.class = kafka.producer.defaultpartitionerbak.key.serializer = org.apache.kafka.common.serialization.stri ngserializerbak.value.serializer = org.apache.kafka.common.serialization.stringserializer ## Consumezookeeper.connect = 192.168.80.123: 2181 group.id = lijiegroup zookeeper.session.timeout.ms = 4000 zookeeper.sync.time.ms = 200 auto.commit.interval.ms = 1,000 auto.offset.reset = serializer ที่เล็กที่สุด
以上就是本文的全部内容, 希望对大家的学习有所帮助, 也希望大家多多支持武林网。