1. مافن 依赖包
<Rependency> <roupeD> org.apache.kafka </rougiD> <StifactId> kafka-clients </artifactid> <sophy> 0.9.0.1 </version> </sependency>
2. 生产者代码
حزمة com.lnho.example.kafka ؛ استيراد org.apache.kafka.clients.producer.kafkaproducer ؛ استيراد org.apache.kafka.clients.producer.producer ؛ استيراد org.apache.kafka.clients.producer.producerRecord ؛ استيراد java.util.properties ؛ الفئة العامة kafkaproducerexample {public static void main (string [] args) {properties props = new properties () ؛ props.put ("bootstrap.servers" ، "Master: 9092") ؛ props.put ("Acks" ، "All") ؛ props.put ("Retries" ، 0) ؛ props.put ("batch.size" ، 16384) ؛ props.put ("Linger.ms" ، 1) ؛ props.put ("Buffer.Memory" ، 33554432) ؛ props.put ("key.serializer" ، "org.apache.kafka.common.Serialization.StringSerializer") ؛ props.put ("value.serializer" ، "org.apache.kafka.common.Serialization.StringSerializer") ؛ المنتج <string ، string> producer = new Kafkaproducer <> (الدعائم) ؛ لـ (int i = 0 ؛ i <100 ؛ i ++) producer.send (new producerrecord <> ("topic1" ، integer.toString (i) ، integer.tostring (i))) ؛ producer.close () ؛ }}3. 消费者代码
حزمة com.lnho.example.kafka ؛ استيراد org.apache.kafka.clients.consumer.consumerRecord ؛ استيراد org.apache.kafka.clients.consumer.consumerRecords ؛ استيراد org.apache.kafka.clients.consumer.kafkaconsumer ؛ استيراد java.util.arrays ؛ استيراد java.util.properties ؛ الفئة العامة kafkaconsumerexample {public static void main (string [] args) {properties props = new properties () ؛ props.put ("bootstrap.servers" ، "Master: 9092") ؛ props.put ("group.id" ، "test") ؛ props.put ("enable.auto.commit" ، "true") ؛ props.put ("Auto.Commit.Interval.ms" ، "1000") ؛ props.put ("session.timeout.ms" ، "30000") ؛ props.put ("key.deserializer" ، "org.apache.kafka.common.Serialization.StringDeserializer") ؛ props.put ("value.deserializer" ، "org.apache.kafka.common.Serialization.StringDeserializer") ؛ kafkaconsumer <string ، string> consumer = new Kafkaconsumer <> (الدعائم) ؛ المستهلك. بينما (صواب) {consumerRecords <string ، string> records = consumer.poll (100) ؛ لـ (consumerRecord <String ، String> Record: Records) System.out.printf ("Offset = ٪ d ، key = ٪ s ، value = ٪ s/n" ، record.offset () ، record.key () ، record.value ()) ؛ }}}以上就是本文的全部内容 , 希望对大家的学习有所帮助 , 也希望大家多多支持武林网。