مقدمة
في المقال السابق ، نتحدث عن كيفية بناء مجموعة كافكا ، وهذا المقال يتحدث عن كيفية استخدام Kafka ببساطة. ومع ذلك ، عند استخدام Kafka ، لا يزال يتعين عليك فهم Kafka لفترة وجيزة.
مقدمة إلى كافكا
Kafka هو نظام مراسلة للاشتراك الموزع عالي الإنتاجية يتولى جميع بيانات تدفق الإجراء في موقع ويب على نطاق المستهلك.
لدى كافكا الخصائص التالية:
شروط كافكا
kafka core API
لدى كافكا أربعة واجهات برمجة التطبيقات الأساسية
مخطط المثال هو كما يلي:
سيناريوهات تطبيق كافكا
للمقدمة أعلاه ، راجع وثيقة كافكا الرسمية.
إعداد التنمية
إذا أردنا تطوير برنامج كافكا ، فماذا يجب أن نفعل؟
بادئ ذي بدء ، بعد بناء بيئة كافكا ، نحتاج إلى التفكير فيما إذا كنا منتجًا أو مستهلكًا ، أي المرسل أو المستلم للرسالة.
ومع ذلك ، في هذه المقالة ، سيتطور كل من المنتجين والمستهلكين.
بعد فهم تقريبي لكافكا ، سنقوم بتطوير البرنامج الأول.
لغة التطوير المستخدمة هنا هي Java ، أداة البناء Maven.
تبعيات مافن هي كما يلي:
<Rependency> <roupeD> org.apache.kafka </rougiD> <StifactId> kafka_2.12 </artifactId> <splection> 1.0.0 </splement> <scope> متوفر </scope> </reperency> <sperence> <roupend> org.apache <soph> 1.0.0 </version> </sependency> <reperency> <roupiD> org.apache.kafka </rougiD> <StifactId> kafka-streams </insifactid> <soph>
منتج كافكا
أثناء التطوير والإنتاج ، دعونا نقدم بإيجاز تعليمات التكوين المختلفة لكافكا:
...
هناك المزيد من التكوينات ، يمكنك التحقق من الوثائق الرسمية ، والتي لن يتم شرحها هنا.
ثم تكوين منتج Kafka الخاص بنا كما يلي:
خصائص الدعائم = خصائص جديدة () ؛ props.put ("bootstrap.servers" ، "Master: 9092 ، slave1: 9092 ، slave2: 9092") ؛ props.put ("Acks" ، "All") ؛ props.put ("Retries" ، 0) ؛ props.put ("batch.size" ، 16384) ؛ props.put ("key.serializer" ، stringserializer.class.getName ()) ؛ props.put ("value.serializer" ، stringserializer.class.getName ()) ؛ kafkaproducer <string ، string> producer = new Kafkaproducer <string ، string> (props) ؛بعد إضافة تكوين Kafka ، نبدأ في إنتاج البيانات. يجب أن يكون رمز بيانات الإنتاج فقط على النحو التالي:
Propert.send (New ProducerRecord <String ، String> (Topic ، Key ، Value)) ؛
بعد كتابة برنامج المنتج ، لنبدأ في الإنتاج أولاً!
الرسالة التي أرسلتها هنا هي:
String messagester = "Hello ، هذا هو"+messageno+"البيانات" ؛
ويتم إرسال 1000 رسالة فقط والنتائج على النحو التالي:
يمكنك أن ترى أن المعلومات قد تم طباعتها بنجاح.
إذا كنت لا ترغب في استخدام البرنامج للتحقق مما إذا تم إرسال البرنامج بنجاح ودقة إرسال الرسالة ، فيمكنك استخدام الأمر لعرضه على خادم Kafka.
كافكا المستهلك
يجب أن يكون استهلاك Kafka هو النقطة الرئيسية ، بعد كل شيء ، في معظم الوقت ، نستخدم استهلاك البيانات بشكل أساسي.
تكوين استهلاك كافكا هو كما يلي:
ثم تكوين مستهلك كافكا لدينا كما يلي:
خصائص الدعائم = خصائص جديدة () ؛ props.put ("bootstrap.servers" ، "Master: 9092 ، slave1: 9092 ، slave2: 9092") ؛ props.put ("group.id" ، groupid) ؛ props.put ("enable.auto.commit" ، "true") ؛ props.put ("Auto.Commit.Interval.ms" ، "1000") ؛ props.put ("session.timeout.ms" ، "30000") ؛ props.put ("max.poll.records" ، 1000) ؛ props.put ("auto.offset.reset" ، "أقرب") ؛ props.put ("key.deserializer" ، stringDeserializer.class.getName ()) ؛ props.put ("value.deserializer" ، stringDeserializer.class.getName ()) ؛ kafkaconsumer <string ، string> consumer = kafkaconsumer الجديد <string ، string> (الدعائم) ؛ نظرًا لأنني أقوم بإعداد التقديم التلقائي ، فإن رمز الاستهلاك هو كما يلي:
نحتاج إلى الاشتراك في موضوع ما أولاً ، أي لتحديد الموضوع الذي يجب استهلاكه.
المستهلك.
بعد الاشتراك ، نسحب البيانات من كافكا:
ConsumerRecords <string ، string> msglist = consumer.poll (1000) ؛
بشكل عام ، يتم استخدام المراقبة عند تنفيذ الاستهلاك. هنا نستخدم (؛؛) لمراقبة ، وإعداد استهلاك 1000 عنصر والخروج!
النتائج كما يلي:
يمكن ملاحظة أننا قد استهلكنا بنجاح بيانات الإنتاج هنا.
شفرة
ثم رموز المنتجين والمستهلكين هي كما يلي:
منتج:
استيراد java.util.properties ؛ استيراد org.apache.kafka.clients.producer.kafkaproducer ؛ استيراد org.apache.kafka.clients.producer.producerrecord ؛ استيراد org.apache.kafka.common.Serialization.StringSerializer ؛ DEMO* الإصدار: 1.0.0* Author Pancm* date 26 يناير 2018*/فئة عامة kafkaproducertest runnable {private final kafkaproducer <string ، string> producer ؛ موضوع السلسلة النهائية الخاصة ؛ kafkaproducertest العامة (string topicname) {properties props = new properties () ؛ props.put ("bootstrap.servers" ، "Master: 9092 ، slave1: 9092 ، slave2: 9092") ؛ props.put ("Acks" ، "All") ؛ props.put ("Retries" ، 0) ؛ props.put ("batch.size" ، 16384) ؛ props.put ("key.serializer" ، stringserializer.class.getName ()) ؛ props.put ("value.serializer" ، stringserializer.class.getName ()) ؛ this.producer = جديد kafkaproducer <string ، string> (الدعائم) ؛ this.topic = topicname ؛ } Override public void run () {int messageno = 1 ؛ حاول {for (؛؛) {String messagester = "Hello ، هذا هو"+messageno+"شريط البيانات" ؛ Producer.send (New ProducerRecord <string ، string> (topic ، "message" ، messagester)) ؛ // إذا تم إنتاج 100 عنصر ، if (messageno ٪ 100 == 0) {system.out.println ("رسالة إرسال:" + messagester) ؛ } // إذا تم إنتاج 1000 عنصر ، إذا (messageno ٪ 1000 == 0) {system.out.println ("تم إرسالها بنجاح"+messageno+"bar") ؛ استراحة؛ } messageno ++ ؛ }} catch (استثناء e) {E.PrintStackTrace () ؛ } أخيرًا {producer.close () ؛ }} public static void main (String args []) {kafkaproducertest test = new KafkaproducerTest ("kafka_test") ؛ موضوع الموضوع = موضوع جديد (اختبار) ؛ thread.start () ؛ }}مستهلك:
استيراد java.util.arrays ؛ استيراد java.util.properties ؛ استيراد org.apache.kafka.clients.consumer.consumerrecord ؛ import org.apache.kafka.concons.consumer.consumerrecord ؛ import org.kafka.clins.consumer.consumer.consumer. org.apache.kafka.clients.consumer.kafkaconsumer ؛ import org.apache.kafka.common.serialization.stringDeserializer ؛/**** العنوان: kafkaconsumertest* الوصف:* kafka demo demo* @ethuthor prancm* RunNable {private Final Kafkaconsumer <string ، string> consumer ؛ ConsumerRecords الخاصة <string ، string> msglist ؛ موضوع السلسلة النهائية الخاصة ؛ Static Final String GroupId = "Groupa" ؛ kafkaconsumertest العامة (string topicname) {properties props = new properties () ؛ props.put ("bootstrap.servers" ، "Master: 9092 ، slave1: 9092 ، slave2: 9092") ؛ props.put ("group.id" ، groupid) ؛ props.put ("enable.auto.commit" ، "true") ؛ props.put ("Auto.Commit.Interval.ms" ، "1000") ؛ props.put ("session.timeout.ms" ، "30000") ؛ props.put ("auto.offset.reset" ، "أقرب") ؛ props.put ("key.deserializer" ، stringDeserializer.class.getName ()) ؛ props.put ("value.deserializer" ، stringDeserializer.class.getName ()) ؛ this.consumer = جديد kafkaconsumer <string ، string> (الدعائم) ؛ this.topic = topicname ؛ this.consumer.subscribe (arrays.aslist (topic)) ؛ } Override public void run () {int messageno = 1 ؛ System.out.println ("----------------------------------------------") ؛ حاول {for (؛؛) {msglist = consumer.poll (1000) ؛ if (null! = msglist && msglist.count ()> 0) {for (consumerRecord <string ، string> record: msglist) {// print 100 steam عند الاستهلاك ، ولكن البيانات المطبوعة ليست بالضرورة هي القاعدة إذا (messageno ٪ 100 == 0) {system.out.println (messageno + " record.value ()+"Offset ==="+record.offset ()) ؛ } // بمجرد استهلاك 1000 عنصر ، الخروج إذا (messageno ٪ 1000 == 0) {break ؛ } messageno ++ ؛ }} آخر {thread.sleep (1000) ؛ }}} catch (interruptedException e) {E.PrintStackTrace () ؛ } أخيرًا {consumer.close () ؛ }} public static void main (String args []) {kafkaconsumertest test1 = new KafkaconsumerTest ("kafka_test") ؛ Thread Thread1 = موضوع جديد (Test1) ؛ thread1.start () ؛ }}ملاحظة: Master ، Slave1 ، Slave2 لأنني قمت بتخطيط العلاقة في بيئتي الخاصة ، والتي يمكن استبدالها بـ IP للخادم.
بالطبع ، أضع المشروع على Github ، وإذا كنت مهتمًا ، فيمكنك إلقاء نظرة. https://github.com/xuwujing/kafka (تنزيل محلي)
لخص
يتطلب التطوير البسيط لبرنامج كافكا الخطوات التالية:
مقدمة Kafka ارجع إلى المستند الرسمي: http://kafka.apache.org/intro
لخص
ما سبق هو المحتوى الكامل لهذه المقالة. آمل أن يكون لمحتوى هذه المقالة قيمة مرجعية معينة لدراسة أو عمل الجميع. إذا كان لديك أي أسئلة ، فيمكنك ترك رسالة للتواصل. شكرا لك على دعمك إلى wulin.com.