Kata pengantar
Dalam artikel sebelumnya, kita berbicara tentang cara membangun kluster kafka, dan artikel ini berbicara tentang cara menggunakan kafka. Namun, saat menggunakan kafka, Anda harus tetap memahami kafka secara singkat.
Pengantar Kafka
Kafka adalah sistem pesan berlangganan yang didistribusikan dengan throughput tinggi yang menangani semua data aliran tindakan di situs web skala konsumen.
Kafka memiliki karakteristik berikut:
istilah kafka
Kafka Core API
Kafka memiliki empat API inti
Contoh diagram adalah sebagai berikut:
Skenario Aplikasi Kafka
Untuk pengantar di atas, lihat dokumen resmi Kafka.
Persiapan Pembangunan
Jika kita mengembangkan program kafka, apa yang harus kita lakukan?
Pertama -tama, setelah membangun lingkungan kafka, kita perlu mempertimbangkan apakah kita seorang produsen atau konsumen, yaitu, pengirim atau penerima pesan.
Namun, dalam artikel ini, baik produsen maupun konsumen akan berkembang dan menjelaskan.
Setelah pemahaman yang kasar tentang Kafka, kami akan mengembangkan program pertama.
Bahasa pengembangan yang digunakan di sini adalah Java, alat konstruksi Maven.
Ketergantungan Maven adalah sebagai berikut:
<dependency> <GroupId> org.apache.kafka </groupid> <ArTifactId> kafka_2.12 </arttifactid> <version> 1.0.0 </version> <scope> Disediakan </seupope> </Artorcid> <Ruppendency> <Roupact> org.apache.kafka </scopeD> <Rependency> <RoupacTID> <version> 1.0.0 </version> </dependency> <dependency> <GroupId> org.apache.kafka </groupid> <ArTifactId> Kafka-streams </artifactid> <version> 1.0.0 </version> </dependency>
Produser Kafka
Selama pengembangan dan produksi, mari kita perkenalkan secara singkat berbagai instruksi konfigurasi Kafka:
...
Ada lebih banyak konfigurasi, Anda dapat memeriksa dokumentasi resmi, yang tidak akan dijelaskan di sini.
Maka konfigurasi produsen kafka kami adalah sebagai berikut:
Properti props = properti baru (); props.put ("bootstrap.server", "master: 9092, slave1: 9092, slave2: 9092"); props.put ("acks", "all"); props.put ("retries", 0); props.put ("batch.size", 16384); props.put ("key.serializer", stringSerializer.class.getName ()); props.put ("value.serializer", stringserializer.class.getName ()); Kafkaproducer <string, string> produser = kafkaproducer baru <string, string> (props);Setelah menambahkan konfigurasi Kafka, kami mulai memproduksi data. Kode data produksi hanya perlu sebagai berikut:
produser.send (produserRecord baru <string, string> (topik, kunci, nilai));
Setelah menulis program produser, mari kita mulai memproduksi terlebih dahulu!
Pesan yang saya kirim di sini adalah:
String MessageStr = "Halo, ini adalah"+Messageno+"Data";
Dan hanya 1.000 pesan yang dikirim dan hasilnya adalah sebagai berikut:
Anda dapat melihat bahwa informasi tersebut telah berhasil dicetak.
Jika Anda tidak ingin menggunakan program untuk memverifikasi apakah program tersebut berhasil dikirim dan keakuratan pengiriman pesan, Anda dapat menggunakan perintah untuk melihatnya di server Kafka.
Konsumen kafka
Konsumsi Kafka harus menjadi titik kunci, bagaimanapun, sebagian besar waktu, kami terutama menggunakan konsumsi data.
Konfigurasi konsumsi kafka adalah sebagai berikut:
Maka konfigurasi konsumen Kafka kami adalah sebagai berikut:
Properti props = properti baru (); props.put ("bootstrap.server", "master: 9092, slave1: 9092, slave2: 9092"); props.put ("group.id", groupId); props.put ("enable.auto.Commit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("max.poll.records", 1000); props.put ("auto.offset.reset", "paling awal"); props.put ("key.deserializer", stringdeserializer.class.getName ()); props.put ("value.deserializer", stringdeserializer.class.getName ()); Kafkaconsumer <string, string> konsumen = kafkaconsumer baru <string, string> (props); Karena saya mengatur pengiriman otomatis, kode konsumsi adalah sebagai berikut:
Kita perlu berlangganan topik terlebih dahulu, yaitu, untuk menentukan topik mana yang akan dikonsumsi.
konsumen.subscribe (arrays.aslist (topik));
Setelah berlangganan, kami menarik data dari Kafka:
ConsumerRecords <String, String> msglist = Consumer.poll (1000);
Secara umum, pemantauan digunakan saat konsumsi dilakukan. Di sini kami menggunakan (;;) untuk memantau, dan mengatur konsumsi 1.000 item dan keluar!
Hasilnya adalah sebagai berikut:
Dapat dilihat bahwa kami telah berhasil mengonsumsi data produksi di sini.
Kode
Kemudian kode untuk produsen dan konsumen adalah sebagai berikut:
Produsen:
impor java.util.properties; impor org.apache.kafka.clients.producer.kafkaproducer; impor org.apache.kafka.clients.producer.producerrecord; org.apache.kafka.common.serialization.stringerizer; Versi Demo*: 1.0.0* @author pancm* @Date 26 Januari 2018*/kelas publik Kafkaproducertest mengimplementasikan runnable {private final kafkaproducer <string, string> produser; Topik String Akhir Pribadi; publik kafkaproducertest (string topicName) {properties props = properti baru (); props.put ("bootstrap.server", "master: 9092, slave1: 9092, slave2: 9092"); props.put ("acks", "all"); props.put ("retries", 0); props.put ("batch.size", 16384); props.put ("key.serializer", stringSerializer.class.getName ()); props.put ("value.serializer", stringserializer.class.getName ()); this.producer = kafkaproducer baru <string, string> (props); this.topic = TopicName; } @Override public void run () {int messageno = 1; coba {untuk (;;) {string messageStr = "halo, ini adalah bilah"+messageno+"data"; produser.send (produsen baru <string, string> (topik, "pesan", messageStr)); // Jika 100 item diproduksi, if (messageno%100 == 0) {System.out.println ("Kirim pesan:" + MessageStr); } // Jika 1000 item diproduksi, if (messageno%1000 == 0) {System.out.println ("berhasil dikirim"+messageno+"bar"); merusak; } Messageno ++; }} catch (Exception e) {e.printstacktrace (); } akhirnya {produser.close (); }} public static void main (string args []) {kafkaproducertest test = kafkaproducertest baru ("kafka_test"); Thread utas = utas baru (tes); thread.start (); }}konsumen:
Impor java.util.arrays; import java.util.properties; impor org.apache.kafka.clients.consumer.consumeRecord; impor org.apache.kafka.clients.consumer.consumerrecord; impor org.apache.kafka.clients.consumer.consumerrecord; org.apache.kafka.clients.consumer.kafkaconsumer; impor org.apache.kafka.common.serialization.stringdeserializer;/**** Judul: Kafkaconsumertest* Deskripsi:* Kafka Demo Konsumer* Versi: 1.0.0* @Author PancaFa, Runnable {private final kafkaconsumer <string, string> konsumen; Private ConsumerRecords <String, String> msglist; Topik String Akhir Pribadi; Private Static Final String groupID = "groupa"; public kafkaconsumerTest (string topicName) {properties props = new new Properties (); props.put ("bootstrap.server", "master: 9092, slave1: 9092, slave2: 9092"); props.put ("group.id", groupId); props.put ("enable.auto.Commit", "true"); props.put ("auto.commit.interval.ms", "1000"); props.put ("session.timeout.ms", "30000"); props.put ("auto.offset.reset", "paling awal"); props.put ("key.deserializer", stringdeserializer.class.getName ()); props.put ("value.deserializer", stringdeserializer.class.getName ()); this.consumer = kafkaconsumer baru <string, string> (props); this.topic = TopicName; this.consumer.subscribe (arrays.aslist (topik)); } @Override public void run () {int messageno = 1; System.out.println ("----------------------------------------"); coba {untuk (;;) {msglist = konsumen.poll (1000); if (null! = msglist && msglist.count ()> 0) {for (consumeRrecord <string, string> Record: msglist) {// cetak 100 item saat dikonsumsi, tetapi data yang dicetak tidak harus aturan jika (messageno%100 == 0) {System.out.println (messageno + "====== Record.Value ()+"Offset ==="+Record.Offset ()); } // Setelah 1000 item dikonsumsi, Keluar dari IF (Messageno%1000 == 0) {break; } Messageno ++; }} else {thread.sleep (1000); }}} catch (InterruptedException e) {E.PrintStackTrace (); } akhirnya {consumer.close (); }} public static void main (string args []) {kafkaconsumertest test1 = kafkaconsumertest baru ("kafka_test"); Thread thread1 = utas baru (test1); thread1.start (); }}Catatan: Master, Slave1, Slave2 adalah karena saya telah membuat pemetaan hubungan di lingkungan saya sendiri, yang dapat diganti dengan IP server.
Tentu saja, saya meletakkan proyek di GitHub, dan jika Anda tertarik, Anda dapat melihatnya. https://github.com/xuwujing/kafka (unduhan lokal)
Meringkaskan
Pengembangan sederhana program Kafka membutuhkan langkah -langkah berikut:
Kafka Pendahuluan Lihat dokumen resmi: http://kafka.apache.org/intro
Meringkaskan
Di atas adalah seluruh konten artikel ini. Saya berharap konten artikel ini memiliki nilai referensi tertentu untuk studi atau pekerjaan semua orang. Jika Anda memiliki pertanyaan, Anda dapat meninggalkan pesan untuk berkomunikasi. Terima kasih atas dukungan Anda ke wulin.com.