تقدم هذه المقالة كيفية دمج رسائل Kafka لإرسال واستلام الرسائل في Springboot Project.
Kafka هو نظام رسائل موزع عالي الإنتاجية الموزعة ، مع الخصائص التالية: توفير استمرار الرسالة من خلال بنية بيانات القرص لـ O (1) ، والتي يمكن أن تحافظ على أداء مستقر لفترة طويلة حتى لو كان تخزين الرسائل تيرابايت. إنتاجية عالية: حتى الأجهزة العادية للغاية يمكن أن تدعم Kafka ملايين الرسائل في الثانية. يدعم تقسيم الرسائل من خلال خوادم كافكا ومجموعات المستهلكين. دعم تحميل البيانات الموازية Hadoop.
تثبيت كافكا
نظرًا لأن تثبيت Kafka يتطلب دعم Zookeeper ، عند تثبيت Windows ، تحتاج إلى تثبيت Zookeeper أولاً ثم تثبيت Kafka. أدناه سأقدم لك خطوات تثبيت MAC والنقاط التي يجب الانتباه إليها. لا يختلف تكوين Windows تقريبًا باستثناء المواقع المختلفة.
مشروب تثبيت كافكا
نعم ، الأمر بهذه البساطة. يمكنك التعامل معها بأمر على جهاز Mac. قد تتطلب عملية التثبيت هذه بعض الوقت ، ويجب أن تكون مرتبطة بحالة الشبكة. قد تكون هناك رسالة خطأ في رسالة موجه التثبيت ، مثل "الخطأ: لا يمكن ربط:/usr/local/share/doc/homebrew". هذا لا يهم ، سيتم تجاهله تلقائيًا. أخيرًا ، نجحنا عندما رأينا ما كان أدناه.
==> ملخص ðÿ/usr/local/cellar/kafka/1.1.0: 157 ملفات ، 47.8 ميجابايت
موقع ملف تكوين التثبيت هو كما يلي ، فقط قم بتعديل رقم المنفذ وفقًا لاحتياجاتك.
تركيب zooopeeper و kafka الموقع/usr/محلي/قبو/
configuration file /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties
ابدأ Zookeeper
انسخ الكود على النحو التالي: ./ bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
ابدأ كافكا
./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
إنشاء موضوع لكافكا. تم تسمية الموضوع. يمكنك تكوينه بالاسم الذي تريده. ارجع وتكوينه في الكود بشكل صحيح.
انسخ الرمز على النحو التالي: ./ bin/kafka-topics-create-Zookeeper localhost: 2181-replication-factor 1-partitions 1-topic test
1. حل التبعيات أولا
لن نذكر التبعيات المتعلقة بـ Springboot. تعتمد التبعيات المتعلقة بـ Kafka فقط على حزمة تكامل Spring-Kafka.
<Rependency> <roupeD> org.springframework.kafka </rougiD> <StifactId> Spring-kafka </shintifactid> <الإصدار> 1.1.1.Release </version> </dependency>
هنا سنعرض ملف التكوين أولاً
#================== Kafka ======================================== kafka.consumer.servers = 10.93.21.21: 2181kafka.consumer.enable.auto.commit = truekafka.consumer.session.timeout = 6000kafka.consumer.amommit.interval = 100kafka.consumer.offset .reset = tustkafka.consumer.topic = testkafka.consumer.group.id = testkafka.consumer.concurrency = 10kafka.producer.servers = 10.9 3.21.21: 9092Kafka.Producer.Retries = 0Kafka.producer.batch.size = 4096Kafka.Producer.linger = 1Kafka.producer.Buffer.Memory = 40960
2. التكوين: منتج كافكا
1) إعلان التكوين وفتح قدرة kafkatemplate من خلال التكوين و enablekafka.
2) حقن تكوين kafka في ملف تكوين Application.Properties من خلال Value.
3) توليد الفول ، bean
package com.kangaroo.sentinel.collect.configuration ؛ استيراد java.util.hashmap ؛ استيراد java.util.map ؛ import org.apache.kafka.clients.producer.produconfig ؛ استيراد org.kafka.common.serializer. org.springframework.beans.factory.annotation.value ؛ استيراد org.springframework.context.annotation.bean ؛ استيراد org.springframework.context.annotation.configuration ؛ استيراد org.springframework.kafka.annotation.enable ؛ org.springframework.kafka.core.defaultkafkaproducerfactory ؛ استيراد org.springframework.kafka.core.kafkatemplate ؛ import org.springframework.kafka.core.producerfactory ؛ Value ("$ {kafka.producer.servers}") خوادم السلسلة الخاصة ؛ Value ("$ {kafka.producer.retries}") private int ؛ value ("$ {kafka.producer.batch.size}") private int batchsize ؛ Value ("$ {kafka.producer.linger}") private int linger ؛ value ("$ {kafka.producer.buffer.memory}") private int buffermemory ؛ الخريطة العامة <string ، Object> producronfigs () {map <string ، Object> props = new hashmap <> () ؛ props.put (propererconfig.bootstrap_servers_config ، الخوادم) ؛ props.put (propererconfig.retries_config ، receptires) ؛ props.put (properverconfig.batch_size_config ، batchsize) ؛ props.put (properverconfig.linger_ms_config ، linger) ؛ props.put (properverconfig.buffer_memory_config ، buffermemory) ؛ props.put (properverconfig.key_serializer_class_config ، stringserializer.class) ؛ props.put (properverconfig.value_serializer_class_config ، stringserializer.class) ؛ إرجاع الدعائم } Public ProducerFactory <string ، string> producerFactory () {return new DefaultKafKaproducerFactory <> (ProperConfigs ()) ؛ } bean public kafkatemplate <string ، string> kafkatemplate () {return new KafKatemplate <string ، string> (producerFactory ()) ؛ }}تجربة منتجنا وكتابة وحدة تحكم. تريد الموضوع = الاختبار ، المفتاح = المفتاح ، أرسل رسالة
package com.kangaroo.sentinel.collect.controller ؛ استيراد com.kangaroo.sentinel.common.response.response ؛ استيراد com.kangaroo.sentinel.common.response.resultcode ؛ استيراد org.slf4j.logger ؛ org.springframework.beans.factory.annotation. javax.servlet.http.httpservletresponse ؛@restController@requestMapping ("/kafka") collectcontroller {protected final logger logger = loggerfactory.getLogger (this.getClass ()) ؛ @autowired kafkatemplate kafkatemplate ؛ requestmapping (value = "/send" ، method = requestMethod.get) استجابة عامة sendkafka (طلب httpservletrequest ، httpservletresponse استجابة) {try {string message = request.getParameter ("message") ؛ logger.info ("kafka message = {}" ، message) ؛ kafkatemplate.send ("test" ، "key" ، message) ؛ logger.info ("أرسل kafka بنجاح.") ؛ إرجاع استجابة جديدة (resultcode.sccess ، "إرسال Kafka بنجاح" ، NULL) ؛ } catch (استثناء e) {logger.error ("إرسال kafka فشل" ، e) ؛ إرجاع استجابة جديدة (resultcode.exception ، "إرسال kafka فشل" ، null) ؛ }}}3. التكوين: مستهلك كافكا
1) إعلان التكوين وفتح قدرة kafkatemplate من خلال التكوين و enablekafka.
2) حقن تكوين kafka في ملف تكوين Application.Properties من خلال Value.
3) توليد الفول ، bean
package com.kangaroo.sentinel.collect.configuration ؛ استيراد org.apache.kafka.clients.consumer.consumerConfig ؛ import org.apache.kafka.common.serialization.stringdeserializer org.springframework.context.annotation.bean ؛ استيراد org.springframework.context.annotation.configuration ؛ استيراد org.springframework.kafka.annotation.enablekafka org.springframework.kafka.config.kafkalistenercontainerfactory ؛ استيراد org.springframework.kafka.core org.springframework.kafka.listener.concurrentMessageListenerContainer ؛ استيراد java.util.hashmap ؛ استيراد java.util.map ؛@configuration@enablekafkapublic class kafkaconsumerconfig {value ($ {kafka.consumer value ("$ {kafka.consumer.enable.auto.commit}") private boolean enableeautocommit ؛ Value ("$ {kafka.consumer.session.timeout}") private string sessionTimeOut ؛ Value ("$ {kafka.consumer.auto.commit.interval}") سلسلة خاصة AutocommitInterval ؛ Value ("$ {kafka.consumer.group.id}") سلسلة خاصة ؛ value ("$ {kafka.consumer.auto.offset.reset}") سلسلة خاصة AutoOffSetReset ؛ value ("$ {kafka.consumer.concurrency}") private int concurrency ؛ bean public kafkalistenercontainerfactory <concurrentMessageListenerContainer <string ، string >> kafkalistenercontainerfactory () {concurrentkafkalistenercontainerfactory <string> factory = concurrentkafkalistenercontainerfactory <> المصنع. مصنع. Factory.getContainerProperties (). عودة المصنع } المستهلك العام <string ، string> consumerFactory () {return new DefaultKafKaconsumerFactory <> (ConsulterConfigs ()) ؛ } الخريطة العامة <string ، object> exhinperconfigs () {map <string ، object> propsmap = new hashmap <> () ؛ propsmap.put (consulterconfig.bootstrap_servers_config ، الخوادم) ؛ propsmap.put (consulterconfig.enable_auto_commit_config ، enableeautocommit) ؛ propsmap.put (consulterconfig.auto_commit_interval_ms_config ، autocommitinterval) ؛ propsmap.put (consulterconfig.session_timeout_ms_config ، sessionTimeout) ؛ propsmap.put (consulterconfig.key_deserializer_class_config ، stringDeserializer.class) ؛ propsmap.put (consulterconfig.value_deserializer_class_config ، stringDeserializer.class) ؛ propsmap.put (consulterconfig.group_id_config ، groupId) ؛ propsmap.put (consulterconfig.auto_offset_reset_config ، AutoOffSetReset) ؛ إرجاع propsmap ؛ } bean public bustener leader () {return new bearner () ؛ }}New Beasherer () يولد حبة لمعالجة البيانات قراءة من كافكا. العرض التوضيحي للتنفيذ البسيط للمستمع هو كما يلي: ما عليك سوى قراءة وطباعة قيم المفتاح والرسائل
يتم استخدام سمة موضوعات @Kafkalistener لتحديد اسم موضوع Kafka. يتم تحديد اسم الموضوع بواسطة منتج الرسائل ، أي أنه يتم تحديده بواسطة Kafkatemplate عند إرسال رسالة.
package com.kangaroo.sentinel.collect.configuration ؛ استيراد org.apache.kafka.clients.consumer.consumerrecord ؛ استيراد org.slf4j.logger ؛ استيراد org.slf4j.loggerfactory logger = loggerfactory.getLogger (this.getClass ()) ؛ kafkalistener (موضوعات = {"test"}) public void الاستماع (consumerRecord <؟ logger.info ("قيمة kafka:" + record.value (). toString ()) ؛ }}نصائح:
1) لم أقدم كيفية تثبيت وتكوين كافكا. من الأفضل استخدام IP لشبكة الربط بالكامل عند تكوين Kafka ، بدلاً من المضيف المحلي أو 127.0.0.1
2) من الأفضل عدم استخدام Zookeeper الخاص بـ Kafka لنشر كافكا ، لأنه قد يتسبب في إمكانية الوصول إلى الوصول.
3) من الناحية النظرية ، يجب على المستهلك قراءة كافكا من خلال ZookeEper ، لكننا هنا نستخدم عنوان Kafkaserver ، لماذا لم نذهب إليه بعمق؟
4) عند تحديد تكوين رسالة المراقبة ، يتم استخدام قيمة عنصر تكوين GROUP_ID_CONFIG لتحديد اسم مجموعة المستهلك. إذا كان هناك العديد من كائنات المستمع في نفس المجموعة ، يمكن فقط كائن مستمع واحد تلقي الرسالة.
ما سبق هو كل محتوى هذه المقالة. آمل أن يكون ذلك مفيدًا لتعلم الجميع وآمل أن يدعم الجميع wulin.com أكثر.