1。メイブン依赖包
<Dependency> groupId> org.apache.kafka </groupid> <artifactid> kafka-clients </artifactid> <バージョン> 0.9.0.1 </version> </dependency>
2。
パッケージcom.lnho.example.kafka; org.apache.kafka.clients.producer.kafkaproducerをインポートします。 org.apache.kafka.clients.producer.producerをインポートします。 org.apache.kafka.clients.producer.producerrecordをインポートします。 java.util.propertiesをインポートします。パブリッククラスkafkaproducerexample {public static void main(string [] args){properties props = new Properties(); props.put( "Bootstrap.Servers"、 "Master:9092"); props.put( "acks"、 "all"); props.put( "retries"、0); props.put( "batch.size"、16384); props.put( "linger.ms"、1); props.put( "buffer.memory"、33554432); props.put( "key.serializer"、 "org.apache.kafka.common.serialization.stringserializer"); props.put( "value.serializer"、 "org.apache.kafka.common.serialization.stringserializer");プロデューサー<string、string> producer = new kafkaproducer <>(props); for(int i = 0; i <100; i ++)producer.send(new produceRecord <>( "topic1"、integer.tostring(i)、integer.tostring(i)); producer.close(); }}3。
パッケージcom.lnho.example.kafka; org.apache.kafka.clients.consumer.consumerrecordをインポートします。 org.apache.kafka.clients.consumer.consumerrecordsをインポートします。 org.apache.kafka.clients.consumer.kafkaconsumerをインポートします。 java.util.arraysをインポートします。 java.util.propertiesをインポートします。パブリッククラスKafkaconsumerexample {public static void main(string [] args){properties props = new Properties(); props.put( "Bootstrap.Servers"、 "Master:9092"); props.put( "group.id"、 "test"); props.put( "enable.auto.commit"、 "true"); props.put( "auto.commit.interval.ms"、 "1000"); props.put( "session.timeout.ms"、 "30000"); props.put( "key.deserializer"、 "org.apache.kafka.common.serialization.stringdeserializer"); props.put( "value.deserializer"、 "org.apache.kafka.common.serialization.stringdeserializer"); kafkaconsumer <string、string> consumer = new Kafkaconsumer <>(props); consumer.subscribe(arrays.aslist( "topic1")); while(true){consumerrecords <string、string> records = consumer.poll(100); for(counterrecord <string、string> record:record:record)system.out.printf( "offset =%d、key =%s、value =%s/n"、record.offset()、record.key()、rescord.value()); }}}以上就是本文的全部内容、希望对大家的学习有所帮助、也希望大家多多支持武林网。