1. Maven 依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency>
2. 生产者代码
패키지 com.lnho.example.kafka; import org.apache.kafka.clients.producer.kafkaproducer; import org.apache.kafka.clients.producer.producer; import org.apache.kafka.clients.producer.produceRecord; java.util.properties import; 공개 클래스 kafkaproduceerexample {public static void main (String [] args) {속성 propss = new Properties (); props.put ( "bootstrap.servers", "master : 9092"); props.put ( "acks", "all"); props.put ( "재시험", 0); props.put ( "batch.size", 16384); props.put ( "linger.ms", 1); props.put ( "buffer.memory", 33554432); props.put ( "key.serializer", "org.apache.kafka.common.serialization.stringserializer"); props.put ( "value.serializer", "org.apache.kafka.common.serialization.stringserializer"); Producer <String, String> Producer = New Kafkaproducer <> (Props); for (int i = 0; i <100; i ++) producer.send (new ProducerRecord <> ( "topic1", integer.tostring (i), integer.tostring (i)); Producer.Close (); }}3. 消费者代码
패키지 com.lnho.example.kafka; import org.apache.kafka.clients.consumer.consumerRecord; import org.apache.kafka.clients.consumer.consumerRecords; import org.apache.kafka.clients.consumer.kafkaconsumer; import java.util.arrays; java.util.properties import; 공개 클래스 kafkaconsumerexample {public static void main (String [] args) {속성 propss = new Properties (); props.put ( "bootstrap.servers", "master : 9092"); props.put ( "group.id", "test"); props.put ( "enable.auto.commit", "true"); props.put ( "auto.commit.interval.ms", "1000"); props.put ( "session.timeout.ms", "30000"); props.put ( "key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); props.put ( "value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); Kafkaconsumer <String, String> Consumer = New Kafkaconsumer <> (props); 소비자 .subscribe (arrays.aslist ( "topic1")); while (true) {ConsecerRecords <String, String> Records = Consumer.poll (100); for (consideRecord <string, string> record : records) system.out.printf ( "offset = %d, key = %s, value = %s/n", record.offset (), record.key (), record.value ()); }}}以上就是本文的全部内容 以上就是本文的全部内容, 希望对大家的学习有所帮助, 也希望大家多多支持武林网。