1. 서문
최근 에이 회사는 Alibaba Cloud 메시지 대기열을 사용해야합니다. 더 편리하게 사용하기 위해 내부 시스템의 호출을 용이하게하기 위해 메시지 큐를 API 호출 방법으로 캡슐화하는 데 며칠을 보냈습니다. 지금 완료되었습니다. 여기서 우리는 프로세스와 관련 기술을 기록하고 귀하와 공유합니다.
Alibaba Cloud는 이제 MNS 서비스와 ONS 서비스의 두 가지 메시지 서비스를 제공합니다. MNS는 단순화 된 버전의 ONS라고 생각하며 MNS 메시지 소비에는 맞춤형 폴링 전략이 필요합니다. 대조적으로, ONS의 게시 및 구독 모드 기능은 더 강력합니다 (예 : MNS와 비교하여 ONS는 메시지 추적, 로깅, 모니터링 및 기타 기능을 제공하고 API가 더 편리합니다. 또한 알리바바는 더 이상 미래에 MN을 개발하지 않고 유지할 것이라고 들었습니다. ONS 서비스는 점차 MNS 서비스를 대체하고 Alibaba의 메시지 서비스의 주요 제품이됩니다. 따라서 메시지 대기열을 사용해야하는 경우 MNS를 다시 사용하지 않는 것이 좋습니다. ONS를 사용하는 것이 최선의 선택입니다.
관련된 기술 : 봄, 반사, 동적 대리, 잭슨 직렬화 및 사막화
다음 기사를 읽기 전에 관련 개념 (주제, 소비자, 생산자, 태그 등)과 문서에 제공된 간단한 전송 및 수신 코드 구현을 이해하려면 위의 문서를 읽어야합니다.
이 블로그 게시물은 메시지 대기열에 대한 지식 기반이있는 친구들을위한 것입니다. 나는 당연히 모든 사람을 돕게되어 매우 기쁩니다. 당신의 길이 잘못되었음을 의미하기 때문에 이해하지 못하는 사람을 꾸짖지 마십시오.
2. 디자인 계획
1. 메시지 보내기
간단한 CSS 아키텍처에서 서버가 주제 생산자가 보낸 메시지를 듣게 될 것이라고 가정하면 먼저 클라이언트 API를 제공해야합니다. 클라이언트는 단순히 API를 호출하기 만하면 생산자를 통해 메시지를 생성 할 수 있습니다.
2. 메시지 수신
API는 서버에 의해 공식화되므로 서버는 이러한 메시지를 소비하는 방법도 알고 있습니다.
이 프로세스에서 서버는 실제로 소비자의 역할을 수행하며 고객은 실제로 생산자의 역할을 수행하지만, 제작자가 메시지를 생산하는 규칙은 소비자 소비 요구를 충족시키기 위해 소비자가 공식화합니다.
3. 궁극적 인 목표
우리는 Queue-Core라는 별도의 JAR 패키지를 만들어 의존성의 특정 구현을 제공하고 생산자 및 소비자를위한 구독을 게시하려고합니다.
3. 메시지 보내기
1. 소비자는 인터페이스를 제공합니다
@Topic (이름 = "kdyzm", producerId = "kdyzm_producer") public 인터페이스 userqueUeresource {@tag ( "test1") public void handsUserInfo (@body @key ( "userInfoHandler") usermodel user); @tag ( "test2") public void handleUserInfo1 (@body @key ( "userInfoHandler1") usermodel user);}주제와 생산자는 N : 1 관계에 있기 때문에 ProducerId는 주제의 속성으로 직접 사용됩니다. TAG는 매우 중요한 필터링 조건이며 소비자는이를 사용하여 메시지를 다른 비즈니스 처리를 수행하기 위해 메시지를 분류하므로 TAG는 여기에서 라우팅 조건으로 사용됩니다.
2. 생산자는 소비자가 제공 한 API를 사용하여 메시지를 보냅니다.
소비자는 생산자가 사용할 수있는 인터페이스 만 제공하기 때문에 인터페이스를 인스턴스화 할 방법이 없기 때문에 직접 인터페이스를 사용할 방법이 없습니다. 여기서는 동적 프록시를 사용하여 객체를 생성합니다. 소비자가 제공 한 API에서 다음 구성을 추가하여 생산자가 구성을 직접 가져와 구성하여 사용하도록 용이하게합니다. 여기서는 Java를 기반으로 Spring 구성을 사용합니다. 알아주세요.
@ConfigurationPublic Class QueueConfig {@autowired @bean public userqueueresource userqueUeresource () {return queueresourcefactory.createProxyqueUeresource (userqueUeresource.class); }}3. 생산자의 메시지 보내기를위한 대기열 코어의 캡슐화
위의 1의 모든 주석 (주제, 태그, 본문, 키) 및 2에 사용 된 QueueresourceFactory 클래스는 큐 코어로 정의되어야합니다. 주석의 정의는 규칙 만 정의합니다. 실제 구현은 실제로 QueueresourceFactory에 있습니다.
import java.lang.reflect.invocationHandler; import java.lang.reflect.method; import java.lang.reflect.proxy; import org.slf4j.logger; import org.slf4j.loggeraptory; import com.aliyun.openservices.ons.ons.mssage; import com.aliyun.openservices.ons.api.producer; import com.aliyun.openservices.ons.sendresult; import com.wy.queue.core.api.mqconnection; import com.wy.quey.ceue.core.core.jacksonserializer; import com.wy.queue.core.util.mqutils; com.wy.queue.core.utils.queecorespringutils; public class queueresourcefactory empoctionhandler {private static final logger = loggerfactory.getlogger (queueresourcefactory.class); 개인 문자열 주제 이름; 개인 문자열 생산자; Private Jacksonserializer Serializer = New JackSonserializer (); 개인 정적 최종 문자열 접두사 = "PID_"; public queueresourcefactory (String topicName, String ProducerId) {this.topicName = topicName; this.producerid = producerid; } public static <t> t CreateProxyqueUeresource (class <t> clazz) {String topicname = mqutils.getTopicName (Clazz); 문자열 producerId = mqutils.getProduceRid (Clazz); t target = (t) proxy.newproxyInstance (queueresourcefactory.class.getClassLoader (), new Class <?> [] {Clazz}, 새로운 queueresourceFactory (TopicName, ProducerId)); 반환 대상; } @Override public object invoke (오브젝트 프록시, 메소드 메소드, 개체 [] args) 던지기 가능 {if (args.length == 0 || args.length> 1) {throw new runtimeexception ( "퀘이어 에스 소스 인터페이스에서 하나의 매개 변수 만 허용"); } 문자열 tagname = mqutils.getTagName (메소드); ProducerFactory ProducerFactory = Queuecorespringutils.getBean (ProducerFactory.class); mqconnection connectionInfo = queuecorespringutils.getBean (mqConnection.class); Producer Producer = ProducerFactory.createProducer (Prefix+ConnectionInfo.getPrefix ()+"_"+ProducerId); // 메시지 메시지 메시지 메시지 MSG = 새 메시지 (// // 콘솔에서 생성 된 주제, 즉 메시지가 속한 주제 이름. ConnectionInfo.getPrefix ()+"_"+TopicName, // 메시지 태그, // Gmail에서 태그로 이해되도록 리포팅 될 수 있습니다. 이진 형태의 데이터, MQ는 // 생산자와 소비자가 필요하지 않습니다. 일관된 직렬화 및 사제화 방법 Serializer.serialize (args [0]). getBytes ()); sendresult sendresult = producer.send (msg); logger.info ( "메시지 보내기 성공. 메시지 id는 다음과 같습니다." + sendresult.getmessageId ()); 널 리턴; }}여기에 우리는 특별히 맞춤형 패키지와 제 3자가 구별을 용이하게하기 위해 사용한 패키지 이름을 게시했습니다.
여기서 정확히 무엇을 수행합니까?
메시지를 보내는 과정은 동적 프록시에서 프록시 객체를 만드는 것입니다. 메소드를 호출 할 때 객체가 가로 채 웁니다. 먼저 주석의 TopicName, ProduceRid, 태그 및 기타 주요 정보와 같은 모든 주석을 구문 분석 한 다음 Alibaba SDK에 전화하여 메시지를 보냅니다. 프로세스는 매우 간단하지만 여기에서 메시지를 보낼 때 환경으로 나뉩니다. 일반적으로 기업은 이제 QA, 준비 및 제품의 세 가지 환경을 구별합니다. 그중에서도 QA와 준비는 테스트 환경입니다. 메시지 대기열의 경우 3 개의 고리도 있습니다. 그러나 환경에서 QA 및 준비 환경은 종종 동일한 Alibaba 계정을 사용하여 비용을 줄이므로 생성 된 주제와 ProductID는 동일한 영역에 배치됩니다. 이런 식으로, 동일한 이름을 가진 TopicName은 존재할 수 없으므로 환경 접두사가 추가되어 qa_topicName, pid_staging_produceRid 등과 같은 구별됩니다. 또한 큐 코어는 구성 정보를 얻기 위해 MQConnection 인터페이스를 제공하며 생산자 서비스는이 인터페이스 만 구현하면됩니다.
4. 프로듀서는 메시지를 보냅니다
@autowired private userqueUeresource userqueUeresource; @override public void sendmessage () {usermodel usermodel = new usermodel (); usermodel.setName ( "kdyzm"); usermodel.setage (25); userqueueresource.handleuserinfo (usermodel); }지정된 주제로 메시지를 보내려면 몇 줄의 코드 만 필요합니다.이 주제는 기본 전송 코드보다 훨씬 얇습니다.
4. 뉴스 소비
메시지 전송과 비교할 때 메시지 소비가 더 복잡합니다.
1. 메시지 소비 설계
주제와 소비자는 N : N 관계이므로 소비자는 소비자의 특정 구현 방법에 배치됩니다.
@controller@queueresourcepublic 클래스 userqueueresourceimpl은 userqueueresource {private logger = loggerfactory.getLogger (this.getClass ()); @override @consumerAnnotation ( "kdyzm_consumer") public void handsuserinfo (usermodel user) {logger.info ( "메시지 1 수신 : {}", new Gson (). Tojson (사용자)); } @override @consumerAnnotation ( "kdyzm_consumer1")) public void handsuserinfo1 (usermodel user) {logger.info ( "메시지 수신 : {}", new Gson (). tojson (사용자)); }}다음은 두 가지 새로운 주석 @QueUeresource와 @ConsumerAnnotation입니다. 이 두 가지 주석은 향후 논의 될 것입니다. 소비자라는 이름이 Aliyun이 제공 한 SDK의 이름과 충돌하기 때문에 누군가가 소비자라는 이름을 사용해야하는 이유를 물어볼 수 있습니다. . . .
여기서 소비자는 생산자가 메시지를 보내도록 촉진하기 위해 생산자에게 API 인터페이스를 제공하고 소비자는 프로듀서가 보낸 메시지를 소비하기 위해 인터페이스를 구현합니다. API 인터페이스를 구현하는 방법은 비교적 중요한 논리 인 모니터링을 구현하는 것입니다.
2. queue-core는 메시지 큐 청취의 핵심 논리를 구현합니다
1 단계 : 스프링 컨테이너의 청취 방법을 사용하여 Queueresource 주석이있는 모든 콩을 얻습니다.
2 단계 : 가공 콩을 배포하십시오
이 콩을 다루는 방법? 각 콩은 실제로 물체입니다. 위의 예에서 userqueUeresourceimpl 객체와 같은 객체를 사용하면 객체에서 구현 한 인터페이스 바이트 코드 객체를 얻은 다음 interface userqueUerouserce 및 메소드 및 메소드에 대한 주석에 주석을 얻을 수 있습니다. 물론, userqueueresourceimpl의 구현 방법에 대한 주석도 얻을 수 있습니다. 여기서는 소비자를 키로 사용하고 나머지 관련 정보는 값으로 캡슐화되어지도 객체로 캐싱됩니다. 핵심 코드는 다음과 같습니다.
클래스 <?> clazz = resourceimpl.getClass (); 클래스 <?> clazzif = clazz.getInterfaces () [0]; 메소드 [] 메소드 = clazz.getMethods (); 문자열 topicname = mqutils.getTopicName (clazzif); for (method m : methods) {ConsumerAnnotation consumeranno = m.getAnnotation (consumerAnnotation.class); if (null == consumeranno) {// logger.error ( "method = {} 소비자 주석이 필요합니다.", M.getName ()); 계속하다; } 문자열 consumerid = consumeranno.value (); if (stringUtils.isempty (consuerid)) {logger.error ( "method = {} consumerid는 null이 될 수 없습니다", m.getName ()); 계속하다; } class <?> [] parameterTypes = M.GetParameterTypes (); 메소드 resourceifmethod = null; try {resourceifmethod = clazzif.getMethod (m.getName (), parameterTypes); } catch (nosuchmethodexception | SecurityException e) {logger.error ( "수퍼 인터페이스에서 메소드 = {}를 찾을 수 없음 = {}.", m.getName (), clazzif.getCanonicalName (), e); 계속하다; } 문자열 tagname = mqutils.getTagName (ResourceIfMethod); ConsumersMap.put (consuerid, new MethodInfo (topicName, tagname, m)); }3 단계 : 반사를 통한 소비 동작
먼저, 반사 조치 실행의 타이밍, 즉 새 메시지를 듣습니다.
둘째, 반사 행동을 수행하는 방법은 무엇입니까? 나는 세부 사항으로 들어 가지 않을 것입니다. 반사 관련 기초가있는 어린이 신발은 그들을 만드는 방법을 알고 있습니다. 핵심 코드는 다음과 같습니다.
mqconnection connectionInfo = queuecorespringutils.getBean (mqConnection.class); 문자열 topicprefix = connectionInfo.getPrefix ()+"_"; 문자열 consumeridprefix = prefix+connectionInfo.getPrefix ()+"_"; for (string consumerid : consumersmap.keyset ()) {methodInfo methodInfo = consumerSmap.get (consumerid); 속성 ConnectionProperties = ConnectToProperties (ConnectionInfo); // 콘솔 ConnectionProperties.put에서 만든 소비자 ID (PropertyKeyConst.ConsumerId, ConsumerIdPrefix+ConsumerId); 소비자 소비자 = onsfactory.createConsumer (ConnectionProperties); 소비자.subscribe (topicprefix+methodinfo.getTopicName (), methodInfo.getTagName (), new MessAgelistener () {// 여러 태그 공개 행동 소비 (메시지 메시지, 소비자 텍스트 컨텍스트) {string atmessebody = new String (worth.gger.info.info. topic = {{}, tag = {}, consumerid = {}, message = {} ", topicprefix+methodinfo.getTopicName (), methodInfo.getTagName (), consumerIdPrefix+소비자, methodInfo.getMethod (); classtype = getParameterTypes () [0]; 소비자 .Start (); logger.info ( "소비자 = {}가 시작되었습니다.", consumeridprefix+consumerid); }5. 전체 코드는 아래의 git 링크를 참조하십시오.
https://github.com/kdyzm/queue-core.git
위는이 기사의 모든 내용입니다. 모든 사람의 학습에 도움이되기를 바랍니다. 모든 사람이 wulin.com을 더 지원하기를 바랍니다.