1. Maven 依赖包
<dependency> <proupId> org.apache.kafka </rompupid> <letifactive> kafka-clients </ artifactid> <version> 0.9.0.1 </-version> </pependance>
2. 生产者代码
package com.lnho.example.kafka; import org.apache.kafka.clients.producer.kafkaproducer; import org.apache.kafka.clients.producer.producer; import org.apache.kafka.clients.producer.producercord; import java.util.properties; classe publique kafkaproducerexample {public static void main (String [] args) {Properties props = new Properties (); propuls.put ("bootstrap.servers", "Master: 9092"); propuls.put ("acks", "all"); Prophes.put ("Retries", 0); Prophes.put ("Batch.Size", 16384); Prophes.put ("Linger.ms", 1); Prophes.put ("Buffer.Memory", 33554432); props.put ("key.serializer", "org.apache.kafka.common.serialization.stringSerializer"); props.put ("value.serializer", "org.apache.kafka.common.serialization.stringSerializer"); Producteur <string, string> producer = new kafkaproducer <> (accessoires); pour (int i = 0; i <100; i ++) producer.send (new ProduceRecord <> ("topic1", Integer.ToString (i), Integer.ToString (i))); producteur.close (); }}3. 消费者代码
package com.lnho.example.kafka; import org.apache.kafka.clients.consumer.consumerCord; import org.apache.kafka.clients.consumer.consumeRecords; import org.apache.kafka.clients.consumer.kafkaconsumer; import java.util.arrays; import java.util.properties; classe publique kafkaconsumerexample {public static void main (String [] args) {Properties props = new Properties (); propuls.put ("bootstrap.servers", "Master: 9092"); propuls.put ("groupe.id", "test"); props.put ("activer.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("Session.Timeout.ms", "30000"); props.put ("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); Kafkaconsumer <string, string> Consumer = new Kafkaconsumer <> (accessoires); Consumer.Subscribe (arrays.aslist ("topic1")); while (true) {ConsumerCords <String, String> enregistres = Consumer.Poll (100); pour (ConsumerCord <String, String> enregistre: enregistrements) System.out.printf ("offset =% d, key =% s, value =% s / n", enregistre.offset (), enregistre.key (), enregistre.Value ()); }}}以上就是本文的全部内容 , 希望对大家的学习有所帮助 , 也希望大家多多支持武林网。