1. Maven 依赖包
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
2. 生产者代码
package com.lnho.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Minimal Kafka producer example: sends 100 string records to the topic
 * {@code Topic1}, using the loop index (as a string) for both key and value.
 */
public class KafkaProducerExample {

    /**
     * Configures a {@link KafkaProducer} with string serializers, sends the
     * records "0" through "99" and then closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address in host:port form (no whitespace); adjust to your cluster.
        props.put("bootstrap.servers", "master:9092");
        // Wait for the full set of in-sync replicas to acknowledge each record.
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even if a send() throws.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String payload = Integer.toString(i);
                producer.send(new ProducerRecord<>("Topic1", payload, payload));
            }
        }
    }
}
3. 消费者代码
paquete com.lnho.example.kafka; importar org.apache.kafka.clients.consumer.consumerRecord; importar org.apache.kafka.clients.consumer.consumerRecords; importar org.apache.kafka.clients.consumer.kafkaconsumer; importar java.util.arrays; import java.util.properties; clase pública kafkaconsumerexample {public static void main (string [] args) {Properties props = new Properties (); propssput ("bootstrap.servers", "maestro: 9092"); propssput ("group.id", "prueba"); propssput ("enable.auto.commit", "verdadero"); props.put ("auto.commit.interval.ms", "1000"); propssput ("session.timeout.ms", "30000"); props.put ("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); KafkAconsumer <String, String> Consumer = new KafkAconsumer <> (Props); Consumer.subscribe (Arrays.aslist ("Topic1")); while (true) {ConsumerRecords <String, String> Records = Consumer.Poll (100); para (ConsumerRecord <String, String> Record: Records) System.out.printf ("offset = %d, key = %s, valor = %s/n", registro.offset (), registro.key (), registro.value ()); }}}以上就是本文的全部内容 , 希望对大家的学习有所帮助 也希望大家多多支持武林网。 也希望大家多多支持武林网。