คำนำ
ในบทความก่อนหน้านี้เราพูดคุยเกี่ยวกับวิธีการสร้างคลัสเตอร์ Kafka และบทความนี้พูดถึงวิธีการใช้ Kafka อย่างไรก็ตามเมื่อใช้ Kafka คุณควรเข้าใจ Kafka สั้น ๆ
รู้เบื้องต้นเกี่ยวกับ Kafka
Kafka เป็นระบบส่งข้อความการสมัครสมาชิกที่มีการกระจายความเร็วสูงซึ่งจัดการข้อมูลการไหลของแอ็คชั่นทั้งหมดในเว็บไซต์ผู้บริโภค
Kafka มีลักษณะดังต่อไปนี้:
ข้อกำหนด Kafka
Kafka Core API
Kafka มี API หลักสี่ตัว
ตัวอย่างไดอะแกรมมีดังนี้:
สถานการณ์แอปพลิเคชัน Kafka
สำหรับการแนะนำข้างต้นโปรดดูเอกสาร Kafka อย่างเป็นทางการ
การเตรียมการพัฒนา
หากเราต้องพัฒนาโปรแกรม Kafka เราควรทำอย่างไร?
ก่อนอื่นหลังจากสร้างสภาพแวดล้อม Kafka เราต้องพิจารณาว่าเราเป็นผู้ผลิตหรือผู้บริโภคนั่นคือผู้ส่งหรือผู้รับข้อความ
อย่างไรก็ตามในบทความนี้ทั้งผู้ผลิตและผู้บริโภคจะพัฒนาและอธิบาย
หลังจากความเข้าใจอย่างคร่าวๆของ Kafka เราจะพัฒนาโปรแกรมแรก
ภาษาการพัฒนาที่ใช้ที่นี่คือ Java เครื่องมือก่อสร้าง Maven
การพึ่งพาของ Maven มีดังนี้:
<การพึ่งพา> <roupId> org.apache.kafka </groupId> <ratifactid> kafka_2.12 </artifactid> <persion> 1.0.0 </version> <scope> ให้ </cope> <cersion> 1.0.0 </version> </derctency> <การพึ่งพา> <roupId> org.apache.kafka </groupId> <ratifactid> kafka-streams </artifactid>
ผู้ผลิตคาฟคา
ในระหว่างการพัฒนาและการผลิตขอแนะนำคำแนะนำการกำหนดค่าต่าง ๆ ของ Kafka สั้น ๆ :
-
มีการกำหนดค่ามากขึ้นคุณสามารถตรวจสอบเอกสารอย่างเป็นทางการซึ่งจะไม่อธิบายที่นี่
จากนั้นการกำหนดค่าผู้ผลิต Kafka ของเรามีดังนี้:
คุณสมบัติอุปกรณ์ประกอบฉาก = คุณสมบัติใหม่ (); props.put ("bootstrap.servers", "master: 9092, slave1: 9092, slave2: 9092"); props.put ("acks", "all"); props.put ("retries", 0); props.put ("batch.size", 16384); props.put ("key.serializer", stringserializer.class.getName ()); props.put ("value.serializer", stringserializer.class.getName ()); kafkaproducer <string, string> producer = new kafkaproducer <string, string> (อุปกรณ์ประกอบฉาก);หลังจากเพิ่มการกำหนดค่า KAFKA เราเริ่มผลิตข้อมูล รหัสข้อมูลการผลิตจะต้องมีดังนี้เท่านั้น:
Producer.Send (ใหม่ ProducerRecord <String, String> (หัวข้อ, คีย์, ค่า));
หลังจากเขียนโปรแกรมผู้ผลิตให้เริ่มผลิตก่อน!
ข้อความที่ฉันส่งมาที่นี่คือ:
String messagestr = "สวัสดีนี่คือ"+messageno+"data";
และมีการส่งข้อความเพียง 1,000 ข้อความและผลลัพธ์มีดังนี้:
คุณจะเห็นว่าข้อมูลได้รับการพิมพ์สำเร็จแล้ว
หากคุณไม่ต้องการใช้โปรแกรมเพื่อตรวจสอบว่าโปรแกรมถูกส่งสำเร็จหรือไม่และความแม่นยำของการส่งข้อความคุณสามารถใช้คำสั่งเพื่อดูได้บนเซิร์ฟเวอร์ Kafka
ผู้บริโภคคาฟคา
การบริโภค Kafka ควรเป็นประเด็นสำคัญหลังจากส่วนใหญ่เราใช้การใช้ข้อมูลเป็นหลัก
การกำหนดค่าการบริโภค Kafka มีดังนี้:
จากนั้นการกำหนดค่าผู้บริโภค Kafka ของเรามีดังนี้:
คุณสมบัติอุปกรณ์ประกอบฉาก = คุณสมบัติใหม่ (); props.put ("bootstrap.servers", "master: 9092, slave1: 9092, slave2: 9092"); props.put ("group.id", GroupID); props.put ("enable.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1,000"); props.put ("session.timeout.ms", "30000"); props.put ("max.poll.records", 1,000); props.put ("auto.offset.reset", "เร็วที่สุด"); props.put ("key.deserializer", StringDeserializer.class.getName ()); props.put ("value.deserializer", StringDeserializer.class.getName ()); kafkaconsumer <string, string> consumer = new kafkaconsumer <string, string> (อุปกรณ์ประกอบฉาก); เนื่องจากฉันตั้งค่าการส่งอัตโนมัติรหัสการบริโภคมีดังนี้:
เราจำเป็นต้องสมัครรับหัวข้อก่อนนั่นคือเพื่อระบุหัวข้อที่จะบริโภค
consumer.subscribe (array.aslist (หัวข้อ));
หลังจากสมัครสมาชิกแล้วเราจะดึงข้อมูลจาก Kafka:
CerserRecords <String, String> MSGLIST = CONSPENER.POLL (1,000);
โดยทั่วไปการพูดจะใช้การตรวจสอบเมื่อดำเนินการบริโภค ที่นี่เราใช้ (;;) เพื่อตรวจสอบและตั้งค่าการบริโภค 1,000 รายการและออก!
ผลลัพธ์มีดังนี้:
จะเห็นได้ว่าเราได้ใช้ข้อมูลการผลิตที่ประสบความสำเร็จที่นี่
รหัส
จากนั้นรหัสสำหรับผู้ผลิตและผู้บริโภคมีดังนี้:
ผู้ผลิต:
นำเข้า java.util.properties; นำเข้า org.apache.kafka.clients.producer.kafkaproducer; นำเข้า org.apache.kafka.clients.producer.producerrecord; นำเข้า org.apache.kafka.Common.serialialialialialialial การสาธิตผู้ผลิต Kafka* เวอร์ชัน: 1.0.0* @author pancm* @date 26 มกราคม 2018*/คลาสสาธารณะ Kafkaproducertest unprens runnable {ส่วนตัว Kafkaproducer <สตริงสตริง> ผู้ผลิต; หัวข้อสตริงสุดท้ายส่วนตัว Public Kafkaproducertest (String topicName) {คุณสมบัติอุปกรณ์ประกอบฉาก = คุณสมบัติใหม่ (); props.put ("bootstrap.servers", "master: 9092, slave1: 9092, slave2: 9092"); props.put ("acks", "all"); props.put ("retries", 0); props.put ("batch.size", 16384); props.put ("key.serializer", stringserializer.class.getName ()); props.put ("value.serializer", stringserializer.class.getName ()); this.producer = ใหม่ kafkaproducer <string, string> (อุปกรณ์ประกอบฉาก); this.topic = topicName; } @Override โมฆะสาธารณะ Run () {int messageno = 1; ลอง {สำหรับ (;;) {string messagestr = "สวัสดีนี่คือ"+messageno+"bar of data"; Producer.Send (ใหม่ ProducerRecord <String, String> (หัวข้อ, "ข้อความ", Messagestr)); // ถ้ามีการผลิต 100 รายการถ้า (Messageno%100 == 0) {System.out.println ("ส่งข้อความ:" + Messagestr); } // ถ้ามีการผลิต 1,000 รายการถ้า (Messageno%1000 == 0) {System.out.println ("ส่งสำเร็จ"+Messageno+"Bar"); หยุดพัก; } Messageno ++; }} catch (exception e) {e.printstacktrace (); } ในที่สุด {producer.close (); }} โมฆะคงที่สาธารณะหลัก (สตริง args []) {kafkaproducertest test = ใหม่ kafkaproducertest ("kafka_test"); เธรดเธรด = เธรดใหม่ (ทดสอบ); thread.start (); -ผู้บริโภค:
นำเข้า java.util.Arrays; นำเข้า java.util.properties; นำเข้า org.apache.kafka.clients.consumer.consumerrecord; นำเข้า org.apache.kafka.consumer.consumerrecord; นำเข้า org.apache.kafka.clients.consumer.kafkaconsumer; นำเข้า org.apache.kafka.common.serialization.stringdeserializer;/*************************************************************** KafkaconsumerTest ใช้งาน Runnable {Private Final Kafkaconsumer <String, String> Consumer; cumserrecords ส่วนตัว <string, string> msglist; หัวข้อสตริงสุดท้ายส่วนตัว สตริงสุดท้ายคงที่ส่วนตัว GroupID = "Groupa"; Public KafkaconsumerTest (String topicName) {คุณสมบัติอุปกรณ์ประกอบฉาก = คุณสมบัติใหม่ (); props.put ("bootstrap.servers", "master: 9092, slave1: 9092, slave2: 9092"); props.put ("group.id", GroupID); props.put ("enable.auto.commit", "true"); props.put ("auto.commit.interval.ms", "1,000"); props.put ("session.timeout.ms", "30000"); props.put ("auto.offset.reset", "เร็วที่สุด"); props.put ("key.deserializer", StringDeserializer.class.getName ()); props.put ("value.deserializer", StringDeserializer.class.getName ()); this.consumer = new kafkaconsumer <string, string> (อุปกรณ์ประกอบฉาก); this.topic = topicName; this.consumer.subscribe (array.aslist (หัวข้อ)); } @Override โมฆะสาธารณะ Run () {int messageno = 1; System.out.println ("----------------------------------------"); ลอง {สำหรับ (;;) {msglist = consumer.poll (1,000); if (null! = msglist && msglist.count ()> 0) {สำหรับ (consumerrecord <string, string> record: msglist) {// พิมพ์ 100 รายการเมื่อบริโภค แต่ข้อมูลที่พิมพ์ไม่จำเป็นต้องเป็นกฎถ้า (messageno%100 == 0) {system.out.println record.value ()+"ออฟเซ็ต ==="+record.offset ()); } // ครั้งเดียวที่ใช้งาน 1,000 รายการออกไปถ้า (Messageno%1000 == 0) {break; } Messageno ++; }} else {thread.sleep (1,000); }}} catch (interruptedException e) {e.printStackTrace (); } ในที่สุด {consumer.close (); }} โมฆะคงที่สาธารณะหลัก (สตริง args []) {kafkaconsumertest test1 = ใหม่ kafkaconsumertest ("kafka_test"); เธรดเธรด 1 = เธรดใหม่ (test1); Thread1.start (); -หมายเหตุ: Master, Slave1, Slave2 เป็นเพราะฉันได้ทำการแมปความสัมพันธ์ในสภาพแวดล้อมของฉันเองซึ่งสามารถแทนที่ด้วย IP ของเซิร์ฟเวอร์
แน่นอนฉันวางโครงการใน GitHub และหากคุณสนใจคุณสามารถดูได้ https://github.com/xuwujing/kafka (ดาวน์โหลดท้องถิ่น)
สรุป
การพัฒนาโปรแกรม Kafka อย่างง่ายต้องใช้ขั้นตอนต่อไปนี้:
Kafka บทนำอ้างอิงถึงเอกสารอย่างเป็นทางการ: http://kafka.apache.org/intro
สรุป
ข้างต้นเป็นเนื้อหาทั้งหมดของบทความนี้ ฉันหวังว่าเนื้อหาของบทความนี้จะมีค่าอ้างอิงบางอย่างสำหรับการศึกษาหรือที่ทำงานของทุกคน หากคุณมีคำถามใด ๆ คุณสามารถฝากข้อความไว้เพื่อสื่อสาร ขอบคุณสำหรับการสนับสนุน Wulin.com