1. Maven 依赖包
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency>
2. 生产者代码
pacote com.lnho.example.kafka; importar org.apache.kafka.clients.producer.kafkaproduces; importar org.apache.kafka.clients.producer.producer; importar org.apache.kafka.clients.producer.produCerRecord; importar java.util.properties; classe pública kafkaproducerexample {public static void main (string [] args) {Propriedades props = new Properties (); props.put ("bootstrap.servers", "mestre: 9092"); props.put ("acks", "all"); props.put ("tentativas", 0); props.put ("batch.size", 16384); props.put ("linger.ms", 1); props.put ("buffer.memory", 33554432); props.put ("key.serializer", "org.apache.kafka.common.serialization.stringSerializer"); props.put ("value.serializer", "org.apache.kafka.common.serialization.stringSerializer"); Produtor <String, String> Produtor = new Kafkaproduced <> (Props); for (int i = 0; i <100; i ++) produtora.send (novo ProductherRecord <> ("tópico1", Integer.toString (i), Integer.toString (i))); produtor.close (); }}3. 消费者代码
pacote com.lnho.example.kafka; importar org.apache.kafka.clients.consumer.consumerRecord; importar org.apache.kafka.clients.consumer.consumerRecords; importar org.apache.kafka.clients.consumer.kafkaconsumer; importar java.util.arrays; importar java.util.properties; classe pública kafkaconsumeRexample {public static void main (string [] args) {Propriedades props = new Properties (); props.put ("bootstrap.servers", "mestre: 9092"); props.put ("group.id", "teste"); props.put ("enable.auto.comit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("key.deserializer", "org.apache.kafka.common.serialization.stringDeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.stringDeserializer"); Kafkaconsumer <string, string> consumer = new kafkaconsumer <> (adereços); consumer.subscribe (Arrays.asList ("tópico1")); while (true) {ConsumerRecords <string, string> registros = consumer.poll (100); para (ConsumerRecord <string, string> registros: registros) System.out.printf ("offset = %d, key = %s, value = %s/n", registro.offset (), registro.key (), registro.value ()); }}}以上就是本文的全部内容 , 希望对大家的学习有所帮助 , 也希望大家多多支持武林网。