1. Pendahuluan
Baru -baru ini, perusahaan memiliki kebutuhan untuk menggunakan antrian pesan alibaba cloud. Untuk membuatnya lebih nyaman untuk digunakan, saya telah menghabiskan beberapa hari merangkum antrian pesan ke dalam metode panggilan API untuk memfasilitasi panggilan sistem internal. Sudah selesai sekarang. Di sini kami mencatat proses dan teknologi yang relevan yang digunakan, dan berbagi dengan Anda.
Alibaba Cloud sekarang menyediakan dua layanan pesan: layanan MNS dan layanan ONS. Saya pikir MNS adalah versi ONS yang disederhanakan, dan konsumsi pesan MNS membutuhkan strategi pemungutan suara khusus. Sebaliknya, fungsi mode penerbitan dan berlangganan ONS lebih kuat (misalnya, dibandingkan dengan MNS, ONS menyediakan pelacakan pesan, penebangan, pemantauan dan fungsi lainnya), dan API -nya lebih nyaman digunakan. Juga telah didengar bahwa Alibaba tidak akan lagi mengembangkan MN di masa depan, tetapi hanya mempertahankannya. Layanan ONS secara bertahap akan menggantikan layanan MNS dan menjadi produk utama dari layanan pesan Alibaba. Oleh karena itu, jika ada kebutuhan untuk menggunakan antrian pesan, disarankan untuk tidak menggunakan MNS lagi. Menggunakan ONS adalah pilihan terbaik.
Teknik yang terlibat: musim semi, refleksi, proxy dinamis, serialisasi dan deserialisasi Jackson
Sebelum membaca artikel berikut, Anda perlu membaca dokumentasi di atas untuk memahami konsep yang relevan (topik, konsumen, produser, tag, dll.) Dan implementasi kode pengiriman dan penerima yang disediakan dalam dokumentasi.
Posting blog ini hanya untuk teman yang memiliki basis pengetahuan tentang antrian pesan. Saya secara alami sangat senang membantu semua orang. Jangan memarahi siapa pun yang tidak memahaminya, karena itu berarti jalan Anda salah.
2. Rencana Desain
1. Pengiriman Pesan
Dalam arsitektur CSS sederhana, dengan asumsi bahwa server akan mendengarkan pesan yang dikirim oleh produser topik, pertama -tama harus memberikan API klien. Klien hanya perlu hanya memanggil API dan dapat menghasilkan pesan melalui produser.
2. Penerimaan pesan
Karena API dirumuskan oleh server, server tentu saja juga tahu cara mengkonsumsi pesan -pesan ini.
Dalam proses ini, server sebenarnya memainkan peran konsumen, dan klien sebenarnya memainkan peran produsen, tetapi aturan bagi produsen untuk menghasilkan pesan dirumuskan oleh konsumen untuk memenuhi kebutuhan konsumsi konsumen.
3. Tujuan akhir
Kami ingin membuat paket JAR terpisah bernama antrian-core untuk memberikan implementasi spesifik dependensi dan menerbitkan langganan untuk produsen dan konsumen.
3. Pengiriman Pesan
1. Konsumen menyediakan antarmuka
@Topic (name = "kdyzm", produserId = "kdyzm_producer") antarmuka publik userqueueSource {@tag ("test1") public handleUserInfo (@body @key ("userInfoHandler") userRuseSerinInd; @Tag ("test2") public void handleUserInfo1 (@body @key ("userInfoHandler1") userModel pengguna);}Karena topik dan produser berada dalam hubungan N: 1, ProducerID secara langsung digunakan sebagai properti topik; Tag adalah kondisi penyaringan yang sangat kritis, dan konsumen menggunakannya untuk mengklasifikasikan pesan untuk melakukan pemrosesan bisnis yang berbeda, sehingga tag digunakan sebagai kondisi perutean di sini.
2. Produser mengirim pesan menggunakan API yang disediakan oleh konsumen
Karena konsumen hanya menyediakan antarmuka untuk digunakan produsen, tidak ada cara untuk menggunakan antarmuka secara langsung karena tidak ada cara untuk instantiate mereka. Di sini kami menggunakan proxy dinamis untuk menghasilkan objek. Di API yang disediakan oleh konsumen, tambahkan konfigurasi berikut untuk memfasilitasi produsen untuk secara langsung mengimpor konfigurasi dan menggunakannya. Di sini kami menggunakan konfigurasi pegas berdasarkan java. Harap diketahui.
@ConfigurationPublic kelas queueconfig {@autowired @bean UserqueueLeSource userqueueLeSource () {return queueresourceFactory.createProxyqueuereSource (userqueueresource.class); }}3. Enkapsulasi antrian-core untuk pengiriman pesan produsen
Semua anotasi dalam 1 di atas (topik, tag, tubuh, kunci) dan kelas queueresourceFactory yang digunakan dalam 2 harus didefinisikan dalam antrian-core. Definisi anotasi hanya mendefinisikan aturan. Implementasi sebenarnya sebenarnya ada di QueueresourceFactory.
import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.producer; import com.aliyun.openservices.ons.api.sendResult; impor com.wy.queue.core.api.mqconnection; impor com.wyeue.core.utils.Jacksonserizer; com.wy com.wyeue.core.utils.Jacksonserizer; com.wy.queue.core.utils.queuecorespringutils; kelas publik queueresourceFactory mengimplementasikan InvocationHandler {private static final Logger Logger = loggerFactory.getLogger (queueresourceFactory.class); Private String TopicName; produsen string pribadi; serializer private jacksonserizer = jacksonserializer baru (); Private Static Final String Prefix = "PID_"; public queueresourceFactory (string topicName, string produserId) {this.topicname = topicName; this.producerid = producerId; } public static <T> t createProxyqueUeReSource (class <t> clazz) {string topicName = mqutils.gettopicname (clazz); String producerId = mqutils.getproducerid (clazz); T target = (t) proxy.newproxyInstance (queueresourceFactory.class.getClassLoader (), kelas baru <?> [] {Clazz}, queueresourceFactory baru (topicName, produserID)); target pengembalian; } @Override Objek publik Invoke (Proxy Object, Metode Metode, Object [] args) melempar lempar {if (args.length == 0 || args.length> 1) {lempar runtimeException baru ("Hanya terima satu param di antarmuka queueresource."); } String tagName = mqutils.gettagname (metode); ProduserFactory produserFactory = queuecorespringutils.getbean (produserFactory.class); MqConnection connectionInfo = queuecorespringutils.getbean (mqconnection.class); Produser produser = produserFactory.createProducer (awalan+connectionInfo.getPrefix ()+"_"+produserId); //Send message Message msg = new Message( // // The Topic created in the console, that is, the Topic name to which the message belongs. connectionInfo.getPrefix()+"_"+topicName, // Message Tag, // It can be understood as a tag in Gmail, and the message is reclassified to facilitate Consumer to specify filtering conditions to filter tagName on the MQ server, // Message Body // Any binary form Data, MQ tidak mengganggu, // produsen dan konsumen diperlukan menegosiasikan serialisasi yang konsisten dan deserialisasi metode serializer.serialize (args [0]). getBytes ()); SendResult sendResult = produser.send (msg); Logger.info ("Kirim pesan sukses. ID pesan adalah:" + sendResult.getMessageId ()); kembali nol; }}Di sini kami telah memposting paket khusus dan nama paket yang digunakan oleh pihak ketiga untuk memfasilitasi perbedaan.
Apa sebenarnya yang dilakukan di sini?
Proses mengirim pesan adalah membuat objek proxy pada proxy dinamis. Objek akan dicegat saat memanggil metode. Pertama, parse semua anotasi, seperti TopicName, ProducerID, tag dan informasi kunci lainnya dari anotasi, dan kemudian hubungi Alibaba SDK untuk mengirim pesan. Prosesnya sangat sederhana, tetapi perhatikan bahwa ketika mengirim pesan di sini, itu dibagi menjadi lingkungan. Secara umum, perusahaan sekarang membedakan tiga lingkungan: QA, pementasan, dan produk. Di antara mereka, QA dan pementasan adalah lingkungan uji. Untuk antrian pesan, ada juga tiga cincin. Namun, di lingkungan, QA dan lingkungan pementasan sering menggunakan akun Alibaba yang sama untuk mengurangi biaya, sehingga topik dan produk yang dibuat akan ditempatkan di area yang sama. Dengan cara ini, TopicName dengan nama yang sama tidak diperbolehkan, sehingga awalan lingkungan ditambahkan untuk membedakannya, seperti QA_TopicName, PID_STAGING_PRODUCERID, dll.; Selain itu, Antrian-Core menyediakan antarmuka MQConnection untuk mendapatkan informasi konfigurasi, dan layanan produser hanya perlu mengimplementasikan antarmuka ini.
4. Produser mengirim pesan
@Autowired private userqueuueresource userqueuueresource; @Override public void sendMessage () {usermodel usermodel = new usermodel (); usermodel.setname ("kdyzm"); usermodel.setage (25); userqueueresource.handleuserinfo (usermodel); }Hanya beberapa baris kode yang diperlukan untuk mengirim pesan ke topik yang ditentukan, yang jauh lebih tipis daripada kode pengiriman asli.
4. Konsumsi berita
Dibandingkan dengan pengiriman pesan, konsumsi pesan lebih rumit.
1. Desain konsumsi pesan
Karena topik dan konsumen adalah hubungan n: n, konsumen ditempatkan pada metode implementasi spesifik konsumen
@Controller@queueresourcePublic kelas userqueueresourceImpl mengimplementasikan userqueueSource {private logger logger = loggerFactory.getLogger (this.getClass ()); @Override @consumerannotation ("kdyzm_consumer") public void handleUserInfo (userModel user) {logger.info ("pesan 1 diterima: {}", GSON baru (). TOJSON (pengguna)); } @Override @consumerannotation ("kdyzm_consumer1") public void handleUserInfo1 (userModel user) {logger.info ("pesan 2 diterima: {}", baru gson (). TOJSON (pengguna)); }}Berikut adalah dua anotasi baru @Queueresource dan @consumerannotation. Kedua anotasi ini akan dibahas di masa depan. Seseorang mungkin bertanya mengapa saya harus menggunakan nama ConsumRanNotation alih -alih nama konsumen, karena nama konsumen bertentangan dengan nama dalam SDK yang disediakan oleh Aliyun. . . .
Di sini, konsumen menyediakan antarmuka API kepada produsen untuk memfasilitasi produsen untuk mengirim pesan, dan konsumen menerapkan antarmuka untuk mengkonsumsi pesan yang dikirim oleh produsen. Cara mengimplementasikan antarmuka API adalah menerapkan pemantauan, yang merupakan logika yang relatif kritis.
2.queue-core mengimplementasikan logika inti dari mendengarkan pesan antrian
Langkah 1: Gunakan metode mendengarkan wadah musim semi untuk mendapatkan semua kacang dengan anotasi queueresource
Langkah 2: Distribusikan kacang pemrosesan
Bagaimana cara menangani kacang ini? Setiap kacang sebenarnya adalah objek. Dengan objek, seperti objek UserQueUeReSourceImpl dalam contoh di atas, kita bisa mendapatkan objek bytecode antarmuka yang diimplementasikan oleh objek, dan kemudian mendapatkan anotasi pada antarmuka userqueuererouse dan anotasi pada metode dan metode. Tentu saja, anotasi pada metode implementasi UserqueuereSourceImpl juga dapat diperoleh. Di sini saya akan menggunakan ConsumerID sebagai kunci, dan informasi yang relevan yang tersisa dienkapsulasi sebagai nilai dan di -cache menjadi objek peta. Kode inti adalah sebagai berikut:
Kelas <?> Clazz = resourceImpl.getClass (); Kelas <?> Clazzif = clazz.getInterfaces () [0]; Metode [] metode = clazz.getMethods (); String TopicName = mqutils.gettopicName (clazzif); untuk (metode m: metode) {konsumsinotasi konsumeranno = m.getannotation (consumeRannotation.class); if (null == consumeranno) {// logger.error ("method = {} membutuhkan anotasi konsumen.", m.getName ()); melanjutkan; } String ConsumerId = ConsumeranAnno.Value (); if (stringutils.isempty (consuerid)) {logger.error ("method = {} consumerId tidak bisa menjadi null", m.getName ()); melanjutkan; } Class <?> [] ParameTerTypes = m.getParameterTypes (); Metode ResourceIfMethod = null; coba {resourceIfMethod = clazzif.getMethod (m.getName (), parameterTypes); } catch (noSuchMethodeException | SecurityException e) {logger.error ("Tidak dapat menemukan metode = {} di antarmuka super = {}.", m.getName (), clazzif.getCanonicalName (), e); melanjutkan; } String tagName = mqutils.gettagname (ResourceIfMethod); Consumersmap.put (consuerid, MethodInfo baru (TopicName, Tagname, M)); }Langkah 3: Tindakan konsumsi melalui refleksi
Pertama, tentukan waktu eksekusi tindakan refleksi, yaitu, dengarkan pesan baru
Kedua, bagaimana melakukan tindakan refleksi? Saya tidak akan membahas detailnya. Sepatu anak-anak dengan yayasan terkait refleksi tahu cara membuatnya. Kode inti adalah sebagai berikut:
MqConnection connectionInfo = queuecorespringutils.getbean (mqconnection.class); String topicprefix = connectionInfo.getPrefix ()+"_"; String consumeridprefix = prefix+connectionInfo.getPrefix ()+"_"; untuk (String ConsumerId: ConsumersMap.keyset ()) {MethodInfo MethodInfo = ConsumersMap.get (ConsumerId); Properties ConnectionProperties = ConvertToProPerties (ConnectionInfo); // ID Konsumen yang Anda buat di Console ConnectionProperties.put (PropertyKeyConst.ConsumerID, ConsumerIdPrefix+ConsumerId); Konsumen konsumen = onsfactory.createConsumer (ConnectionProperties); Consumer.subscribe (TopicPrefix+MethodInfo.gettopicName (), MethodInfo.gettagname (), New MessageListener () {// Berlangganan beberapa tag Tag Tindakan Publik (pesan pesan, Konteks ConsumerContext) {coba {string messagebody = new string (pesan.getBody (), "UTF-8") {string {string messageBody = new string (pesan.getBody (), "UTF-8") {String messageBody = new string (message.getbody (), "UTF-8") {string messageBody = new string (message.getBody (), "UTF-8") {string messageBody = new string (message.getBody (), "UTF-8") (MESCHERING LOVING. Topic = {}, tag = {}, consumerid = {}, pesan = {} ", topicprefix+methodInfo.gettopicName (), methodInfo.gettagname (), consumeridprefix+consumerid, messagebody); Method = methodinfo.getMethod (); class <? = Jacksonserializer.deserialize (Pesan, ParameterType); konsumen.start (); logger.info ("konsumen = {} telah dimulai.", ConsumerIdprefix+ConsumerId); }5. Lihat tautan git di bawah ini untuk kode lengkap
https://github.com/kdyzm/queue-core.git
Di atas adalah semua konten artikel ini. Saya berharap ini akan membantu untuk pembelajaran semua orang dan saya harap semua orang akan lebih mendukung wulin.com.