1. Maven 依赖包
<Depopenty> <gruppe> org.apache.kafka </Groupid> <artifactId> Kafka-Clients </artifactId> <version> 0.9.0.1 </Version> </abhängig>
2. 生产者代码
Paket com.lnho.example.kafka; import org.apache.kafka.clients.Producer.kafkaproducer; import org.apache.kafka.clients.Producer.Producer; import org.apache.kafka.clients.Producer.Producerrecord; Import Java.util.Properties; public class kafkaproducerexample {public static void main (String [] args) {Properties props = neue Eigenschaften (); props.put ("Bootstrap.servers", "Master: 9092"); props.put ("acks", "alles"); props.put ("returns", 0); props.put ("batch.size", 16384); props.put ("dinger.ms", 1); props.put ("buffer.memory", 33554432); props.put ("key.serializer", "org.apache.kafka.common.serialization.Stringserializer"); props.put ("value.serializer", "org.apache.kafka.common.Serialization.Stringserializer"); Produzent <String, String> Produzent = neuer KafkaproDucer <> (Requisiten); für (int i = 0; i <100; i ++) produzierer.send (new protocerrecord <> ("topic1", Integer.toString (i), Integer.toString (i))); Produzent.CLOSE (); }}3. 消费者代码
Paket com.lnho.example.kafka; import org.apache.kafka.clients.consumer.consumerrecord; import org.apache.kafka.clients.consumer.consumerrecords; import org.apache.kafka.clients.consumer.kafkaconsumer; Import Java.util.Arrays; Import Java.util.Properties; public class kafkaconsumerexample {public static void main (String [] args) {Properties props = new Properties (); props.put ("Bootstrap.servers", "Master: 9092"); props.put ("Gruppe.id", "test"); props.put ("enable.auto.commit", "wahr"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("key.deserializer", "org.apache.kafka.common.serialization.stringDeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.stringDeserializer"); Kafkaconsumer <String, String> Consumer = new Kafkaconsumer <> (Requisiten); Consumer.Subscribe (arrays.aslist ("topic1")); while (true) {condenterRecords <String, String> records = conbraucher.poll (100); für (condenterRecord <string, string> record: records) system.out.printf ("offset = %d, key = %s, value = %s/n", record.Offset (), record.key (), record.value ()); }}}以上就是本文的全部内容 , 希望对大家的学习有所帮助 , 也希望大家多多支持武林网。 也希望大家多多支持武林网。