1. Prefácio
Recentemente, a empresa tem a necessidade de usar a fila de mensagens em nuvem do Alibaba. Para tornar mais conveniente usar, passei alguns dias encapsulando a fila de mensagens no método de chamada de API para facilitar a chamada do sistema interno. Foi concluído agora. Aqui registramos o processo e as tecnologias relevantes usadas e compartilhamos com você.
A Alibaba Cloud agora fornece dois serviços de mensagem: serviço MNS e serviço ONS. Eu acho que o MNS é uma versão simplificada do ONS, e o consumo de mensagens do MNS requer estratégias de pesquisa personalizadas. Por outro lado, as funções do modo de publicação e assinatura da ONS são mais poderosas (por exemplo, em comparação com o MNS, ONS fornece rastreamento de mensagens, log, monitoramento e outras funções), e sua API é mais conveniente de usar. Também foi ouvido que o Alibaba não desenvolverá mais MNs no futuro, mas apenas o manterá. O Serviço ONS substituirá gradualmente o serviço MNS e se tornará o principal produto do serviço de mensagens do Alibaba. Portanto, se houver necessidade de usar filas de mensagens, é recomendável não usar o MNS novamente. Usar o ONS é a melhor escolha.
Técnicas envolvidas: primavera, reflexão, proxy dinâmico, serialização de Jackson e deserialização
Antes de ler o artigo a seguir, você precisa ler a documentação acima para entender os conceitos relevantes (tópico, consumidor, produtor, tag etc.) e as implementações simples de envio e recebimento de código fornecidas na documentação.
Esta postagem do blog é apenas para amigos que têm uma base de conhecimento em filas de mensagens. Estou naturalmente muito feliz em ajudar a todos. Não repreenda ninguém que não entenda, pois significa que seu caminho está errado.
2. Plano de projeto
1. Envio de mensagens
Em uma arquitetura CSS simples, assumindo que o servidor ouvirá a mensagem enviada por um produtor de tópicos, ele deve primeiro fornecer uma API do cliente. O cliente precisa simplesmente chamar a API e pode produzir mensagens através do produtor.
2. Recepção de mensagens
Como a API é formulada pelo servidor, o servidor, é claro, também sabe como consumir essas mensagens.
Nesse processo, o servidor realmente desempenha o papel dos consumidores, e o cliente realmente desempenha o papel dos produtores, mas as regras para os produtores produzirem mensagens são formuladas pelos consumidores para atender às necessidades de consumo de consumidores.
3. O objetivo final
Queremos criar um pacote de jar separado chamado Core-Core para fornecer implementações específicas de dependências e publicar assinaturas para produtores e consumidores.
3. Envio de mensagens
1. Os consumidores fornecem interfaces
@Topic (name = "kdyzm", produtora = "kdyzm_producer") interface pública userQueueResource {@Tag ("test1") public void handleUserinfo (@Body @Key ("userInfoHandler") UserModel UserModel); @Tag ("test2") public void handleUserinfo1 (@body @key ("userInfoHandler1") UserModel User);}Como o tópico e o produtor estão no relacionamento n: 1, o produtor é usado diretamente como uma propriedade do tópico; A tag é uma condição de filtragem muito crítica, e os consumidores a usam para classificar as mensagens para executar diferentes processamento de negócios; portanto, a tag é usada como condição de roteamento aqui.
2. O produtor envia mensagens usando a API fornecida pelo consumidor
Como os consumidores fornecem apenas interfaces para os produtores usarem, não há como usar interfaces diretamente porque não há como instanciá -las. Aqui usamos proxy dinâmico para gerar objetos. Na API fornecida pelos consumidores, adicione a seguinte configuração para facilitar os produtores a importar diretamente a Config e usá -la. Aqui usamos a Spring Config com base no Java. Por favor, saiba.
@ConfigurationPublic Classe QueuECONFIG {@AUTOWIRED @Bean Public UserQueUeReRce UserQueueResource () {return QueueReSourceFactory.CreateProxyqueueReSource (userQueueReSource.class); }}3. Encapsulamento do Corore da fila para o envio de mensagens do produtor
Todas as anotações em 1 acima (tópico, tag, corpo, chave) e as classes queusourceFactory usadas em 2 devem ser definidas na fila-core. A definição da anotação define apenas as regras. A implementação real está na verdade no QueueResourceFactory.
importar java.lang.reflect.invocationHandler; importar java.lang.reflect.method; importar java.lang.reflect.proxy; importar org.slf4j.logger; importação; com.aliyun.openservices.ons.api.producer; importar com.aliyun.openservices.ons.api.sendResult; importar com.wy.queue.core.api.mqconnection; import com.wyue.core.utils.jacksoSerializer; import.wy; com.wy.queue.core.utils.queuecorespringUtils; public classe queueResourceFactory implementos InvocationHandler {Private Static Final Logger Logger = LoggerFactory.getLogger (QueueResourceFactory.class); String privada tópicoName; ProducerID de cordas privadas; serializador privado de jacksonserializer = new JackSoSerializer (); Prefix de sequência final estática privada = "pid_"; public queueResourceFactory (String tópicoName, String ProderId) {this.topicName = tópicoName; this.ProducerId = ProderID; } public static <t> t createproxyQueueResource (classe <T> clazz) {string tópicoName = mqutils.gettopicname (clazz); String ProderId = mQutils.getProducerId (clazz); T Target = (t) proxy.NewProxyInstance (QueueResourceFactory.class.getClassLoader (), nova classe <?> [] {Clazz}, new QueueResourceFactory (TopicName, ProderID)); alvo de retorno; } @Override Public Object Invoke (proxy do objeto, método do método, objeto [] args) lança arremesso {if (args.length == 0 || args.length> 1) {lança nova runimeException ("Aceite apenas um param na interface QueueResource."); } String tagname = mqutils.getTagName (método); ProderFactory ProderFactory = QueuecorespringUtils.getBean (ProdunerFactory.class); MQConnection ConnectionInfo = QueuecorespringUtils.getBean (mQConnection.class); Produtor Produtor = ProderFactory.CreateProduced (Prefixo+ConnectionInfo.getPrefix ()+"_"+ProducerID); //Send message Message msg = new Message( // // The Topic created in the console, that is, the Topic name to which the message belongs. connectionInfo.getPrefix()+"_"+topicName, // Message Tag, // It can be understood as a tag in Gmail, and the message is reclassified to facilitate Consumer to specify filtering conditions to filter tagName on the MQ server, // Message Body // Any binary Forma de dados, o MQ não interfere em nenhum, // produtor e consumidor são necessários negociam um serializador de serialização e deserialização consistente.Serialize (args [0]). getBytes ()); SendResult SendResult = Producer.send (msg); Logger.info ("Enviar sucesso da mensagem. ID da mensagem é:" + sendResult.getMessageId ()); retornar nulo; }}Aqui, publicamos especialmente o pacote personalizado e os nomes de pacotes usados por terceiros para facilitar a distinção.
O que exatamente são feitos aqui?
O processo de envio de uma mensagem é criar um objeto proxy no proxy dinâmico. O objeto será interceptado ao chamar o método. Primeiro, analise todas as anotações, como TopicName, ProderID, TAG e outras informações importantes das anotações, e depois ligue para o Alibaba SDK para enviar a mensagem. O processo é muito simples, mas observe que, ao enviar mensagens aqui, é dividido em ambientes. De um modo geral, a empresa agora distingue três ambientes: controle de qualidade, estadiamento e produto. Entre eles, controle de qualidade e estadiamento são ambientes de teste. Para filas de mensagens, também existem três anéis. No ambiente, no entanto, os ambientes de controle de qualidade e estadiamento geralmente usam a mesma conta do Alibaba para reduzir os custos, para que o tópico e o produto criado sejam colocados na mesma área. Dessa maneira, o tópico do nome com o mesmo nome não pode existir, portanto, o prefixo do ambiente é adicionado para distingui -los, como qa_topicname, PID_STAGING_PRODUBERID, etc.; Além disso, o Core-Core fornece uma interface MQConnection para obter informações de configuração, e os serviços de produtores precisam apenas implementar essa interface.
4. Produtor envia mensagens
@AUTOWIRED PRIVADO USUSERESOURESOURCE UserQueUeResource; @Override public void sendMessage () {userModel userModel = new UserModel (); UserModel.SetName ("Kdyzm"); UserMermodel.setage (25); userQueueResource.HandleUserinfo (UserModel); }São necessárias apenas algumas linhas de código para enviar a mensagem para o tópico especificado, que é muito mais fino que o código de envio nativo.
4. Consumo de notícias
Comparado com o envio de mensagens, o consumo de mensagens é mais complicado.
1. Design de consumo de mensagens
Como tópico e consumidor são n: n relacionamento, o consumerid é colocado no método de implementação específico do consumidor
@Controlador@queueReReCepublic Classe userQueueResourceImpl implementa UserQueueResource {private logger logger = LoggerFactory.getLogger (thisgetclass ()); @Override @ConsumerAnnotation ("kdyzm_consumer") public void handleuserinfo (usuário do userModel) {Logger.info ("Mensagem 1 recebida: {}", new Gson (). Tojson (User)); } @Override @ConsumerAnnotation ("kdyzm_consumer1") public void handleUserinfo1 (usuário do userModel) {Logger.info ("Mensagem 2 recebida: {}", new GSON (). Tojson (User)); }}Aqui estão duas novas anotações @queueResource e @ConsumerArannotation. Essas duas anotações serão discutidas no futuro. Alguém pode me perguntar por que eu deveria usar o nome Consumanotation em vez do nome do consumidor, porque o nome do consumidor entra em conflito com o nome no SDK fornecido por Aliyun. . . .
Aqui, os consumidores fornecem a interface da API aos produtores para facilitar os produtores a enviar mensagens, e os consumidores implementam a interface para consumir mensagens enviadas pelos produtores. Como implementar a interface da API é implementar o monitoramento, que é uma lógica relativamente crítica.
2.Queuue-core implementa a lógica principal da fila de mensagens.
Etapa 1: Use o método de escuta do recipiente de mola para obter todos os grãos com anotações de QueueResource
Etapa 2: Distribua o feijão de processamento
Como lidar com esses feijões? Cada feijão é na verdade um objeto. Com um objeto, como o objeto UserQueUeResourceImpl no exemplo acima, podemos obter o objeto ByteCode da interface implementado pelo objeto e, em seguida, obter as anotações na interface UserQueuerousCE e as anotações sobre os métodos e métodos. Obviamente, também podem ser obtidas as anotações sobre o método de implementação do userQueUeResourceImpl. Aqui vou usar o consumerid como chave, e as informações relevantes restantes são encapsuladas como valor e armazenadas em cache em um objeto de mapa. O código principal é o seguinte:
Classe <?> Clazz = resourceimpl.getclass (); Classe <?> Clazzif = clazz.getInterfaces () [0]; Método [] métodos = clazz.getMethods (); String tópiconame = mqutils.gettopicname (clazzif); para (Método M: Métodos) {ConsumanTation Consumanno = M.GetAnnotation (ConsumanTation.class); if (null == Consumanno) {// logger.error ("método = {} precisa de anotação do consumidor.", m.getName ()); continuar; } String consumerId = consumanno.value (); if (stringUtils.isEmpty (consuerId)) {Logger.error ("Method = {} ConsumerID não pode ser nulo", m.getName ()); continuar; } Classe <?> [] ParameterTypes = m.getParameterTypes (); Método ResourceIfMethod = NULL; tente {ResourceIfMethod = clazzif.getMethod (m.getName (), parameterTypes); } catch (noschmethodException | Segurança e) {Logger.error ("Não é possível encontrar o método = {} em super interface = {}.", m.getName (), clazzif.getCanonicalName (), e); continuar; } String tagname = mqutils.getTagName (ResourceIfMethod); consumersmap.put (ConsuerId, new MethodInfo (TopicName, Tagname, M)); }Etapa 3: Ações de consumo através da reflexão
Primeiro, determine o momento da execução da ação de reflexão, ou seja, ouça novas mensagens
Em segundo lugar, como executar ações de reflexão? Eu não vou entrar em detalhes. Os sapatos infantis com fundações relacionadas ao reflexo sabem como fazê-las. O código principal é o seguinte:
MQConnection ConnectionInfo = QueuecorespringUtils.getBean (mQConnection.class); String topicPrefix = ConnectionInfo.getPrefix ()+"_"; String consumerIdPrefix = prefixo+ConnectionInfo.getPrefix ()+"_"; para (String ConsumerId: consumersmap.KeySet ()) {MethodInfo MethodInfo = consumersmap.get (ConsumerId); Propriedades ConnectionProperties = ConverttoProperties (ConnectionInfo); // ID do consumidor que você criou no Console ConnectionProperties.put (PropertyKeyConst.ConsumerId, ConsumerIdPrefix+ConsumerId); Consumidor consumidor = onsfactory.createConsumer (ConnectionProperties); Consumer.subScribe (tópicoPrefix+MethodInfo.gettopicName (), Methodinfo.getTagName (), new Messagelistener () {// Subscribe várias tags de ação pública (Mensagem (Mensagem, UtContext context) {Try {String MessageBodyBodyBodyBodyBodyBody (message.get.getfOn, "UtContext context) {Try {String MessageBody = tópico = {}, tag = {}, consumerId = {}, message = {} ", tópicoPrefix+Methodinfo.gettopicName (), Methodinfo.getTagName (), ConsumerIdPrefix+ConsumerId, Message); Método = Metodinfo.getMethod (); Method.getPeRSeTerTypes () [0]; consumer.start (); Logger.info ("Consumer = {} começou.", ConsumerIdPrefix+ConsumerId); }5. Veja o link Git abaixo para o código completo
https://github.com/kdyzm/queue-core.git
O exposto acima é todo o conteúdo deste artigo. Espero que seja útil para o aprendizado de todos e espero que todos apoiem mais o wulin.com.