1
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency>
2。生产者代码
包com.lnho.example.kafka;导入org.apache.kafka.clients.producer.kafkaproducer;导入org.apache.kafka.clients.producer.producer;导入org.apache.kafka.clients.producer.producercord;导入java.util.properties; public类kafkaproducerexample {public static void main(string [] args){properties props = new properties(); props.put(“ Bootstrap.servers”,“ Master:9092”); props.put(“ acks”,“ all”); props.put(“ retries”,0); props.put(“ batch.size”,16384); props.put(“ linger.ms”,1); props.put(“ buffer.memory”,33554432); props.put(“键。 props.put(“ value.serializer”,org.apache.kafka.common.serialization.stringserializer');生产者<string,string> producer = new kafkaproducer <>(props); for(int i = 0; i <100; i ++)producer.Send(new ProducterRecord <>(“ topic1”,integer.tostring(i),integer.tostring(i))); producer.close(); }}}3。消费者代码
包com.lnho.example.kafka;导入org.apache.kafka.clients.consumer.consumerrecord;导入org.apache.kafka.clients.consumer.consumerrecords;导入org.apache.kafka.clients.consumer.kafkaconsumer;导入java.util.arrays;导入java.util.properties; public类kafkaconsumerexample {public static void main(string [] args){properties props = new properties(); props.put(“ Bootstrap.servers”,“ Master:9092”); props.put(“ group.id”,“ test”); props.put(“ enable.auto.commit”,“ true”); props.put(“ auto.commit.interval.ms”,“ 1000”); props.put(“ session.timeout.ms”,“ 30000”); props.put(“键。 props.put(“ value.deserializer”,“ org.apache.kafka.common.serialization.stringDeserializer”); kafkaconsumer <string,string> consumer = new kafkaconsumer <>(props); commuter.subscribe(arrays.aslist(“ topor1”)); while(true){commuterRecords <string,string> records = computer.poll(100); for(consumerRecord <string,字符串> record:record)system.out.printf(“ fortset =%d,key =%s,value =%s/n”,record.offset(),record.key(),record.value()); }}}}以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。,也希望大家多多支持武林网。