บทความนี้แนะนำวิธีการรวม Kafka ส่งและรับข้อความในโครงการ Springboot
Kafka เป็นระบบข้อความเผยแพร่ที่มีการกระจายข้อมูลสูงโดยมีลักษณะดังต่อไปนี้: ให้การคงอยู่ของข้อความผ่านโครงสร้างข้อมูลดิสก์ของ O (1) ซึ่งสามารถรักษาประสิทธิภาพที่มั่นคงเป็นเวลานานแม้ว่าการจัดเก็บข้อความจะเป็นเทราไบต์ ปริมาณงานที่สูง: แม้กระทั่งฮาร์ดแวร์ธรรมดามาก Kafka ก็สามารถรองรับข้อความหลายล้านข้อความต่อวินาที รองรับการแบ่งพาร์ติชันของข้อความผ่านเซิร์ฟเวอร์ Kafka และกลุ่มผู้บริโภค สนับสนุนการโหลดข้อมูลแบบขนาน Hadoop
ติดตั้ง kafka
เนื่องจากการติดตั้ง Kafka ต้องการการสนับสนุนของ Zookeeper เมื่อติดตั้ง Windows คุณต้องติดตั้ง Zookeeper ก่อนจากนั้นติดตั้ง Kafka ด้านล่างนี้ฉันจะให้ขั้นตอนในการติดตั้ง Mac และคะแนนเพื่อให้ความสนใจ การกำหนดค่าของ Windows แทบจะไม่แตกต่างกันยกเว้นสถานที่ต่าง ๆ
การติดตั้ง Kafka
ใช่มันง่ายมาก คุณสามารถจัดการกับคำสั่งบน Mac กระบวนการติดตั้งนี้อาจต้องใช้เวลาสักครู่และควรเกี่ยวข้องกับสถานะเครือข่าย อาจมีข้อความแสดงข้อผิดพลาดในข้อความพรอมต์การติดตั้งเช่น "ข้อผิดพลาด: ไม่สามารถเชื่อมโยง:/usr/local/share/doc/homebrew" สิ่งนี้ไม่สำคัญว่าจะถูกละเว้นโดยอัตโนมัติ ในที่สุดเราก็ประสบความสำเร็จเมื่อเราเห็นสิ่งที่อยู่ด้านล่าง
==> สรุปðÿº/usr/local/cellar/kafka/1.1.0: 157 ไฟล์, 47.8mb
ตำแหน่งไฟล์การกำหนดค่าการติดตั้งมีดังนี้เพียงแก้ไขหมายเลขพอร์ตตามความต้องการของคุณ
ติดตั้ง zoopeeper และ kafka location/usr/local/cellar/
ไฟล์การกำหนดค่า /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties
เริ่ม zookeeper
คัดลอกรหัสดังนี้: ./ bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
เริ่ม Kafka
./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
สร้างหัวข้อสำหรับ Kafka หัวข้อชื่อทดสอบ คุณสามารถกำหนดค่าเป็นชื่อที่คุณต้องการ ย้อนกลับและกำหนดค่าในรหัสอย่างถูกต้อง
คัดลอกรหัสดังต่อไปนี้: ./ bin/kafka-topics-Create-Zookeeper LocalHost: 2181-การจำลองแบบปัจจัย 1-พาร์ติชัน 1-การทดสอบหัวข้อ
1. แก้ปัญหาการพึ่งพาก่อน
เราจะไม่พูดถึงการพึ่งพาที่เกี่ยวข้องกับ Springboot การพึ่งพาที่เกี่ยวข้องกับ Kafka ขึ้นอยู่กับแพ็คเกจการรวม Spring-Kafka เท่านั้น
<Ederency> <roupID> org.springframework.kafka </groupid> <ratifactid> Spring-Kafka </artifactId> <version> 1.1.1.release </เวอร์ชัน>
ที่นี่เราจะแสดงไฟล์การกำหนดค่าก่อน
#=================== Kafka ====================================================== kafka.consumer.servers = 10.93.21.21: 2181kafka.consumer.enable.auto.Commit = Truekafka.consumer.session.timeout = 6000kafka.consumer.auto.Commit.Interval = 100kafka.consumer.consumer .reset = Failkafka.consumer.topic = testkafka.consumer.group.id = testkafka.consumer.concurrency = 10kafka.producer.servers = 10.9 3.21.21: 9092Kafka.producer.Retries = 0kafka.producer.batch.size = 4096kafka.producer.linger = 1kafka.producer.buffer.memory = 40960
2. การกำหนดค่า: Kafka Producer
1) ประกาศความสามารถในการกำหนดค่าและเปิด kafkatemplate ผ่าน @configuration และ @enablekafka
2) ฉีดการกำหนดค่า kafka ในแอปพลิเคชันไฟล์การกำหนดค่า properties ผ่าน @Value
3) สร้างถั่ว @bean
แพ็คเกจ com.kangaroo.sentinel.collect.configuration; นำเข้า java.util.hashmap; นำเข้า java.util.map; นำเข้า org.apache.kafka.clients.producer.producerconfig; org.springframework.beans.factory.annotation.value; นำเข้า org.springframework.context.annotation.bean; นำเข้า org.springframework.context.annotation.configuration; org.springframework.kafka.core.defaultkafkaproducerfactory นำเข้า org.springframework.kafka.core.kafkatemplate; นำเข้า org.springframework.kafka.core.producerfactory; @Value ("$ {kafka.producer.servers}") เซิร์ฟเวอร์สตริงส่วนตัว; @Value ("$ {kafka.producer.retries}") การตอบกลับแบบส่วนตัว @Value ("$ {kafka.producer.batch.size}") ส่วนตัว int batchsize; @Value ("$ {kafka.producer.linger}") int linger ส่วนตัว; @Value ("$ {kafka.producer.buffer.memory}") int buffermemory ส่วนตัว; แผนที่สาธารณะ <String, Object> ProducerConfigs () {MAP <String, Object> Props = new HashMap <> (); props.put (producterfig.bootstrap_servers_config, เซิร์ฟเวอร์); props.put (producerconfig.retries_config, retries); props.put (producererConfig.batch_size_config, batchsize); props.put (producerconfig.linger_ms_config, linger); props.put (producterfig.buffer_memory_config, buffermemory); props.put (producererConfig.key_serializer_class_config, stringserializer.class); props.put (producterfig.value_serializer_class_config, stringserializer.class); กลับอุปกรณ์ประกอบฉาก; } Public ProducerFactory <String, String> ProducerFactory () {ส่งคืนใหม่ defaultKafKapRoducerFactory <> (producerConfigs ()); } @Bean Public Kafkatemplate <String, String> Kafkatemplate () {ส่งคืน kafkatemplate ใหม่ <สตริง, สตริง> (ProducerFactory ()); -ทดลองผู้ผลิตของเราและเขียนคอนโทรลเลอร์ ต้องการหัวข้อ = ทดสอบคีย์ = คีย์ส่งข้อความ
แพ็คเกจ com.kangaroo.sentinel.collect.controller; นำเข้า com.kangaroo.sentinel.common.response.response; นำเข้า com.kangaroo.sentinel.common.response.resultcode; นำเข้า org.slf4j.logger; org.springframework.beans.factory.annotation.autowired; นำเข้า org.springframework.kafka.core.kafkatemplate; นำเข้า org.springframework.web.bind.annotation*; นำเข้า Javax.servlet.htt javax.servlet.http.httpservletResponse;@restcontroller@requestmapping ("/kafka") คลาสสาธารณะ collectcontroller {ป้องกัน logger final logger = loggerFactory.getLogger (this.getClass ()); @autowired ส่วนตัว kafkatemplate kafkatemplate; @RequestMapping (value = "/send", method = requestMethod.get) การตอบสนองสาธารณะ sendkafka (คำขอ httpservletRequest, การตอบกลับ httpservletResponse) {ลอง {string message = request.getParameter ("ข้อความ"); logger.info ("kafka message = {}", ข้อความ); kafkatemplate.send ("ทดสอบ", "key", ข้อความ); logger.info ("ส่ง Kafka สำเร็จ"); ส่งคืนการตอบกลับใหม่ (resultcode.success, "ส่ง Kafka สำเร็จ", null); } catch (Exception e) {logger.error ("ส่ง kafka ล้มเหลว", e); ส่งคืนการตอบกลับใหม่ (resultcode.exception, "ส่ง kafka ล้มเหลว", null); -3. การกำหนดค่า: ผู้บริโภค Kafka
1) ประกาศความสามารถในการกำหนดค่าและเปิด kafkatemplate ผ่าน @configuration และ @enablekafka
2) ฉีดการกำหนดค่า kafka ในแอปพลิเคชันไฟล์การกำหนดค่า properties ผ่าน @Value
3) สร้างถั่ว @bean
แพ็คเกจ com.kangaroo.sentinel.collect.configuration; นำเข้า org.apache.kafka.clients.consumer.consumerconfig; นำเข้า org.apache.kafka.common.serialization.stringdeserializer; org.springframework.context.annotation.bean; นำเข้า org.springframework.context.annotation.configuration; นำเข้า org.springframework.kafka.annotation.enablekafka; org.springframework.kafka.config.kafkalistenercontainerfactory นำเข้า org.springframework.kafka.core.consumerfactory นำเข้า org.springframework.kafka.core.defaultkafkaconsumerer org.springframework.kafka.listener.CurrentMessageListenerContainer; นำเข้า java.util.hashmap; นำเข้า java.util.map;@การกำหนดค่า@enablekafkapublic คลาส kafkaconsumerconfigig {@value @value ("$ {kafka.consumer.enable.auto.commit}") บูลีนส่วนตัว enableautocommit; @value ("$ {kafka.consumer.session.timeout}") สตริงส่วนตัวเซสชัน stesstimeout; @Value ("$ {kafka.consumer.auto.commit.interval}") สตริงส่วนตัว AutocommitInterval; @Value ("$ {kafka.consumer.group.id}") สตริงส่วนตัว GroupId; @Value ("$ {kafka.consumer.auto.offset.reset}") สตริงส่วนตัว autooffsetReset; @Value ("$ {kafka.consumer.concurrency}") การพร้อมกัน int ส่วนตัว; @bean สาธารณะ kafkalistenercontainerfactory <concurrentMessageListenerContainer <สตริง, สตริง >> kafkalistenercontainerFactory () {concurrentKafkalistenerContainerFactory <String, String> Factory = ใหม่ Factory.SetConsumerFactory (ConsumerFactory ()); Factory.SetConcurrency (พร้อมกัน); Factory.getContainerProperties (). SetPollTimeOut (1500); โรงงานกลับมา; } Public ConsumerFactory <String, String> ConsumerFactory () {ส่งคืนใหม่ defaultKafkAconsumerFactory <> (consumerConfigs ()); } แผนที่สาธารณะ <String, Object> ConsumerConFigs () {MAP <String, Object> propSmap = new HashMap <> (); propsmap.put (consumerconfig.bootstrap_servers_config, เซิร์ฟเวอร์); propsmap.put (consumerconfig.enable_auto_commit_config, enableautocommit); propsmap.put (consumerconfig.auto_commit_interval_ms_config, autocommitinterval); propsmap.put (consumerconfig.session_timeout_ms_config, sessiontimeout); propsmap.put (consumerconfig.key_deserializer_class_config, stringdeserializer.class); propsmap.put (consumerconfig.value_deserializer_class_config, stringdeserializer.class); propsmap.put (consumerconfig.group_id_config, GroupId); propsmap.put (consumerconfig.auto_offset_reset_config, autooffsetReset); กลับ propsmap; } @Bean Public Listener () {return New Listener (); -ใหม่ผู้ฟัง () สร้างถั่วเพื่อประมวลผลข้อมูลที่อ่านจาก Kafka การสาธิตการใช้งานอย่างง่ายของ Listener มีดังนี้เพียงแค่อ่านและพิมพ์ค่าคีย์และค่าข้อความ
แอตทริบิวต์หัวข้อของ @kafkalistener ใช้เพื่อระบุชื่อหัวข้อ Kafka ชื่อหัวข้อถูกระบุโดยผู้ผลิตข้อความนั่นคือมันถูกระบุโดย Kafkatemplate เมื่อส่งข้อความ
แพ็คเกจ com.kangaroo.sentinel.collect.configuration; นำเข้า org.apache.kafka.clients.consumer.consumerrecord; นำเข้า org.slf4j.logger; นำเข้า org.slf4j.loggerfactory; logger logger = loggerFactory.getLogger (this.getClass ()); @kafkalistener (หัวข้อ = {"ทดสอบ"}) โมฆะสาธารณะฟัง (comverrecord <?,?> บันทึก) {logger.info ("คีย์ของ Kafka:" + record.key ()); logger.info ("ค่าของ Kafka:" + record.value (). toString ()); -เคล็ดลับ:
1) ฉันไม่ได้แนะนำวิธีการติดตั้งและกำหนดค่า Kafka เป็นการดีที่สุดที่จะใช้ IP เครือข่ายที่มีการเชื่อมโยงอย่างสมบูรณ์เมื่อกำหนดค่า kafka มากกว่า localhost หรือ 127.0.0.1
2) เป็นการดีที่สุดที่จะไม่ใช้ Zookeeper ของ Kafka เพื่อปรับใช้ Kafka เนื่องจากอาจทำให้เข้าถึงการเข้าถึงได้
3) ในทางทฤษฎีผู้บริโภคควรอ่าน Kafka ผ่าน Zookeeper แต่ที่นี่เรากำลังใช้ที่อยู่ของ Kafkaserver ทำไมเราไม่เข้าไปในเชิงลึก?
4) เมื่อกำหนดการกำหนดค่าข้อความการตรวจสอบค่าของรายการการกำหนดค่า group_id_config จะใช้เพื่อระบุชื่อของกลุ่มผู้บริโภค หากมีวัตถุผู้ฟังหลายคนในกลุ่มเดียวกันวัตถุผู้ฟังเพียงคนเดียวเท่านั้นที่สามารถรับข้อความได้
ข้างต้นเป็นเนื้อหาทั้งหมดของบทความนี้ ฉันหวังว่ามันจะเป็นประโยชน์ต่อการเรียนรู้ของทุกคนและฉันหวังว่าทุกคนจะสนับสนุน wulin.com มากขึ้น