I wrote a Java demo for Kafka and am recording it here; for reference only.
1. Create a Maven project
The directory structure is as follows:
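A typical layout for this project (inferred from the package declarations and the kafka.properties lookup in the code below, so treat it as an assumption) would be:

kafka-maven/
├── pom.xml
└── src/main
    ├── java/com/lijie
    │   ├── consumer/KafkaConsume.java
    │   ├── pojo/User.java
    │   ├── producer/KafkaProduce.java
    │   └── utils/JsonUtils.java
    └── resources/kafka.properties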
2. The pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>kafka-maven</groupId>
    <artifactId>kafka-maven</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.6</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
3. Kafka producer (KafkaProduce):

package com.lijie.producer;

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProduce {

    private static Properties properties;

    static {
        properties = new Properties();
        String path = KafkaProduce.class.getResource("/").getFile().toString()
                + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Send a message
     *
     * @param topic
     * @param key
     * @param value
     */
    public void sendMsg(String topic, byte[] key, byte[] value) {
        // instantiate the producer
        KafkaProducer<byte[], byte[]> kp = new KafkaProducer<byte[], byte[]>(properties);
        // wrap the message
        ProducerRecord<byte[], byte[]> pr = new ProducerRecord<byte[], byte[]>(topic, key, value);
        // send the data
        kp.send(pr, new Callback() {
            // callback invoked once the send completes or fails
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (null != exception) {
                    exception.printStackTrace();
                }
            }
        });
        // close the producer
        kp.close();
    }
}
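As a quick sanity check, a minimal driver for the class above might look like the following (hypothetical, not part of the original demo; the topic name lijietest comes from the consumer below, and the JSON payload shape is an assumption):

package com.lijie.producer;

// Hypothetical driver for KafkaProduce; not from the original post.
public class ProducerDemo {
    public static void main(String[] args) {
        KafkaProduce producer = new KafkaProduce();
        // The consumer below deserializes the value as a JSON-encoded User,
        // so we send a JSON string; the field name is an assumption.
        String json = "{\"name\":\"lijie\"}";
        producer.sendMsg("lijietest", "key-1".getBytes(), json.getBytes());
    }
}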
4. Kafka consumer (KafkaConsume):

package com.lijie.consumer;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsume {

    private final static String topic = "lijietest";

    private static Properties properties;

    static {
        properties = new Properties();
        String path = KafkaConsume.class.getResource("/").getFile().toString()
                + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Consume messages
     *
     * @throws Exception
     */
    public void getMsg() throws Exception {
        ConsumerConfig config = new ConsumerConfig(properties);
        ConsumerConnector consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(config);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
                .createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            String json = it.next().message();
            User user = (User) JsonUtils.jsonToObj(json, User.class);
            System.out.println(user);
        }
    }
}
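The consumer references com.lijie.pojo.User and com.lijie.utils.JsonUtils, which are not shown in the post. A minimal sketch of both, assuming Jackson's ObjectMapper is available on the classpath:

package com.lijie.utils;

import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper matching the call in KafkaConsume; the original
// implementation is not shown, so this is a sketch assuming Jackson.
public class JsonUtils {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static Object jsonToObj(String json, Class<?> clazz) throws Exception {
        return MAPPER.readValue(json, clazz);
    }
}

package com.lijie.pojo;

// Hypothetical POJO; the real field list is not shown in the post.
public class User {
    private String name;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return "User [name=" + name + "]";
    }
}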
5. The kafka.properties file:

##produce
bootstrap.servers=192.168.80.123:9092
producer.type=sync
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

##consume
zookeeper.connect=192.168.80.123:2181
group.id=lijieGroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.StringEncoder

The first block configures the new-API producer used in KafkaProduce; the second block configures the old ZooKeeper-based high-level consumer used in KafkaConsume.
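To try the demo end to end, one could run the consumer in a background thread and then produce one message. This runner is a sketch under the assumption that the broker and ZooKeeper addresses configured above are reachable:

package com.lijie;

import com.lijie.consumer.KafkaConsume;
import com.lijie.producer.KafkaProduce;

// Hypothetical end-to-end runner; not from the original post.
public class DemoMain {
    public static void main(String[] args) throws Exception {
        // start the consumer in a background thread (getMsg blocks forever)
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new KafkaConsume().getMsg();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // give the consumer a moment to register, then send one message
        Thread.sleep(3000);
        new KafkaProduce().sendMsg("lijietest", "key-1".getBytes(),
                "{\"name\":\"lijie\"}".getBytes());
    }
}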
That is all for this article. I hope it helps with your study, and I hope you will continue to support 武林网.