1. Предисловие
В последнее время компания имеет необходимость использовать очередную очередь Alibaba Cloud. Чтобы сделать его более удобным в использовании, я потратил несколько дней, инкапсулируя очередь сообщений в метод вызова API, чтобы облегчить вызов внутренней системы. Это было завершено сейчас. Здесь мы записываем процесс и соответствующие технологии, используемые и делимся с вами.
Alibaba Cloud теперь предоставляет два службы сообщения: MNS Service и ONS Service. Я думаю, что MNS - это упрощенная версия ONS, а потребление сообщений MNS требует пользовательских стратегий опроса. Напротив, функции режима публикации и подписки ONS являются более мощными (например, по сравнению с MNS, ONS обеспечивает отслеживание сообщений, ведение журнала, мониторинг и другие функции), а его API более удобен в использовании. Также было слышно, что у Alibaba больше не будет развиваться MNS в будущем, а только поддерживать его. Сервис ONS постепенно заменят услугу MNS и станет основным продуктом службы сообщений Alibaba. Поэтому, если есть необходимость использовать очереди сообщения, рекомендуется больше не использовать MNS. Использование ONS - лучший выбор.
Причастные методы: пружина, размышление, динамический прокси, сериализация Джексона и десериализация
Перед прочтением следующей статьи вам необходимо прочитать вышеуказанную документацию, чтобы понять соответствующие концепции (тема, потребитель, производитель, тег и т. Д.) И простую отправку и получение реализаций кода, представленных в документации.
Этот пост в блоге предназначен только для друзей, у которых есть база знаний в очереди сообщениях. Я, естественно, очень рад помочь всем. Не ругайте никого, кто это не понимает, так как это означает, что ваш путь неверен.
2. План проектирования
1. Отправка сообщения
В простой архитектуре CSS, предполагая, что сервер будет прослушать сообщение, отправленное производителем темы, он должен сначала предоставить клиент API. Клиент должен просто позвонить в API и может производить сообщения через производителя.
2. Прием сообщения
Поскольку API сформулирован сервером, сервер, конечно, также знает, как потреблять эти сообщения.
В этом процессе сервер фактически играет роль потребителей, и клиент фактически играет роль производителей, но правила для производителей для производства сообщений сформулированы потребителями для удовлетворения потребностей потребителей.
3. конечная цель
Мы хотим создать отдельный пакет JAR с именем queue-core для предоставления конкретных реализаций зависимостей и публиковать подписки для производителей и потребителей.
3. Сообщение отправки
1. Потребители предоставляют интерфейсы
@Topic (name = "kdyzm", provureerid = "kdyzm_producer") публичный интерфейс userqueuersource {@tag ("test1") public void handleuserinfo (@body @key ("userinfohandler") usermodel user); @Tag ("test2") public void harderuserinfo1 (@body @key ("userinfohandler1") usermodel user);}Поскольку тема и продюсер находятся в отношениях N: 1, производительность напрямую используется в качестве свойства темы; Теги - это очень критическое условие фильтрации, и потребители используют его для классификации сообщений для выполнения различной бизнес -обработки, поэтому тег используется в качестве условия маршрутизации здесь.
2. Производитель отправляет сообщения с использованием API, предоставленного потребителем
Поскольку потребители предоставляют интерфейсы только для использования производителями, невозможно использовать интерфейсы непосредственно, потому что их невозможно создать экземпляр. Здесь мы используем динамический прокси для генерации объектов. В API, предоставленном потребителями, добавьте следующую конфигурацию, чтобы облегчить производителям напрямую импортировать конфигурацию и использовать его. Здесь мы используем пружину на основе Java. Пожалуйста, знайте.
@ConfigurationPublic Class QueueConfig {@autowired @bean public userqueuersource userqueuersource () {return queuersourcefactory.createproxyqueuerSource (userqueuerSource.class); }}3. Инкапсуляция очередного ядра для отправки сообщения продюсера
Все аннотации в 1 выше (тема, тег, тело, ключ) и классы QueuerSourceFactory, используемые в 2, должны быть определены в Queue-Core. Определение аннотации только определяет правила. Реальная реализация фактически в QueuerSourceFactory.
Импорт java.lang.reflect.invocationHandler; импорт java.lang.reflect.method; импорт java.lang.reflect.proxy; импорт org.slf4j.logger; импорт org.slf4j.loggerfactory; импорт com.aliyun.openservices.api.api.message; com.aliyun.openservices.ons.api.producer; импорт com.aliyun.openservices.ons.api.sendresult; импорт com.wy.queue.core.api.mqconnection; импорт com.wy.core.core.utils.jackserializer; импорт. com.wy.queue.core.utils.quecorespringutils; открытый класс QueuerSourceFactory реализует vlocationHandler {private Static Final Logger logger = loggerFactory.getLogger (QueuerSourceFactory.class); Приватная строка TopicName; Private String ProduceerID; Частный Jacksonerializer Serializer = новый Jacksoneserializer (); Private Static Final String Prefix = "pid_"; public QueuerSourceFactory (String TopicName, String ProduceerID) {this.TopicName = topicName; this.producerid = производительность; } public static <t> t createProxyqueuerSource (class <t> clazz) {String topicName = mqutils.getTopicName (clazz); String ueverseerid = mqutils.getproducerid (clazz); T arget = (t) proxy.newproxyinstance (queeresourcefactory.class.getClassloader (), новый класс <?> [] {Clazz}, новый QueuerSourceFactory (TopicName, Produceerid)); вернуть цель; } @Override public Object invoke (Object Proxy, Method Method, Object [] args) бросает Throwable {if (args.length == 0 || args.length> 1) {Throw New Runtimeexception («Примите только один параметр в Queueresource Interface.»); } String tagname = mqutils.gettagname (method); Производительфакторный производительфактор = Queuecorespringutils.getbean (ufferencefactory.class); MQConnection ConnectionInfo = QueueCorespringutils.getbean (mqconnection.class); Производитель производитель = производительфакторию.createProducer (префикс+connectionInfo.getPrefix ()+"_"+Производство); //Send message Message msg = new Message( // // The Topic created in the console, that is, the Topic name to which the message belongs. connectionInfo.getPrefix()+"_"+topicName, // Message Tag, // It can be understood as a tag in Gmail, and the message is reclassified to facilitate Consumer to specify filtering conditions to filter tagName on the MQ server, // Message Body // Any Бинарная форма данных, MQ не мешает какой -либо, // Производитель и потребитель требуются договориться о последовательной сериализации и методе десериализации Serializer.serialize (args [0]). Getbytes ()); SendResult SendResult = Производитель.SEND (MSG); logger.info ("Отправить успех сообщения. Идентификатор сообщения:" + sendresult.getmessageid ()); вернуть ноль; }}Здесь мы специально разместили пользовательский пакет и имена пакетов, используемые третьими лицами для облегчения различия.
Что именно здесь сделано?
Процесс отправки сообщения состоит в том, чтобы создать прокси -объект на динамическом прокси. Объект будет перехвачен при вызове метода. Во -первых, анализируйте все аннотации, такие как имя TeapName, Promingerid, Tag и другая ключевая информация из аннотаций, а затем позвоните Alibaba SDK, чтобы отправить сообщение. Процесс очень прост, но обратите внимание, что при отправке сообщений здесь он разделен на среды. Вообще говоря, предприятие теперь различает три среды: QA, постановка и продукт. Среди них QA и постановка являются тестовыми средами. Для очередей сообщений также есть три кольца. В окружающей среде, однако, QA и стационарные среды часто используют одну и ту же учетную запись Alibaba, чтобы снизить затраты, поэтому созданная тема и ProductID будут размещены в той же области. Таким образом, TopicName с тем же именем не разрешено существовать, поэтому префикс среды добавляется, чтобы различать их, такие как QA_TOPICName, PID_STAGING_PRODUCERID и т. Д.; Кроме того, Queue-Core предоставляет интерфейс MQConnection для получения информации о конфигурации, а услуги производителя должны только для реализации этого интерфейса.
4. Производитель отправляет сообщения
@Autowired private userqueuersource userqueuersource; @Override public void sendmessage () {usermodel usermodel = new usermodel (); usermodel.setname ("kdyzm"); usermodel.setage (25); userqueuersource.handleuserinfo (usermodel); }Для отправки сообщения требуется всего несколько строк кода в указанную тему, которая намного тоньше, чем собственный код отправки.
4. Потребление новостей
По сравнению с отправкой сообщений потребление сообщений является более сложным.
1. Дизайн потребления сообщений
Поскольку тема и потребитель связаны с N: N, ConsumerID помещается в метод реализации потребителя.
@Controller@QueuerSourcepublic Class UserQueUerSourceImpl реализует userqueuerSource {private logger logger = loggerFactory.getLogger (this.getClass ()); @Override @consumerannotation ("kdyzm_consumer") public void harderuserinfo (usermodel user) {logger.info ("Сообщение 1 получено: {}", новый gson (). Tojson (user)); } @Override @consumerannotation ("kdyzm_consumer1") public void harderinfo1 (usermodel user) {logger.info ("Сообщение 2 получено: {}", new Gson (). Tojson (пользователь)); }}Вот две новые аннотации @QueuerSource и @Consumerannotation. Эти две аннотации будут обсуждаться в будущем. Кто -то может спросить меня, почему я должен использовать имя потребительского оборудования вместо имени потребителя, потому что имя потребителей конфликтует с именем в SDK, предоставленном Aliyun. Полем Полем Полем
Здесь потребители предоставляют интерфейс API производителям, чтобы облегчить производителям отправлять сообщения, а потребители реализуют интерфейс для потребления сообщений, отправляемых производителями. Как реализовать интерфейс API - это реализация мониторинга, которая является относительно критической логикой.
2. Queue-Core реализует основную логику очереди сообщений прослушивание
Шаг 1: Используйте метод прослушивания пружинного контейнера, чтобы получить все бобы с помощью аннотаций QueuerSource
Шаг 2: Распределите обработку бобов
Как справиться с этими бобами? Каждый фасоль на самом деле является объектом. С помощью объекта, такого как объект UserqueUerSourceImpl в приведенном выше примере, мы можем получить объект интерфейса Bytecode, реализованный объектом, а затем получить аннотации на интерфейсе userQueRounce и аннотации по методам и методам. Конечно, также могут быть получены аннотации по методу реализации пользовательского QuearuerSourceImpl. Здесь я буду использовать потребитель в качестве ключа, а оставшаяся соответствующая информация инкапсулируется как значение и кэшируется в объект карты. Основной код заключается в следующем:
Класс <?> Clazz = resourceimpl.getClass (); Класс <?> Clazzif = clazz.getInterfaces () [0]; Method [] methods = clazz.getMethods (); String topicName = mqutils.getTopicName (clazzif); Для (Метод M: Методы) {ConmerceNotation Consumeranno = M.Getannotation (ConmerceNotation.class); if (null == consumeranno) {// logger.error ("method = {} нуждается в аннотации потребителей.", m.getName ()); продолжать; } String consumerId = consumeranno.value (); if (stringutils.isempty (consuerid)) {logger.error ("method = {} ConsumerId не может быть нулевым", m.getName ()); продолжать; } Class <?> [] ParameterTypes = m.getParameterTypes (); Method resourceifmethod = null; try {resourceifmethod = clazzif.getMethod (m.getName (), parameterTypes); } catch (nosuchmethodexception | securityException e) {logger.error ("Не удается найти метод = {} в super interface = {}.", m.getName (), clazzif.getCanonicalName (), e); продолжать; } String tagname = mqutils.gettagname (resourceifmethod); ConsumersMap.put (ConsuerID, New MethodInfo (TopicName, Tagname, M)); }Шаг 3: Действия потребления через размышление
Во -первых, определите время выполнения действий отражения, то есть прослушать новые сообщения
Во -вторых, как выполнить действия отражения? Я не буду вдаваться в подробности. Детская обувь с основами, связанными с отражением, знают, как их сделать. Основной код заключается в следующем:
MQConnection ConnectionInfo = QueueCorespringutils.getbean (mqconnection.class); String topenprefix = connectionInfo.getPrefix ()+"_"; String consumerIdprefix = prefix+connectionInfo.getPrefix ()+"_"; для (String ConsumerId: ConsumersMap.KeySet ()) {MethodInfo MethodInfo = ConsumersMap.get (ConsumerID); Свойства ConnectionProperties = ConverttoProperties (ConnectionInfo); // идентификатор потребителя, который вы создали в консоли ConnectionProperties.put (PropertyKeyConst.Consumerid, ConsumerIdprefix+ConsumerID); Потребительский потребитель = onsfactory.createConsumer (ConnectionProperties); Consumer.SubScribe (TOMAPREFIX+MEDICEINFO.getTopicName (), methodInfo.getTagName (), new MessageListener () {// Подписаться на несколько тегов Public Action Souraric TOMA = {}, TAG = {}, ConsumerID = {}, Message = {} », TOMAPREFIX+MEDICEINFO.GetTopicName (), MethodInfo.getTagName (), ConsumerIdPrefix+ConsureTy, MessageBody); MethodInfo.getMethod (); класс <? Method.getParametertypes () [0]; Consumer.Start (); logger.info ("consumer = {} начался.", ConsumerIdprefix+ConsumerID); }5. См. Ссылку GIT ниже для полного кода
https://github.com/kdyzm/queue-core.git
Выше всего содержание этой статьи. Я надеюсь, что это будет полезно для каждого обучения, и я надеюсь, что все будут поддерживать Wulin.com больше.