Artikel ini memperkenalkan cara mengintegrasikan Kafka mengirim dan menerima pesan dalam proyek Springboot.
Kafka adalah sistem pesan yang distribusikan dengan throughput tinggi, dengan karakteristik berikut: memberikan persistensi pesan melalui struktur data disk O (1), yang dapat mempertahankan kinerja yang stabil untuk waktu yang lama bahkan jika penyimpanan pesan adalah terabytes. Throughput Tinggi: Bahkan perangkat keras yang sangat biasa kafka dapat mendukung jutaan pesan per detik. Mendukung partisi pesan melalui server kafka dan kelompok konsumen. Mendukung Hadoop Parallel Data Loading.
Instal kafka
Karena menginstal Kafka memerlukan dukungan dari Zookeeper, saat menginstal Windows, Anda perlu menginstal Zookeeper terlebih dahulu dan kemudian menginstal Kafka. Di bawah ini saya akan memberi Anda langkah -langkah untuk menginstal Mac dan poin yang harus diperhatikan. Konfigurasi Windows hampir tidak berbeda kecuali untuk lokasi yang berbeda.
BREW INSTAL KAFKA
Ya, sesederhana itu. Anda dapat menanganinya dengan perintah di Mac. Proses instalasi ini mungkin memerlukan waktu, dan harus terkait dengan status jaringan. Mungkin ada pesan kesalahan dalam pesan prompt instalasi, seperti "Kesalahan: tidak dapat menautkan:/usr/local/share/doc/homebrew". Ini tidak masalah, itu akan diabaikan secara otomatis. Akhirnya, kami berhasil ketika kami melihat apa yang ada di bawah.
==> Ringkasan ðÿ º/usr/local/cellar/kafka/1.1.0: 157 file, 47.8MB
Lokasi file konfigurasi instalasi adalah sebagai berikut, cukup ubah nomor port sesuai dengan kebutuhan Anda.
Lokasi Zoopeeper dan Kafka yang Diinstal/USR/Lokal/Cellar/
File konfigurasi /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zooKeeper.properties
Mulai Zookeeper
Salin kode sebagai berikut: ./ bin/zookeeperperer-server-start /usr/local/etc/kafka/zookeepereper.properties &
Mulai Kafka
./bin/kafka-server-start /usr/local/etc/kafka/server.properties &
Buat topik untuk kafka. Topiknya dinamai tes. Anda dapat mengonfigurasinya dengan nama yang Anda inginkan. Kembali dan konfigurasikan dalam kode dengan benar.
Salin kode sebagai berikut: ./ bin/kafka-topics --create --zookeeper localhost: 2181 --mana-factor 1 --partisi 1-uji topik
1. Selesaikan dependensi terlebih dahulu
Kami tidak akan menyebutkan dependensi yang terkait dengan Springboot. Ketergantungan yang terkait dengan Kafka hanya mengandalkan paket integrasi Spring-Kafka.
<dependency> <GroupId> org.springframework.kafka </groupid> <ArTifactId> Spring-kafka </artifactid> <version> 1.1.1.release </version> </dependency>
Di sini kami akan menampilkan file konfigurasi terlebih dahulu
#================== Kafka ================================================ Consumer.server = 10.93.21.21: 2181kafka.consumer.enable.auto.Commit = TrueKafka.consumer.Session.Timeout = 6000Kafka.consumer.auto.commit.interval = 100kafka.consumer.auto.commit.interval = 100kafka.consumer.auto.commit.interval = 100kafka.consumer.auto.commit.Interval = 100kafka.consumer.auto.comset .reset = LateSKafka.consumer.topic = testkafka.consumer.group.id = testkafka.consumer.concurrency = 10kafka.producer.servers = 10.9 3.21.21: 9092kafka.producer.retries = 0kafka.producer.batch.size = 4096kafka.producer.linger = 1kafka.producer.buffer.memory = 40960
2. Konfigurasi: produsen kafka
1) Deklarasikan Konfigurasi dan Buka Kemampuan Kafkatemplate melalui @configuration dan @enableKafka.
2) menyuntikkan konfigurasi kafka di file konfigurasi application.properties melalui @Value.
3) menghasilkan kacang, @bean
Paket com.kangaroo.sentinel.collect.configuration; impor java.util.hashmap; import java.util.map; impor org.apache.kafka.clients.producer.stroducerconfig; impor org.apache.kafka.common.sialization.stringsernizer; org.springframework.beans.factory.annotation.value; impor org.springframework.context.annotation.bean; impor org.springframework.context.annotation.configuration; import org.springframework.kafka.annotation.enableKure; org.springframework.kafka.core.defaultkafkaproducerFactory; impor org.springframework.kafka.core.kafkatemplate; impor org.springframework.kafka.core.producerfactory;@[email protected]; @Value ("$ {kafka.producer.servers}") server string pribadi; @Value ("$ {kafka.producer.retries}") int private int retries; @Value ("$ {kafka.producer.batch.size}") private int batchsize; @Value ("$ {kafka.producer.linger}") private int linger; @Value ("$ {kafka.producer.buffer.memory}") private int Buffermemory; peta publik <string, object> producerConfigs () {peta <string, object> props = new HashMap <> (); props.put (produserconfig.bootstrap_servers_config, server); props.put (producerConfig.Retries_config, retries); props.put (producerConfig.batch_size_config, batchsize); props.put (produserconfig.linger_ms_config, berlama -lama); props.put (producerConfig.buffer_memory_config, Buffermemory); props.put (produserconfig.key_serializer_class_config, stringserializer.class); props.put (producerConfig.value_serializer_class_config, stringSerializer.class); pengembalian alat peraga; } public produserFactory <String, String> produserFactory () {return New DefaultKafkaproducerFactory <> (ProducerConfigs ()); } @Bean public kafkatemplate <string, string> kafkatemplate () {return kafkatemplate baru <string, string> (produserFactory ()); }}
Eksperimen produser kami dan tulis pengontrol. Want topic = test, key = key, kirim pesan
Paket com.kangaroo.sentinel.collect.controller; import com.kangaroo.sentinel.common.response.response; import com.kangaroo.sentinel.common.response.resultcode; impor org.slf4j.logger; impor.ResultCode; org.springframework.beans.factory.annotation.Autowired; impor org.springframework.kafka.core.kafkatemplate; impor org.springframework.web.bind.annotation.*; impor javax.servlet.htp.httpserv. javax.servlet.http.httpservletResponse;@restcontroller@requestMapping ("/kafka") kelas public CollectController {Logger akhir yang dilindungi = loggerFactory.getLogger (this.getClass ()); @Autowired private kafkatemplate kafkatemplate; @RequestMapping (value = "/send", method = requestMethod.get) respons publik sendKafka (httpservletRequest request, httpservletResponse response) {coba {string message = request.getParameter ("pesan"); logger.info ("kafka pesan = {}", pesan); kafkatemplate.send ("tes", "kunci", pesan); Logger.info ("Kirim Kafka berhasil."); mengembalikan respons baru (resultCode.success, "Kirim Kafka berhasil", null); } catch (Exception e) {logger.error ("Kirim Kafka gagal", e); mengembalikan respons baru (resultCode.Exception, "Kirim Kafka gagal", null); }}}3. Konfigurasi: konsumen kafka
1) Deklarasikan Konfigurasi dan Buka Kemampuan Kafkatemplate melalui @configuration dan @enableKafka.
2) menyuntikkan konfigurasi kafka di file konfigurasi application.properties melalui @Value.
3) menghasilkan kacang, @bean
Paket com.kangaroo.sentinel.collect.configuration; impor org.apache.kafka.clients.consumer.consumerconfig; impor org.apache.kafka.common.serialization.stringdeserializer; impor org.springframework.geans.facporyory org.springframework.context.annotation.bean; impor org.springframework.context.annotation.onfiguration; impor org.springframework.kafka.annotation.enablefka; impor org.springframework.kafka.config.config.config.config.currentkurrenkafk org.springframework.kafka.config.kafkalistenerContainerFactory; impor org.springframework.kafka.core.consumerFactory; impor org.springframework.kafka.core.defaultKafkaconsumerfactory; impor org.springframework.kafka.listen.concurrentMessAgelistenerContainer; import java.util.hashmap; impor java.util.map;@configuration@enabekafkapublic class kafkaconsumerconfig {@value ("$ {kafka.consum.consum. @Value ("$ {kafka.consumer.enable.auto.commit}") private boolean enableAutocommit; @Value ("$ {kafka.consumer.Session.timeout}") Private String sessionTimeout; @Value ("$ {kafka.consumer.auto.Commit.Interval}") Private String AutoCommitInterVal; @Value ("$ {kafka.consumer.group.id}") private string groupId; @Value ("$ {kafka.consumer.auto.offset.reset}") Private String Autooffsetreset; @Value ("$ {kafka.consumer.concurrency}") private int concurrency; @Bean Public KafkalistenerContainererFactory <ConcurrentMessAgelistenerContainer <String, String >> KafkalistenerContainerFactory () {concurrentkaFkalistenerContainerFactory <string, string> factory = concurrentkaFkalistenerContenerFactory <string, string> factory = concurrentkaFkalistenerCeNerererFactory <string, string> factory = concurrentkaFkalistenerCeNerererFactory <string, string> factory = concurrentKaFkalistenerCaRererererFactory <string, string> factory = concurrentKaFkalistenerCalerererFactori factory.setConsumerFactory (ConsumerFactory ()); factory.setConcurrency (concurrency); factory.getContainerProperties (). SetPollTimeout (1500); Pabrik Kembali; } public ConsumerFactory <String, String> ConsumerFactory () {return New DefaultKafKaconsumerFactory <> (ConsumerConfigs ()); } peta publik <String, Object> ConsumerConfigs () {MAP <String, Object> propsmap = new HashMap <> (); propsmap.put (consumerconfig.bootstrap_servers_config, server); propsmap.put (consumerconfig.enable_auto_commit_config, enableAutocommit); propsmap.put (consumerconfig.auto_commit_interval_ms_config, autocommitinterval); propsmap.put (consumerconfig.session_timeout_ms_config, sessionTimeout); propsmap.put (consumerconfig.key_deserializer_class_config, stringdeserializer.class); propsmap.put (consumerconfig.value_deserializer_class_config, stringdeserializer.class); propsmap.put (consumerconfig.group_id_config, groupId); propsmap.put (consumerconfig.auto_offset_reset_config, autooffsetreset); mengembalikan propsmap; } @Bean listener listener () {return new listener (); }}Pendengar baru () menghasilkan kacang untuk memproses data yang dibaca dari kafka. Demo implementasi sederhana pendengar adalah sebagai berikut: Cukup baca dan cetak nilai kunci dan pesan
Atribut topik @Kafkalistener digunakan untuk menentukan nama topik Kafka. Nama topik ditentukan oleh produser pesan, yaitu, ditentukan oleh Kafkatemplate saat mengirim pesan.
Paket com.kangaroo.sentinel.collect.configuration; impor org.apache.kafka.clients.consumer.consumeRecord; impor linanur org.slf4j.logger; impor org.slf4j.loggerFactory; impor org.springframework.annkafka.nkafka. = LoggerFactory.getLogger (this.getClass ()); @Kafkalistener (topics = {"test"}) public void listen (ConsumeRecord <?,?> Record) {logger.info ("kunci kafka:" + record.key ()); Logger.info ("Nilai Kafka:" + Record.Value (). ToString ()); }}Tips:
1) Saya tidak memperkenalkan cara menginstal dan mengkonfigurasi kafka. Yang terbaik adalah menggunakan IP jaringan yang sepenuhnya mengikat saat mengkonfigurasi kafka, daripada localhost atau 127.0.0.1
2) Yang terbaik adalah tidak menggunakan zookeeper Kafka sendiri untuk menggunakan Kafka, karena dapat menyebabkan akses ke tidak dapat diakses.
3) Secara teoritis, konsumen harus membaca kafka melalui Zookeeper, tetapi di sini kita menggunakan alamat kafkaserver, mengapa kita tidak melakukannya secara mendalam?
4) Saat mendefinisikan konfigurasi pesan pemantauan, nilai item konfigurasi grup_id_config digunakan untuk menentukan nama grup konsumen. Jika ada beberapa objek pendengar dalam grup yang sama, hanya satu objek pendengar yang dapat menerima pesan.
Di atas adalah semua konten artikel ini. Saya berharap ini akan membantu untuk pembelajaran semua orang dan saya harap semua orang akan lebih mendukung wulin.com.