1. Maven 依赖包
<การพึ่งพา> <roupId> org.apache.kafka </groupid> <ratifactid> kafka-clients </artifactid> <cersion> 0.9.0.1 </version>
2. 生产者代码
แพ็คเกจ com.lnho.example.kafka; นำเข้า org.apache.kafka.clients.producer.kafkaproducer; นำเข้า org.apache.kafka.clients.producer.producer; นำเข้า org.apache.kafka.clients.producer.producerrecord; นำเข้า java.util.properties; คลาสสาธารณะ kafkaproducerexample {โมฆะคงที่สาธารณะหลัก (สตริง [] args) {properties props = คุณสมบัติใหม่ (); props.put ("bootstrap.servers", "Master: 9092"); props.put ("acks", "all"); props.put ("retries", 0); props.put ("batch.size", 16384); props.put ("linger.ms", 1); props.put ("buffer.memory", 33554432); props.put ("key.serializer", "org.apache.kafka.common.serialization.stringserializer"); props.put ("value.serializer", "org.apache.kafka.common.serialization.stringserializer"); ผู้ผลิต <สตริง, สตริง> โปรดิวเซอร์ = ใหม่ kafkaproducer <> (อุปกรณ์ประกอบฉาก); สำหรับ (int i = 0; i <100; i ++) โปรดิวเซอร์ SEND (PROUPREECERRECORD ใหม่ <> ("TOPIC1", Integer.toString (I), Integer.ToString (i))); Producer.close (); -3. 消费者代码
แพ็คเกจ com.lnho.example.kafka; นำเข้า org.apache.kafka.clients.consumer.consumerrecord; นำเข้า org.apache.kafka.clients.consumer.consumerrecords; นำเข้า org.apache.kafka.clients.consumer.kafkaconsumer; นำเข้า Java.util.Arrays; นำเข้า java.util.properties; คลาสสาธารณะ kafkaconsumerexample {โมฆะคงที่สาธารณะหลัก (String [] args) {properties props = คุณสมบัติใหม่ (); props.put ("bootstrap.servers", "Master: 9092"); props.put ("group.id", "ทดสอบ"); props.put ("enable.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1,000"); props.put ("session.timeout.ms", "30000"); props.put ("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); props.put ("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); Kafkaconsumer <String, String> Consumer = New Kafkaconsumer <> (อุปกรณ์ประกอบฉาก); consumer.subscribe (array.aslist ("Topic1")); ในขณะที่ (จริง) {consumerrecords <string, string> records = cumener.poll (100); สำหรับ (consumerrecord <string, string> records: records) system.out.printf ("Offset = %d, key = %s, value = %s/n", record.offset (), record.key (), record.value ()); -以上就是本文的全部内容, 希望对大家的学习有所帮助, 也希望大家多多支持武林网。