1. Alibaba Cloud 공식 웹 사이트 --- 도움말 문서
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.wwtiuh
공식 웹 사이트의 단계를 따라 주제를 만들고 게시를 신청하고 (프로듀서) 구독을 신청하십시오 (소비자)
2. 코드
1. 구성 :
공개 클래스 mqconfig {/** * 테스트를 시작하기 전에 다음 xxx를 교체하십시오 */public static final String public_topic = "test"; // public static final String public_producer_id = "pid_scheduler"; public static final String public_consumer_id = "cid_service"; 공개 정적 최종 문자열 access_key = "123"; 공개 정적 최종 문자열 Secret_key = "123"; 공개 정적 최종 문자열 태그 = ""; 공개 정적 최종 문자열 thread_num = "25"; // 소비자 스레드 수/*** onsaddr 다른 지역에 따라 구성하십시오* 공개 네트워크 테스트 : http://onsaddr-internet.aliyun.com/rocketm/nsaddr4clientinternet* 퍼블릭 클라우드 생산 : http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * hangzhou 금융 클라우드 : http://jbponsadddr-internalal.aliyun.com:8080/rocketmq/nsaddr4client-internal * shenzhen Cloud : http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */public static final String onsaddr = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client internal";};OnSaddr Alibaba Cloud는 퍼블릭 클라우드 제작을 사용하며 테스트는 공개 네트워크를 사용합니다.
다른 서비스는 다른 태그를 설정할 수 있지만 메시지 볼륨이 크면 새로운 주제를 만드는 것이 좋습니다.
2. 생산자
방법 1 : 방법 1
구성 파일 : producer.xml
<? xml version = "1.0"alcoding = "utf-8"?> <! doctype beans public "-// spring // dtd bean // en" "http://www.springframework.org/dtd/spring-beans.dtd"> <beans> <bean id = "produce"init-method = "startod" name = "properties"> <map> <Entry Key = "ProducerId"value = "" /> <!-PID, 교체-> <Entry Key = "accessKey"value = " /> <!-Access_Key, 교체-> <Entry Key ="SecretKey "Value =" "" "!-Secret_key, 대체-> <!-PropertyKeyConst.onsad Differture Recons에 대한 공개 네트워크 테스트를 제발하십시오. http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet 퍼블릭 클라우드 생산 : http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4clientinternal hangzhou financial cloud : http://jbponsadddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal shenzhen financial cloud : http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4clienternernal-> eletr key = onsaddr "onsaddr" value = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/> </map> </property> </bean> </beans>
클래스의 글로벌 설정에서 설정된 시작 방법 1 :
// 생산자 개인 애플리케이션 콘텐츠 CTX를 초기화합니다. 민간 생산자 생산자; @Value ( "$ {produceRconfig.enabled}") // 스위치, 스프링 구성 항목, true, false 끄기 개인 부울 ProducerConfigenabled; @PostConstruct public void init () {if (true == produceRconFigEnabled) {ctx = new ClassPathXmlApplicationContext ( "producer.xml"); Producer = (ProducerBean) ctx.getBean ( "Producer"); }}추신 : 최근에 구덩이를 발견했습니다. 생산자가 위의 방법으로 시작되면 더 시작되면 FullGC가 발생합니다. 따라서 다음 주석 방법으로 변경하여 수동으로 시작하고 사용하는 곳에서 종료 될 수 있습니다.
방법 2 : 클래스 구성 (XML 필요 없음)
@ConfigurationPublic Class ProducerBeanConfig {@Value ( "$ {OpenServices.ons.ProduceRbean.ProduceRid}") 개인 문자열 producerId; @Value ( "$ {OpenServices.ons.producerbean.accesskey}") 개인 문자열 액세스키; @Value ( "$ {OpenServices.ons.producerbean.secretkey}") 개인 문자열 비밀; 민간 생산자 생산자 생산자; @Value ( "$ {OpenServices.ons.producerbean.onsaddr}") 개인 문자열 onsaddr; @Bean Public ProducerBean OneProducer () {ProducerBean ProducerBean = New ProducerBean (); 속성 속성 = 새로운 속성 (); Properties.SetProperty (PropertyKeyConst.ProduceRid, ProducerId); Properties.SetProperty (PropertyKeyConst.Accesskey, AccessKey); Properties.SetProperty (PropertyKeyConst.Secretkey, SecretKey); properties.setProperty (PropertyKeyConst.onsAddr, onsaddr); ProducerBean.SetProperties (속성); 귀환 생산국; }}PS : 이중 11 이후, 위의 두 가지 방법은 큰 데이터 볼륨과 다중 스레딩 상황에 적합하지 않으며 성능이 매우 열악하므로 3을 사용하는 것이 좋습니다.
방법 3 : (XML이 필요하지 않음)
@ComponentPublic Class ProducerBeanSingleton {@Value ( "$ {OpenServices.ons.ProduceRbean.ProducerId}") 개인 문자열 producerId; @Value ( "$ {OpenServices.ons.producerbean.accesskey}") 개인 문자열 액세스키; @Value ( "$ {OpenServices.ons.producerbean.secretkey}") 개인 문자열 비밀; @Value ( "$ {OpenServices.ons.producerbean.onsaddr}") 개인 문자열 onsaddr; 개인 정적 생산자 프로듀서; 개인 정적 클래스 Singletonholder {Private STATIC Final ProducerBeansingLeTon 인스턴스 = New ProducerBeanSingleton (); } private ProducerBeansingEton () {} 공개 정적 최종 프로듀서 비안 링턴 GetInstance () {return singletonholder.instance; } @postConstruct public void init () {// 생산자 인스턴스 구성 초기화 속성 속성 = 새로운 속성 (); // Producer ID Properties.SetProperty (PropertyKeyConst.ProduceRid, ProducerId); // AccessKey Alibaba Cloud 인증, 속성 작성 .setProperty (PropertyKeyConst.Accesskey, AccessKey); // Secretkey Alibaba Cloud 인증, 속성 작성 .setProperty (PropertyKeyConst.Secretkey, SecretKey); // Secretkey Alibaba Cloud 인증, 속성 작성 .setProperty (PropertyKeyConst.Secretkey, SecretKey); // 보내기 시간을 보내기 시간, 단위 밀리 초 특성을 설정합니다 .SetProperty (PropertyKeyConst.SendMsgTimeOutmillis, "3000"); // TCP 액세스 도메인 이름을 설정합니다 (여기서는 퍼블릭 클라우드 프로덕션 환경 참조) properties.setProperty (PropertyKeyConst.onsAddr, onsaddr); producer = onsfactory.createproducer (속성); // 메시지를 보내기 전에 시작 메소드를 호출하려면 프로듀서를 시작해야하며 Producer.start ()에게 한 번만 호출하면됩니다. } 공개 프로듀서 getProducer () {return Producer; }}봄 구성
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.mysql5dialectconsumerconfig.enabled = trueproduceRconfig.enabled = true #method 1 : enabled = false #메소드 2, 3 : /u516c/u7f51/u914d/u7f6eopenservices.ons.produceRbean.producerid = pidopenservices.ons.produceRbean.accesskey = OpenServices.ons.producerbean.secretkey = Openservices.ons.producerbean.secretkey = openservices.ons.ons.ons.ons.ons.ons.onsecretkeys. 항저우 퍼블릭 클라우드 생산
방법 1 메시지 코드 전달 :
try {string jsonc = jsonutils.tojson (elevenmessage); 메시지 메시지 = 새 메시지 (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); sendresult sendresult = producer.send (메시지); if (sendResult! = null) {logger.info ( ". MQ 메시지 성공을 보냅니다!";} else {logger.warn ( ". SendResult는 ......";}} catch (예외 e) {logger.warn ( "DoublevenvenAllPreservice"); thread.sleep (1000);방법 2 전달 메시지 코드 : (1000 번마다 시작/닫을 수 있음)
producerbean.start (); try {String jsonc = jsonutils.tojson (elevenmessage); 메시지 메시지 = 새 메시지 (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); sendresult sendresult = producer.send (메시지); if (sendResult! = null) {logger.info ( ". MQ 메시지 성공을 보냅니다!";} else {logger.warn ( ". SendResult는 ......";}} catch (예외 e) {logger.warn ( "DoublevenvenAllPreservice"); thread.sleep (1000); ProducerBean.shutdown ();방법 3 : 메시지를 전달하십시오
try {string jsonc = jsonutils.tojson (elevenmessage); 메시지 메시지 = 새 메시지 (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); Producer Producer = ProducerBeansingleton.getInstance (). getProducer (); sendresult sendresult = producer.send (메시지); if (sendResult! = null) {logger.info ( "d "+e.getMessage (), e); thread.sleep (1000); // 예외가 있으면 1 초 동안 수면}}메시지를 보내는 코드는 예외를 포착해야합니다. 그렇지 않으면 반복적으로 전송됩니다.
여기의 주제는 혼자서 만들어집니다. elevenmessage는 전송 될 내용입니다. 나는 내가 혼자서 만든 대상이다.
3. 소비자
시작 클래스 구성 :
@configuration@conditionAlonProperty (value = "considerConfig.enabled", hadingValue = "true", matchifmissing = true) public class considerconfig {private logger = loggerfactory.getLogger (loggerappenderType.smsdist.name ()); @Bean Public Consumer ConsumerFactory () {// 다른 소비자는 속성 이름을 여기에 소비자 프 로피트 = 새로운 속성 ()로 바꿀 수 없습니다. ConsumerProperties.SetProperty (PropertyKeyConst.ConsumerId, mqConfig.consumer_id); ConsumerProperties.SetProperty (PropertyKeyConst.Accesskey, mqConfig.access_key); ConsumerProperties.SetProperty (PropertyKeyConst.Secretkey, MQConfig.secret_key); //consumerProperties.setProperty(propertykeyConst.consumeThreadnums,MqConfig.thread_num); ConsumerProperties.SetProperty (PropertyKeyConst.OnsAddr, mqConfig.onsaddr); 소비자 소비자 = onsfactory.createConsumer (소비자 - 프로파르); Consumer.Subscribe (mqconfig.topic, mqconfig.tag, new doublevenmessagelistener ()); // 새로운 해당 청취자 소비자 .Start (); logger.info ( "ConsiderConfig 시작 성공."); 소비자를 반환하십시오. }}올바른 CID와 OnSADDR을 선택해야합니다. 자신의 소비자 스레드 수 등을 사용하여 여기에서 구성 할 수 있습니다.
메시지 리스너 클래스를 만들고 메시지를 소비합니다.
@ComponentPublic Class MessageListener는 MessageListener를 구현합니다. 보호 된 정적 정전기의 11 개의 반복성; @Resource public void setelevenReposit (elevenvenposit elevenReposit) {messagelistener .elevenReposit = elevenbeposit; } @override public Action Sopcomption (메시지 메시지, 소비자 텍스트 CONPONECONTEXT) {if (message.getTopic (). Equals ( "Own Topic")) {// 다른 메시지 소비를 피하십시오 json 변환 오류 {byte [] body = message.getBody (); 문자열 res = 새 문자열 (body); // res는 생산자가 보낸 메시지 내용입니다 // 비즈니스 코드} else {logger.warn ( "!"); }} catch (예외 e) {logger.error ( "messagelistener.consume error :" + e.getMessage (), e); } logger.info ( "MessageListener.receive 메시지"); // 메시지 reposting의 함수를 테스트하려면 action.commitMessage를 action.reconsumelater return action.commitmessage로 바꿀 수 있습니다. } else {logger.warn (); 반환 action.reconsumelater; }}소비자는 다중 스레드이기 때문에 객체는 객체 레벨을 프로세스로 올리기 위해 정적+세트를 주입하여 여러 스레드를 공유 할 수 있지만 상위 클래스의 메소드와 변수를 호출 할 수 없습니다.
소비자 상태는 소비자가 성공적으로 연결되어 있는지, 소비가 지연되었는지, 소비 속도 등을 확인할 수 있습니다.
소비 사이트를 재설정하면 모든 메시지를 지울 수 있습니다
3. 주목할만한 것들
1. 전송 된 최대 메시지 본체는 256KB입니다
2. 메시지는 최대 3 일 동안 존재합니다
3. 소비자 쪽의 기본 스레드 번호는 20입니다.
4. Java가 전화를 끊거나 CPU가 실행 중에 매우 많은 양을 차지하면 보낼 때 1,000 개의 메시지 중 1 초에 스레드를 보낼 수 있습니다.
5. 로컬 테스트 또는 시작시 OnSADDR을 공개 네트워크로 바꾸면 오류가 시작되지 않습니다.
위는이 기사의 모든 내용입니다. 모든 사람의 학습에 도움이되기를 바랍니다. 모든 사람이 wulin.com을 더 지원하기를 바랍니다.