1. เว็บไซต์อย่างเป็นทางการของ Alibaba Cloud --- เอกสารช่วยเหลือ
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.wwtiuh
ทำตามขั้นตอนของเว็บไซต์อย่างเป็นทางการเพื่อสร้างหัวข้อสมัครสำนักพิมพ์ (ผู้ผลิต) และสมัครสมัครสมาชิก (ผู้บริโภค)
2. รหัส
1. การกำหนดค่า:
คลาสสาธารณะ mqconfig {/** * โปรดแทนที่ xxx ต่อไปนี้ก่อนที่จะเริ่มการทดสอบ */สตริงสุดท้ายคงที่สาธารณะ public_topic = "ทดสอบ"; // สตริงสุดท้ายคงที่สาธารณะ public_producer_id = "pid_scheduler"; สตริงสุดท้ายคงที่สาธารณะ public_consumer_id = "cid_service"; สตริงสุดท้ายคงที่สาธารณะ access_key = "123"; สตริงสุดท้ายคงที่สาธารณะ secret_key = "123"; แท็กสตริงสุดท้ายคงที่สาธารณะ = ""; สตริงสุดท้ายคงที่ public String thread_num = "25"; // จำนวนเธรดผู้บริโภค/*** onsaddr โปรดกำหนดค่าตามภูมิภาคต่าง ๆ* การทดสอบเครือข่ายสาธารณะ: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet* การผลิตคลาวด์สาธารณะ http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * Hangzhou Financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/NSADDR http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */สตริงสุดท้ายคงที่สาธารณะ onsaddr = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4clientOnsaddr Alibaba Cloud ใช้การผลิตคลาวด์สาธารณะและการทดสอบใช้เครือข่ายสาธารณะ
บริการที่แตกต่างกันสามารถตั้งค่าแท็กที่แตกต่างกัน แต่ถ้าปริมาณข้อความมีขนาดใหญ่ขอแนะนำให้สร้างหัวข้อใหม่
2. ผู้ผลิต
วิธีที่ 1:
ไฟล์กำหนดค่า: producer.xml
<? xml version = "1.0" การเข้ารหัส = "utf-8"?> <! doctype beans สาธารณะ "-// spring // dtd bean // en" "http://www.springframework.org/dtd/spring-beans.dtd" <property name = "properties"> <map> <entry key = "producerId" value = "" /> <!-pid โปรดแทนที่-> <entry key = "accessKey" value = " /> <! http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet การผลิตเมฆสาธารณะ: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal เซินเจิ้นเมฆการเงิน: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client value = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/> </map> </property> </ebean> </ebeans>
วิธีการเริ่มต้น 1 ตั้งค่าในการตั้งค่าทั่วโลกของคลาส:
// เริ่มต้นผู้ผลิต Private ApplicationContext CTX; โปรดิวเซอร์ผู้ผลิตเอกชน @Value ("$ {producerConfig.enabled}") // สวิตช์รายการการกำหนดค่าสปริง, จริงเปิด, ปิดเท็จปิดบูลีนส่วนตัว producerconfigenabled; @PostConstruct โมฆะสาธารณะ init () {ถ้า (true == ProducerConfigenabled) {ctx = ใหม่ classPathxMlapplicationContext ("producer.xml"); ผู้ผลิต = (ProducerBean) ctx.getBean ("โปรดิวเซอร์"); -PS: ฉันเพิ่งค้นพบหลุม หากผู้ผลิตเริ่มต้นในวิธีการข้างต้นเมื่อมันเริ่มขึ้นมากขึ้นมันจะทำให้ FullGC ดังนั้นคุณสามารถเปลี่ยนเป็นวิธีการอธิบายประกอบต่อไปนี้เพื่อเริ่มต้นด้วยตนเองและปิดการใช้งานที่คุณใช้
วิธีที่ 2: กำหนดค่าคลาส (ไม่จำเป็นต้องใช้ XML)
@ConfigurationPublic คลาส ProducerBeanConfig {@Value ("$ {openServices.ons.producerBean.producerid}") Private String ProducerId; @Value ("$ {openServices.ons.producerbean.accesskey}") สตริงส่วนตัว accesskey; @Value ("$ {openServices.ons.producerBean.secretkey}") สตริงส่วนตัว SecretKey; ผู้ผลิตเอกชนผู้ผลิตเบียน; @Value ("$ {openServices.ons.producerBean.onsaddr}") สตริงส่วนตัว onsaddr; @Bean Public ProducerBean OneProducer () {ProducerBean ProducerBean = New ProducerBean (); คุณสมบัติคุณสมบัติ = คุณสมบัติใหม่ (); Properties.SetProperty (Propertykeyconst.producerid, ProducerId); Properties.SetProperty (PropertyKeyConst.AccessKey, AccessKey); Properties.SetProperty (Propertykeyconst.secretkey, SecretKey); Properties.SetProperty (Propertykeyconst.onsaddr, Onsaddr); ProducerBean.setProperties (คุณสมบัติ); ผู้ผลิตกลับมา; -PS: หลังจาก Double 11 นี้พบว่าสองวิธีข้างต้นไม่เหมาะสำหรับปริมาณข้อมูลขนาดใหญ่และสถานการณ์มัลติเธรดและประสิทธิภาพการทำงานแย่มากดังนั้นจึงแนะนำให้ใช้ 3
วิธีที่ 3: (ไม่จำเป็นต้องใช้ XML)
@ComponentPublic คลาส ProducerBeansingLeton {@Value ("$ {openServices.ons.producerBean.producerid}") Private String ProducerId; @Value ("$ {openServices.ons.producerbean.accesskey}") สตริงส่วนตัว accesskey; @Value ("$ {openServices.ons.producerBean.secretkey}") สตริงส่วนตัว SecretKey; @Value ("$ {openServices.ons.producerBean.onsaddr}") สตริงส่วนตัว onsaddr; ผู้ผลิตผู้ผลิตคงที่ส่วนตัว; ชั้นเรียนแบบสแตติกส่วนตัวแบบสแตติก {Private Static Final Producerbeansingleton Instance = ใหม่ ProducerBeansingleton (); } Private Producerbeansingleton () {} ผู้ผลิตสุดท้ายคงที่สาธารณะคงที่ GetInstance () {return singletonholder.instance; } @PostConstruct โมฆะสาธารณะ init () {// // ผู้ผลิตอินสแตนซ์การกำหนดค่าคุณสมบัติการกำหนดค่าคุณสมบัติ = คุณสมบัติใหม่ (); // คุณสมบัติผู้ผลิต ID Properties.SetProperty (Propertykeyconst.producerid, ProducerId); // AccessKey Alibaba การตรวจสอบคลาวด์สร้างคุณสมบัติ SetProperty (PropertyKeyConst.AccessKey, AccessKey); // SecretKey Alibaba การตรวจสอบคลาวด์สร้างคุณสมบัติ SetProperty (Propertykeyconst.secretkey, SecretKey); // SecretKey Alibaba การตรวจสอบคลาวด์สร้างคุณสมบัติ SetProperty (Propertykeyconst.secretkey, SecretKey); // ตั้งค่าเวลาการส่งเวลาการส่งมิลลิวินาทีคุณสมบัติ SetProperty (Propertykeyconst.sendmsgtimeoutmillis, "3000"); // ตั้งค่าชื่อโดเมนการเข้าถึง TCP (ดูสภาพแวดล้อมการผลิตคลาวด์สาธารณะเป็นตัวอย่างที่นี่) Properties.SetProperty (Propertykeyconst.onsaddr, Onsaddr); ผู้ผลิต = onsfactory.createProducer (คุณสมบัติ); // ก่อนที่จะส่งข้อความคุณต้องโทรหาวิธีการเริ่มต้นเพื่อเริ่มผู้ผลิตและคุณจะต้องเรียกมันเพียงครั้งเดียวเพื่อ producer.start (); } ผู้ผลิตสาธารณะ getProducer () {ผู้ผลิตคืน; -การกำหนดค่าสปริง
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.mysql5dialectConsumerConfig.enabled = trueproducerconfig.enabled = True #method 1: กำหนดเวลา /U516C/U7F51/U914D/U7F6EOPENSERVICES.ONS.PRODUCERBEAN.PRODUCERID = PIDOPENSERVICES.ONS.PRODUCERBEAN.ACCESSKEY = OPENSERVICES.ONS.PRODUCERBEAN.SECRETKEYKEY openServices.ons.producerbean.onsaddr = เครือข่ายสาธารณะ, การผลิตคลาวด์สาธารณะหางโจว
วิธีที่ 1 ส่งรหัสข้อความ:
ลอง {string jsonc = jsonutils.tojson (สิบเอ็ดเมนต์); ข้อความข้อความ = ข้อความใหม่ (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); SENDRESULT SENDRESULT = PROKER.SEND (ข้อความ); if (sendresult! = null) {logger.info (". ส่ง MQ Message Success!";} else {logger.warn (". sendresult เป็น null ...... ");}} catch (exception e) {logger.warn ("doubleelevenallpreservice");วิธีการ 2 รหัสการส่งมอบรหัส: (สามารถเริ่ม/ปิดทุก ๆ 1,000 ครั้ง)
ProducerBean.start (); ลอง {String jsonc = jsonutils.tojson (Elevenmessage); ข้อความข้อความ = ข้อความใหม่ (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); SENDRESULT SENDRESULT = PROKER.SEND (ข้อความ); if (sendresult! = null) {logger.info (". ส่ง MQ Message Success!";} else {logger.warn (". sendresult เป็น null ...... ");}} catch (exception e) {logger.warn ("doubleelevenallpreservice"); ProducerBean.Shutdown ();วิธีที่ 3: ส่งข้อความ
ลอง {string jsonc = jsonutils.tojson (สิบเอ็ดเมนต์); ข้อความข้อความ = ข้อความใหม่ (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); โปรดิวเซอร์ผู้ผลิต = ProducerBeansingleton.getInstance (). getProducer (); SENDRESULT SENDRESULT = PROKER.SEND (ข้อความ); if (sendresult! = null) {logger.info ("doubleelevenmidservice.Send MQ MESSAUNCE ประสบความสำเร็จ! หัวข้อคือ:";} else {logger.warn ("doubleelevenmidservice.sendresult เป็น null ...... ");}} "+e.getMessage (), e); thread.sleep (1,000); // หากมีข้อยกเว้นให้นอนหลับ 1 วินาที}รหัสที่ส่งข้อความจะต้องจับข้อยกเว้นมิฉะนั้นจะถูกส่งซ้ำ ๆ
หัวข้อที่นี่ถูกสร้างขึ้นด้วยตัวเอง Elevenmessage เป็นเนื้อหาที่จะส่ง ฉันเป็นวัตถุที่ฉันสร้างขึ้นด้วยตัวเอง
3. ผู้บริโภค
กำหนดค่าคลาสเริ่มต้น:
@configuration@conditionalOnProperty (value = "consumerConfig.enabled", มีค่า = "true", matchifMissing = true) คลาสสาธารณะ consumerConfig {logger ส่วนตัว logger = loggerFactory.getLogger @Bean Public ConsumerFortory () {// ผู้บริโภคที่แตกต่างกันไม่สามารถเปลี่ยนชื่อคุณสมบัติได้ที่นี่ผู้บริโภค = คุณสมบัติใหม่ (); ConsumerProperties.SetProperty (Propertykeyconst.consumerid, mqconfig.consumer_id); ConsumerProperties.SetProperty (PropertyKeyConst.AccessKey, mqconfig.access_key); ConsumerProperties.SetProperty (Propertykeyconst.secretkey, mqconfig.secret_key); //consumerProperties.SetProperty(PropertyKeyConst.ConsumetHreadNums,MQCONFIG.THREAD_NUM); ConsumerProperties.SetProperty (Propertykeyconst.onsaddr, mqconfig.onsaddr); ผู้บริโภคผู้บริโภค = onsfactory.createConsumer (ผู้บริโภค); consumer.subscribe (mqconfig.topic, mqconfig.tag, ใหม่ doubleelevenmessagelistener ()); // ใหม่ผู้ฟังที่สอดคล้องกัน consumer.start (); logger.info ("consumerconfig เริ่มประสบความสำเร็จ"); ส่งคืนผู้บริโภค; -คุณต้องเลือก CID และ Onsaddr ที่เหมาะสม คุณสามารถกำหนดค่าได้ที่นี่โดยใช้จำนวนเธรดผู้บริโภคของคุณเอง ฯลฯ
สร้างคลาส Listener Message และใช้ข้อความ:
@ComponentPublic คลาส MessageListener ใช้ MessageListener {Private Logger Logger = LoggerFactory.getLogger ("เตือน"); ได้รับการป้องกันแบบคงที่สิบเอ็ดสิบเอ็ดสิบเอ็ด; @Resource โมฆะสาธารณะ setelevenreposit (Elevenreposit Elevenreposit) {Messagelistener .elevenreposit = Elevenreposit; } @Override การใช้การกระทำสาธารณะ (ข้อความข้อความ, consumerContext ConsumContext) {ถ้า (message.getTopic (). เท่ากับ ("หัวข้อของตัวเอง")) {// หลีกเลี่ยงการบริโภคข้อความอื่น ๆ String res = สตริงใหม่ (body); // res เป็นเนื้อหาข้อความที่ส่งโดยผู้ผลิต // รหัสธุรกิจ} else {logger.warn ("!"); }} catch (exception e) {logger.error ("MessageListener.consume ข้อผิดพลาด:" + E.getMessage (), e); } logger.info ("MessageListener.receive Message"); // หากคุณต้องการทดสอบฟังก์ชั่นของการโพสต์ข้อความใหม่คุณสามารถแทนที่ Action.CommitMessage ด้วย action.reconsumelater return action.CommitMessage; } else {logger.warn (); return action.reconsumelater; -โปรดทราบว่าเนื่องจากผู้บริโภคมีหลายเธรดวัตถุจะต้องถูกฉีดด้วยการตั้งค่าแบบคงที่+เพื่อเพิ่มระดับวัตถุไปยังกระบวนการเพื่อให้สามารถใช้หลายเธรดได้ แต่วิธีการและตัวแปรของคลาสแม่ไม่สามารถเรียกได้
สถานะผู้บริโภคสามารถตรวจสอบว่าผู้บริโภคเชื่อมต่อสำเร็จไม่ว่าจะเป็นการล่าช้าหรือไม่ความเร็วในการบริโภค ฯลฯ
การรีเซ็ตไซต์การบริโภคสามารถล้างข้อความทั้งหมดได้
3. สิ่งที่ควรทราบ
1. เนื้อหาข้อความสูงสุดที่ส่งคือ 256KB
2. ข้อความมีอยู่นานถึง 3 วัน
3. จำนวนเธรดเริ่มต้นที่ด้านผู้บริโภคคือ 20
4. ถ้า Java วางสายหรือ CPU มีจำนวนที่สูงมากในระหว่างการวิ่งคุณสามารถส่งเธรดสำหรับ 1s ของ 1,000 ข้อความทุกข้อความเมื่อส่ง
5. เมื่อการทดสอบหรือเริ่มต้นในพื้นที่ให้แทนที่ Onsaddr ด้วยเครือข่ายสาธารณะมิฉะนั้นข้อผิดพลาดจะไม่เริ่มขึ้น
ข้างต้นเป็นเนื้อหาทั้งหมดของบทความนี้ ฉันหวังว่ามันจะเป็นประโยชน์ต่อการเรียนรู้ของทุกคนและฉันหวังว่าทุกคนจะสนับสนุน wulin.com มากขึ้น