1. Maven 依赖包
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
2. 生产者代码
package com.lnho.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Minimal Kafka producer example: publishes 100 string records to the topic {@code Topic1}.
 */
public class KafkaProducerExample {

    /**
     * Builds the producer configuration, sends records whose key and value are both the
     * decimal string of the loop index (0 through 99), then closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // The key is "bootstrap.servers" (plural); the value is host:port with no whitespace.
        // NOTE(review): "master" is presumably a hostname that resolves to a broker — confirm.
        props.put("bootstrap.servers", "master:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even if send() throws.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String payload = Integer.toString(i);
                producer.send(new ProducerRecord<>("Topic1", payload, payload));
            }
        }
    }
}
3. 消费者代码
paket com.lnho.example.kafka; impor org.apache.kafka.clients.consumer.consumeRecord; impor org.apache.kafka.clients.consumer.consumeRecords; impor org.apache.kafka.clients.consumer.kafkaconsumer; impor java.util.arrays; impor java.util.properties; kelas publik kafkaconsumerexample {public static void main (string [] args) {properties props = new properties (); props.put ("bootstrap.server", "master: 9092"); props.put ("grup.id", "test"); props.put ("enable.auto.Commit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); Kafkaconsumer <string, string> konsumen = kafkaconsumer baru <> (props); konsumen.subscribe (arrays.aslist ("Topic1")); while (true) {consumeRecords <string, string> records = consumer.poll (100); untuk (ConsumerRecord <String, String> Record: Records) System.out.printf ("Offset = %d, Key = %S, Value = %S/N", Record.Offset (), Record.Key (), Record.Value ()); }}}以上就是本文的全部内容 , 希望对大家的学习有所帮助 , 也希望大家多多支持武林网。