I wrote a small Kafka Java demo and am recording it here along the way. For reference only.
1. Create a Maven project
The project directory is as follows:
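The directory screenshot from the original post is not reproduced here. Judging from the package names and the classpath lookup of kafka.properties used below, the layout is presumably the standard Maven one, roughly:

kafka-maven
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com/lijie
        │       ├── producer/KafkaProduce.java
        │       ├── consumer/KafkaConsume.java
        │       ├── pojo/User.java
        │       └── utils/JsonUtils.java
        └── resources
            └── kafka.properties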
2. The POM file:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>kafka-maven</groupId>
  <artifactId>kafka-maven</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.3.6</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

3. Kafka producer KafkaProduce:
package com.lijie.producer;

import java.io.File;
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProduce {

    private static Properties properties;

    // Load kafka.properties from the classpath root
    static {
        properties = new Properties();
        String path = KafkaProduce.class.getResource("/").getFile().toString() + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Send a message
     *
     * @param topic
     * @param key
     * @param value
     */
    public void sendMsg(String topic, byte[] key, byte[] value) {
        // Instantiate the producer
        KafkaProducer<byte[], byte[]> kp = new KafkaProducer<byte[], byte[]>(properties);
        // Wrap the message
        ProducerRecord<byte[], byte[]> pr = new ProducerRecord<byte[], byte[]>(topic, key, value);
        // Send the data
        kp.send(pr, new Callback() {
            // Callback invoked when the send completes
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (null != exception) {
                    System.out.println(exception.getMessage() + exception);
                } else {
                    System.out.println("Offset of the record: " + metadata.offset());
                }
            }
        });
        // Close the producer
        kp.close();
    }
}
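A minimal usage sketch, not part of the original post: serialize a payload to bytes and hand it to sendMsg. The JSON payload shape and the ProducerDemo class are assumptions; the topic "lijietest" is the one the consumer below subscribes to.

package com.lijie.demo;

import com.lijie.producer.KafkaProduce;

// Hypothetical caller; the payload here is a guess and should match
// whatever fields the consumer's User POJO expects.
public class ProducerDemo {
    public static void main(String[] args) {
        KafkaProduce producer = new KafkaProduce();
        String json = "{\"name\":\"lijie\"}";
        producer.sendMsg("lijietest", "key1".getBytes(), json.getBytes());
    }
}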
4. Kafka consumer KafkaConsume:

package com.lijie.consumer;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsume {

    private final static String topic = "lijietest";

    private static Properties properties;

    // Load kafka.properties from the classpath root
    static {
        properties = new Properties();
        String path = KafkaConsume.class.getResource("/").getFile().toString() + "kafka.properties";
        try {
            FileInputStream fis = new FileInputStream(new File(path));
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Fetch messages
     *
     * @throws Exception
     */
    public void getMsg() throws Exception {
        ConsumerConfig config = new ConsumerConfig(properties);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            // Each message value is a JSON string; decode it into a User
            String json = it.next().message();
            User user = (User) JsonUtils.jsonToObj(json, User.class);
            System.out.println(user);
        }
    }
}
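The consumer references a com.lijie.pojo.User POJO and a com.lijie.utils.JsonUtils helper that the original post does not show. A minimal sketch of what they might look like, assuming Jackson's ObjectMapper for JSON deserialization; the field names are purely illustrative:

package com.lijie.pojo;

// Hypothetical POJO; the original post does not show its fields.
public class User {
    private String name;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return "User [name=" + name + "]";
    }
}

package com.lijie.utils;

import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper matching the JsonUtils.jsonToObj(...) call in the consumer.
public class JsonUtils {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static Object jsonToObj(String json, Class<?> clazz) throws Exception {
        return MAPPER.readValue(json, clazz);
    }
}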
5. The kafka.properties file:

## producer
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

## consumer
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.StringEncoder
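Both classes load kafka.properties from the classpath root, so the file above would typically live under src/main/resources, and the broker/ZooKeeper addresses in it must be reachable from the machine running the demo. A minimal, hypothetical entry point (not from the original post) that starts the consumer loop:

package com.lijie.demo;

import com.lijie.consumer.KafkaConsume;

// Hypothetical runner, assuming kafka.properties is on the classpath
// and the Kafka broker and ZooKeeper configured in it are running.
public class ConsumerDemo {
    public static void main(String[] args) throws Exception {
        KafkaConsume consumer = new KafkaConsume();
        consumer.getMsg();   // blocks and prints each decoded User message
    }
}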
That is all for this article. I hope it helps with your learning, and I also hope you will continue to support 武林网.