1. Site oficial da Alibaba Cloud --- Help Document
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.wwtiuh
Siga as etapas do site oficial para criar um tópico, solicitar publicação (produtor) e solicitar assinatura (consumidor)
2. Código
1. Configuração:
classe pública mqconfig {/** * Substitua o seguinte xxx antes de iniciar o teste */public static final string public_topic = "test"; // public static string final public_producer_id = "pid_scheduler"; public static final string public_consumer_id = "cid_service"; public static final string access_key = "123"; public static final string secret_key = "123"; public static final string tag = ""; public static final string thread_num = "25"; // número de threads de consumo/*** onsaddr, configure de acordo com diferentes regiões* Teste de rede pública: http://onsaddr-nenternet.aliyun.com/rocketmq/nsaddr4client-internet* Produção pública de nuvem: produção pública: Produção pública: http://onsaddr-nenternal.aliyun.com:8080/rocketmq/nsaddr4client-internal * Hangzhou Financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-inzhenzhen.com:8080/rocketmq: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */ public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal";}Onsaddr Alibaba Cloud usa a produção em nuvem pública e os testes usam rede pública
Serviços diferentes podem definir tags diferentes, mas se o volume da mensagem for grande, é recomendável criar um novo tópico.
2. Produtor
Método 1:
Arquivo de configuração: Producer.xml
<? xml versão = "1.0" coding = "utf-8"?> <! name = "Propriedades"> <Pap> <entradas key = "produterID" value = "" /> <!-pid, substitua-> <entradas key = "accessKey" value = "" /> <!-access_key, substitua-> <-key = "secretKey" http://onsaddr-nenternet.aliyun.com/rocketmq/nsaddr4client-internet Produção em nuvem pública: http://onsaddr-nenternal.aliyun.com:8080/rocketmq/nsaddr4client-nalternal Hangzhou Financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal Shenzhen Cloud: http://mq4finance-sz.addr.aliyun.com: value = "http://onsaddr-nenternal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/> </map> </silper
Método de inicialização 1, definido no cenário global da classe:
// Inicialize o produtor ApplicationContext CTX; produtor privado produtor; @Value ("$ {ProducterConfig.enabled}") // Switch, Item de configuração da mola, True está ligado, FALSE DESLIGUE OS DESLIGUEIROS PRODUTERCONFIGENLADOS DE BOOLEANOS PRIVADOS; @PostConstruct public void init () {if (true == ProducMerConfigEnabled) {ctx = new ClassPathXMLApplicationContext ("produtor.xml"); Produtor = (ProdutorBean) CTX.getBean ("Produtor"); }}PS: Eu descobri recentemente um poço. Se o produtor for iniciado no método acima, uma vez que ele iniciar mais, causará o FullGC. Portanto, você pode mudar para o seguinte método de anotação para iniciar manualmente e desligar onde o usa.
Método 2: Configurar a classe (não é necessário XML)
@ConfigurationPublic Class ProducerBeanConfig {@Value ("$ {OpenServices.ons.producerbean.producerId}") Private String ProdunerID; @Value ("$ {openServices.ons.producerBean.accessKey}") private string accessKey; @Value ("$ {openServices.ons.producerbean.secretkey}") private string secretKey; Produtor Produtor Privador Produtor; @Value ("$ {OpenServices.ons.producerbean.onsaddr}") String privada Onsaddr; @Bean Public ProderBean OneProducer () {ProducerBean ProducerBean = new ProduclerBean (); Propriedades Propriedades = new Properties (); Properties.SetProperty (PropertyKeyConst.ProducerId, ProderID); Properties.SetProperty (PropertyKeyConst.AccessKey, AccessKey); Propriedades.SetProperty (PropertyKeyConst.SecretKey, SecretKey); Properties.SetProperty (PropertyKeyConst.Onsaddr, Onsaddr); ProducerBean.SetProperties (Propriedades); produtor de retorno; }}PS: Após esse dobro 11, verificou-se que os dois métodos acima não são muito adequados para grandes situações de volume de dados e multi-threading, e o desempenho é muito ruim, por isso é recomendável usar 3.
Método 3: (Nenhum XML é necessário)
@ComPonenPublic Class Proderbeansingleton {@Value ("$ {OpenServices.ons.producerbean.producerId}") Private String ProderID; @Value ("$ {openServices.ons.producerBean.accessKey}") private string accessKey; @Value ("$ {openServices.ons.producerbean.secretkey}") private string secretKey; @Value ("$ {OpenServices.ons.producerbean.onsaddr}") String privada Onsaddr; Produtor estático privado Produtor; Classe estática privada SingletoNholder {Private Producerleningleton Instância de Produceansingleton = New ProducerBeansingleton (); } Producer ProducerBeansingleton () {} public static final ProducerBeansingleton getInstance () {return singletonholder.instance; } @PostConstruct public void init () {// Produtor Propriedades da Inicialização da Configuração de Instância do Produtor Propriedades = new Properties (); // Produtor ID Properties.SetProperty (PropertyKeyCnst.ProducerId, ProderID); // AccessKey Alibaba Cloud Authentication, crie Properties.SetProperty (PropertyKeyConst.AccessKey, AccessKey); // SecretKey Alibaba Cloud Authentication, crie Properties.SetProperty (PropertyKeyConst.SecretKey, SecretKey); // SecretKey Alibaba Cloud Authentication, crie Properties.SetProperty (PropertyKeyConst.SecretKey, SecretKey); // Defina o tempo de envio no tempo limite, unidade milissegunds Properties.setProperty (PropertyKeyConst.SendmsgTimeoutMillis, "3000"); // Defina o nome do domínio de acesso TCP (consulte o ambiente de produção de nuvem pública como exemplo aqui) Properties.SetProperty (PropertyKeyConst.Onsaddr, Onsaddr); Produtor = OnsFactory.CreateProducer (Propriedades); // Antes de enviar uma mensagem, você deve ligar para o método inicial para iniciar o produtor e só precisará ligar para o produtor.start (); } produtor público getProducer () {return Producer; }}Configuração da primavera
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.mysql5dialectConsumerConfig.enabled = trueproduCerConfig.enabled = true #method 1: cronograma.enabled = false #Método 2, 3: RocketMQ /u516C/u7F51/u914D/u7F6Eopenservices.ons.producerBean.producerId = pidopenservices.ons.producerBean.accessKey = openservices.ons.producerBean.secretKey = openservices.ons.producerBean.secretKey = OpenServices.Ons.ProducerBean.Onsaddr = Public Network, Hangzhou Public Cloud Production
Método 1 Entregue o código da mensagem:
tente {string jsonnc = jsonutils.tojson (elevenmessage); Mensagem mensagem = nova mensagem (mqconfig.topic, mqconfig.tag, jsonnc.getbytes ()); SendResult sendResult = produtora.send (mensagem); if (sendResult! = null) {Logger.info (". Envie o sucesso da mensagem do MQ!";} else {Logger.warn (". SendResult é nulo ......");}} Catch (Exceção e) {Logger.warn ("duplaLevenlPerService Ano); Thread (1000);Método 2 Código da mensagem de entrega: (pode ser iniciado/fechado a cada 1000 vezes)
ProdunerBean.Start (); Try {String jsonC = jsonutils.tojson (Elevenmessage); Mensagem mensagem = nova mensagem (mqconfig.topic, mqconfig.tag, jsonnc.getbytes ()); SendResult sendResult = produtora.send (mensagem); if (sendResult! = null) {Logger.info (". Envie o sucesso da mensagem do MQ!";} else {Logger.warn (". SendResult é nulo ......");}} Catch (Exceção e) {Logger.warn ("duplaLevenlPerService Ano); Thread (1000); ProducerBean.shutdown ();Método 3: entregue a mensagem
tente {string jsonnc = jsonutils.tojson (elevenmessage); Mensagem mensagem = nova mensagem (mqconfig.topic, mqconfig.tag, jsonnc.getbytes ()); Produtor Produtor = ProducerBeansingleton.getInstance (). GetProducer (); SendResult sendResult = produtora.send (mensagem); if (sendResult! = null) {logger.info ("duploelevenmidservice.send MQ Mensagem Sucesso! "+e.getMessage (), e); Thread.Sleep (1000); // Se houver uma exceção, durma por 1 segundo}O código que envia a mensagem deve capturar a exceção, caso contrário, será enviada repetidamente.
O tópico aqui é criado por você mesmo. Elevenmessage é o conteúdo a ser enviado. Eu sou o objeto que criei sozinho.
3. Consumidores
Configure a aula de inicialização:
@Configuration@condicionalonProperty (value = "consumerConfig.enabled", tendovalue = "true", matchifmissing = true) public class Class ConsumerConfig {private logger Logger = LoggerFactory.getLogger (LogGerapPenderType.smsdist.name ()); @Bean Public Consumer ConsumerFactory () {// Diferentes consumidores não podem renomear propriedades aqui consumerProperties = new Properties (); ConsumerProperties.SetProperty (PropertyKeyCnst.ConsumerId, MQConfig.consumer_id); ConsumerProperties.SetProperty (PropertyKeyCnst.AccessKey, mqconfig.access_key); ConsumerProperties.SetProperty (PropertyKeyCnst.SecretKey, mqconfig.secret_key); //consumerProperties.setProperty(PropertyKeyConnst.ConsumethReadNums.MqConfig.thread_num); ConsumerProperties.SetProperty (PropertyKeyCnst.Onsaddr, mqconfig.onsaddr); Consumidor consumidor = onsfactory.createConsumer (ConsumerProperties); consumer.subscribe (mqconfig.topic, mqconfig.tag, new duploelevenMessageListener ()); // novo ouvinte correspondente consumer.start (); Logger.info ("ConsumerConfig inicia o sucesso."); retornar consumidor; }}Você precisa escolher o CID certo e o Onsaddr. Você pode configurá -lo aqui usando o seu próprio, contagem de threads de consumidores, etc.
Crie uma classe de mensagens da classe e consuma mensagens:
@ComPonentPublic Class Messagelistener implementa Messagelistener {private Logger Logger = LoggerFactory.getLogger ("Lembrete"); ElevenReposit ElevenReposit protegido; @Resource Public void setElevenReposit (ElevenReposit elevenReposit) {Messagelistener .ElevenReposit = elevenReposit; } @Override Public Action Consumo (mensagem da mensagem, ConsumerContext ConsumContext) {if (message.gettopic (). Equals ("próprio tópico")) {// Evite consumir outras mensagens JSON Erros de conversão Tente {byte [] body = message.getbody (); String res = new String (corpo); // res é o conteúdo da mensagem enviado pelo produtor // Código de negócios} else {Logger.warn ("!"); }} catch (Exceção e) {Logger.error ("Messagelistener.Consume Error:" + E.GetMessage (), e); } logger.info ("Messagelistener.receive Message"); // Se você deseja testar a função da reposição de mensagens, poderá substituir o Action.CommitMessage pelo Action.Reconsumelater Return Action.CommitMessage; } else {Logger.warn (); Return Action.ReConsumelater; }}Observe que, como os consumidores são multithreads, o objeto precisa ser injetado com o estático+definido para aumentar o nível do objeto para o processo, para que vários threads possam ser compartilhados, mas os métodos e variáveis da classe pai não podem ser chamados.
O status do consumidor pode verificar se o consumidor está conectado com sucesso, se o consumo está atrasado, a velocidade do consumo etc.
Redefinir o site de consumo pode limpar todas as mensagens
3. Coisas para observar
1. O corpo máximo de mensagem enviado é de 256kb
2. A mensagem existe por até 3 dias
3. O número padrão de threads no lado do consumidor é 20
4. Se Java desligar ou a CPU ocupar uma quantidade extremamente alta durante a corrida, você pode enviar o thread para 1s de cada 1.000 mensagens ao enviá -lo.
5. Ao testar ou inicialização local, substitua o ONSADDR por uma rede pública, caso contrário, o erro não será iniciado.
O exposto acima é todo o conteúdo deste artigo. Espero que seja útil para o aprendizado de todos e espero que todos apoiem mais o wulin.com.