1. Maven 依赖包
<dependency> <groupid> org.apache.kafka </groupid> <artifactid> kafka-clients </artifactid> <sersive> 0.9.0.1 </version> </dependency>
2. 生产者代码
пакет com.lnho.example.kafka; Импорт org.apache.kafka.clients.producer.kafkaproducer; Импорт org.apache.kafka.clients.producer.producer; Импорт org.apache.kafka.clients.producer.producerRecord; импортировать java.util.properties; открытый класс kafkaproducerexample {public static void main (string [] args) {свойства props = new Properties (); props.put ("bootstrap.servers", "Master: 9092"); props.put ("Acks", "All"); props.put ("retries", 0); props.put ("batch.size", 16384); props.put ("linger.ms", 1); props.put ("buffer.memory", 335544432); props.put ("key.serializer", "org.apache.kafka.common.serialization.stringserializer"); props.put ("value.serializer", "org.apache.kafka.common.serialization.stringserializer"); Производитель <String, String> Preduceer = new Kafkaproducer <> (ops); для (int i = 0; i <100; i ++) продюсер.send (new ProducterRecord <> ("topic1", Integer.toString (i), Integer.toString (i))); Производитель.close (); }}3. 消费者代码
пакет com.lnho.example.kafka; Импорт org.apache.kafka.clients.consumer.consumerRecord; Импорт org.apache.kafka.clients.consumer.consemerRecords; Импорт org.apache.kafka.clients.consumer.kafkaconsumer; импортировать java.util.arrays; импортировать java.util.properties; открытый класс kafkaconsumerexample {public static void main (string [] args) {свойства props = new Properties (); props.put ("bootstrap.servers", "Master: 9092"); props.put ("group.id", "test"); props.put ("enable.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); Kafkaconsumer <string, string> consumer = new kafkaconsumer <> (ops); Consumer.SubScribe (arrays.aslist ("topic1")); while (true) {consumerRecords <string, string> records = consumer.poll (100); for (ConsumerErcord <String, String> record: records) system.out.printf ("offset = %d, key = %s, значение = %s/n", record.offset (), record.key (), record.value ()); }}}以上就是本文的全部内容 , 希望对大家的学习有所帮助 也希望大家多多支持武林网。