1. Situs Web Resmi Alibaba Cloud --- Dokumen Bantuan
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.wwtiuh
Ikuti langkah -langkah situs web resmi untuk membuat topik, mengajukan permohonan penerbitan (produser), dan melamar berlangganan (konsumen)
2. Kode
1. Konfigurasi:
kelas publik mqconfig {/** * Harap ganti xxx berikut sebelum memulai tes */string final public static public_topic = "test"; // public static final string public_producer_id = "pid_scheduler"; string final public static public_consumer_id = "cid_service"; string final public static access_key = "123"; string final public static Secret_key = "123"; tag string final statis publik = ""; Public Static Final String thread_num = "25"; // Jumlah utas konsumen/*** onsaddr Harap konfigurasikan sesuai dengan wilayah yang berbeda* Tes Jaringan Publik: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-ternet* Produksi cloud publik: Produksi cloud publik: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * Hangzhou Financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client.aliyun.com:8080 http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */string final public onsaddr = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4;Onsaddr Alibaba Cloud menggunakan produksi cloud publik, dan pengujian menggunakan jaringan publik
Layanan yang berbeda dapat mengatur tag yang berbeda, tetapi jika volume pesannya besar, disarankan untuk membuat topik baru.
2. Produser
Metode 1:
File Konfigurasi: Produser.xml
<? Xml Version = "1.0" encoding = "UTF-8"?> <! Doctype Beans Public "-// Spring // dtd bean // en" "http://www.springframework.org/dtd/spring-beans.dtd" <beans> <bean id id = "-pethy =" -peth "-pethy ("-pethy- " Name = "Properties"> <pap> <entry key = "producerId" value = "" /> <!-pid, silakan ganti-> <entry key = "accessKey" value = "" /> <!-access_key, tolong ganti-> <entry key = "secretkey" value = "" /> <!-secret_key, silakan ganti-> <!-propertyConst.onsadsad. http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet Public Cloud Produksi: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-ternal hangzhou Cloud keuangan: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal shenzhen cloud: http://mq4finance-sz.addr.aliyun (8080/rocketmq/nsaddr4client.aliyon.8080/rocketmq value = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/> </peta> </propert> </bean> </beans>Metode startup 1, diatur dalam pengaturan global kelas:
// inisialisasi produser private applicationContext CTX; produser produserus pribadi; @Value ("$ {ProduceConfig.enabled}") // switch, item konfigurasi pegas, true is on, false matikan private boolean produserConfigenabled; @PostConstruct public void init () {if (true == ProducerConfigenabled) {ctx = classpathxmlapplicationContext baru ("produser.xml"); produser = (produserBean) ctx.getBean ("produser"); }}PS: Saya baru -baru ini menemukan lubang. Jika produser dimulai dalam metode di atas, setelah dimulai lebih banyak, itu akan menyebabkan fullgc. Karena itu, Anda dapat mengubah metode anotasi berikut untuk memulai secara manual dan mematikan di mana Anda menggunakannya.
Metode 2: Konfigurasikan kelas (tidak diperlukan XML)
@ConfigurationPublic kelas produserBeanConfig {@value ("$ {openServices.ons.producerbean.producerid}") Private String ProducerIDID; @Value ("$ {OpenServices.ons.Producerbean.AccessKey}") Private String AccessKey; @Value ("$ {OpenServices.ons.producerbean.secretkey}") Private String Secretkey; Produserbean PrivateBean; @Value ("$ {OpenServices.ons.producerbean.onsaddr}") Private String OnsAddr; @Bean Public Producerbean OneProducer () {Producerbean Producerbean = new Producerbean (); Properti properti = properti baru (); Properties.setProperty (PropertyKeyConst.producerid, ProducerID); Properties.setProperty (PropertyKeyConst.AccessKey, AccessKey); Properties.setProperty (PropertyKeyConst.Secretkey, Secretkey); Properties.setProperty (PropertyKeyConst.onsaddr, OnsAddr); produserBean.setProperties (properti); Return Producerbean; }}PS: Setelah ganda 11 ini, ditemukan bahwa dua metode di atas tidak terlalu cocok untuk volume data besar dan situasi multi-threading, dan kinerjanya sangat buruk, sehingga disarankan untuk menggunakan 3.
Metode 3: (tidak diperlukan XML)
@ComponentPublic kelas produserBeansingleton {@value ("$ {OpenServices.ons.producerbean.producerid}") Private String ProducerId; @Value ("$ {OpenServices.ons.Producerbean.AccessKey}") Private String AccessKey; @Value ("$ {OpenServices.ons.producerbean.secretkey}") Private String Secretkey; @Value ("$ {OpenServices.ons.producerbean.onsaddr}") Private String OnsAddr; Produser produser statis pribadi; private static class singletonHolder {private static final producerbeansingleton instance = new Producerbeansingleton (); } private producerbeansingleton () {} public static produserBeansingleton getInstance () {return singletonHolder.instance; } @PostConstruct public void init () {// Produser Configuration Initialization Properties Properties = New Properties (); // Produser ID Properties.SetProperty (PropertyKeyConst.ProducerID, ProducerID); // AccessKey ALIBABA Cloud Authentication, buat properties.setProperty (PropertyKeyConst.AccessKey, AccessKey); // Secretkey Authentication Cloud Alibaba, Buat Properties.SetProperty (PropertyKeyConst.Secretkey, Secretkey); // Secretkey Authentication Cloud Alibaba, Buat Properties.SetProperty (PropertyKeyConst.Secretkey, Secretkey); // Atur waktu batas waktu kirim, unit Millisecond Properties.setProperty (PropertyKeyConst.Sendmsgtimeoutmillis, "3000"); // Atur nama domain Access TCP (lihat lingkungan produksi cloud publik sebagai contoh di sini) Properties. produser = onsfactory.createProducer (properti); // Sebelum mengirim pesan, Anda harus memanggil metode start untuk memulai produser, dan Anda hanya perlu memanggilnya sekali ke produser.start (); } produser publik getProducer () {return produser; }}Konfigurasi Musim Semi
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.mysql5dialectConsumerconfig.enabled = trueproducerconfig.enabled = true #method 1: jadwal.enabled = false #metode 2, 3: rocketmq /u516c/u7f51/u914d/u7f6eopenservices.ons.producerbean.producerid = pidopenservices.ons.producerbean.accesskey = openServices.ons.producerbean.secretkey = openserervices.ons.producer.ons.onsbean.secretkey = openservices.ons.producer.ons.onsbean.secrety OpenServices.ons.producerbean.onsaddr = Jaringan Publik, Hangzhou Public Cloud Production
Metode 1 Memberikan kode pesan:
coba {string jsonc = jsonutils.toJson (ElevenMessage); Pesan pesan = pesan baru (mqconfig.topic, mqconfig.tag, jsonc.getbytes ()); SendResult sendResult = produser.send (pesan); if (sendResult! = null) {logger.info (". Kirim pesan mq sukses!";} else {logger.warn (". sendResult adalah null ......");}} tangkap (pengecualian e) {logger.warn ("DoubleElevenAllPresservice"); thread.sleep (1000); excleon/excleon/excleon ife, excresser (1000);Metode 2 Kode Pesan Pengiriman: (dapat dimulai/ditutup setiap 1000 kali)
produserBean.start (); coba {string jsonc = jsonutils.toJson (ElevenMessage); Pesan pesan = pesan baru (mqconfig.topic, mqconfig.tag, jsonc.getbytes ()); SendResult sendResult = produser.send (pesan); if (sendResult! = null) {logger.info (". Kirim pesan mq sukses!";} else {logger.warn (". sendResult adalah null ......");}} tangkap (pengecualian e) {logger.warn ("DoubleElevenAllPresservice"); thread.sleep (1000); excleon/excleon/excleon ife, excresser (1000); produserBean.shutdown ();Metode 3: Kirim pesan
coba {string jsonc = jsonutils.toJson (ElevenMessage); Pesan pesan = pesan baru (mqconfig.topic, mqconfig.tag, jsonc.getbytes ()); Produser produser = produserBeansingleton.getInstance (). GetProducer (); SendResult sendResult = produser.send (pesan); if (sendResult! = null) {logger.info ("DoubleElevenMidService.Send MQ Pesan Sukses! Topik adalah:";} else {Logger.warn ("DoubleElevenMidService.SendResult adalah null ...");} tangkap (pengecualian e) {logger.errerr adalah null ... ");}} Catch (exception e) {logger.errorr adalah null ......");} tangkap (pengecualian e) {logger.errorr adalah null ... ");}} Catch (exception e) {LOGGER.RORRER adalah null ..." "+e.getMessage (), e); thread.sleep (1000); // Jika ada pengecualian, tidur selama 1 detik}Kode yang mengirim pesan harus menangkap pengecualian, jika tidak, itu akan dikirim berulang kali.
Topik di sini dibuat sendiri. Elevenmessage adalah konten yang akan dikirim. Saya adalah objek yang saya buat sendiri.
3. Konsumen
Konfigurasikan kelas startup:
@Configuration@conditionAnproperty (value = "consumerconfig.enabled", haveValue = "true", matchifmissing = true) public classConfig {private Logger Logger = LoggerFactory.getLogger (LoggerAppendItpe.smsdist.name ()); @Bean Public Consumer ConsumerFactory () {// Konsumen yang berbeda tidak dapat mengganti nama properti di sini ConsumerProperties = New Properties (); ConsumerProperties.SetProperty (PropertyKeyConst.Consumerid, mqconfig.consumer_id); ConsumerProperties.SetProperty (PropertyKeyConst.accesskey, mqconfig.access_key); ConsumerProperties.SetProperty (PropertyKeyConst.Secretkey, mqconfig.secret_key); //consumerproperties.setProperty(PropertyKonST.Consumethreadnums,mqconfig.thread_num); ConsumerProperties.SetProperty (PropertyKeyConst.onsaddr, mqconfig.onsaddr); Konsumen konsumen = onsfactory.createConsumer (ConsumerProperties); konsumen.subscribe (mqconfig.topic, mqconfig.tag, doubleElevenMessageListener baru ()); // konsumen pendengar baru yang sesuai (); Logger.info ("Consumconfig Mulai Sukses."); Konsumen pengembalian; }}Anda perlu memilih CID dan Onsaddr yang tepat. Anda dapat mengonfigurasinya di sini dengan menggunakan jumlah utas konsumen Anda sendiri, dll.
Buat kelas pendengar pesan dan konsumsi pesan:
@ComponentPublic Class MessageListener mengimplementasikan MessageListener {private Logger Logger = loggerFactory.getLogger ("ingatkan"); Elevenreposit ElevenReposit statis yang dilindungi; @Resource public void setelevenreposit (ElevenReposit ElevenReposit) {MessageListener .ElevenReposit = ElevenReposit; } @Override Konsumsi Tindakan Publik (pesan pesan, ConsumerContext ConsumeContext) {if (message.gettopic (). Equals ("topik sendiri")) {// hindari mengkonsumsi pesan lain kesalahan konversi json coba {byte [] body = message.getBody (); String res = string baru (tubuh); // Res adalah konten pesan yang dikirim oleh produser // kode bisnis} else {logger.warn ("!"); }} catch (Exception e) {logger.error ("MessageListener.Consume error:" + e.getMessage (), e); } Logger.info ("MessageListener.Receive Message"); // Jika Anda ingin menguji fungsi pemulihan pesan, Anda dapat mengganti tindakan. } else {logger.warn (); Return Action.Reconsumelater; }}Perhatikan bahwa karena konsumen multi-threaded, objek perlu disuntikkan dengan statis+diatur untuk meningkatkan tingkat objek ke proses, sehingga beberapa utas dapat dibagikan, tetapi metode dan variabel kelas induk tidak dapat dipanggil.
Status konsumen dapat memeriksa apakah konsumen berhasil terhubung, apakah konsumsi tertunda, kecepatan konsumsi, dll.
Mengatur ulang situs konsumsi dapat menghapus semua pesan
3. Hal -hal yang perlu diperhatikan
1. Badan pesan maksimum yang dikirim adalah 256kb
2. Pesannya ada hingga 3 hari
3. Jumlah utas default di sisi konsumen adalah 20
4. Jika Java menutup telepon atau CPU menempati jumlah yang sangat tinggi selama pelarian, Anda dapat mengirim utas untuk 1s dari setiap 1.000 pesan saat mengirimkannya.
5. Saat pengujian atau startup lokal, ganti onsaddr dengan jaringan publik, jika tidak kesalahan tidak akan dimulai.
Di atas adalah semua konten artikel ini. Saya berharap ini akan membantu untuk pembelajaran semua orang dan saya harap semua orang akan lebih mendukung wulin.com.