1。序文
最近、同社はAlibaba Cloudメッセージキューを使用する必要があります。使用がより便利になるために、私は数日間、メッセージキューをAPI呼び出し方法にカプセル化して、内部システムの呼び出しを容易にしました。現在完了しています。ここでは、プロセスと使用される関連技術を記録し、共有します。
Alibaba Cloudは、MNSサービスとONSサービスの2つのメッセージサービスを提供しています。 MNSはONSの単純化されたバージョンであり、MNSメッセージの消費にはカスタムポーリング戦略が必要だと思います。対照的に、ONSの公開およびサブスクリプションモード関数はより強力です(たとえば、MNSと比較して、ONSはメッセージ追跡、ロギング、監視、その他の機能を提供します)、そのAPIはより便利です。また、Alibabaは将来MNSを開発しなくなるが、維持するだけでなくなると聞いています。 ONSサービスは、MNSサービスを徐々に置き換え、Alibabaのメッセージサービスの主な製品になります。したがって、メッセージキューを使用する必要がある場合は、MNSを再度使用しないことをお勧めします。 ONSを使用することが最良の選択です。
関係するテクニック:春、反射、ダイナミックプロキシ、ジャクソンのシリアル化、脱派化
次の記事を読む前に、上記のドキュメントを読んで、関連する概念(トピック、消費者、プロデューサー、タグなど)と、ドキュメントで提供されているコードの実装を簡単に送信および受信することを理解する必要があります。
このブログ投稿は、メッセージキューに知識ベースを持っている友人のみです。私は当然、みんなを助けることができてとてもうれしいです。あなたの道が間違っていることを意味するので、それを理解していない人をoldっないでください。
2。設計計画
1。メッセージ送信
シンプルなCSSアーキテクチャでは、サーバーがトピックプロデューサーから送信されたメッセージを聞いていると仮定すると、最初にクライアントAPIを提供する必要があります。クライアントは、単にAPIを呼び出すだけで、プロデューサーを介してメッセージを作成できます。
2。メッセージ受信
APIはサーバーによって策定されているため、サーバーはもちろんこれらのメッセージを消費する方法も知っています。
このプロセスでは、サーバーは実際に消費者の役割を果たし、クライアントは実際に生産者の役割を果たしますが、生産者がメッセージを生成するためのルールは、消費者の消費ニーズを満たすために消費者によって策定されます。
3。究極の目標
依存関係の特定の実装を提供するために、キューコアという名前の別のJARパッケージを作成し、生産者と消費者にサブスクリプションを公開したいと考えています。
3。メッセージ送信
1.消費者はインターフェイスを提供します
@topic(name = "kdyzm"、producterid = "kdyzm_producer")public interface userqueueresource {@tag( "test1")public void handleuserinfo(@body @key( "userinfohandler")usermodel user); @tag( "test2")public void handleuserinfo1(@body @key( "userinfohandler1")usermodel user);}トピックとプロデューサーはn:1関係にあるため、生産物はトピックのプロパティとして直接使用されます。タグは非常に重要なフィルタリング条件であり、消費者はそれを使用してメッセージを分類して異なるビジネス処理を実行するため、タグはここでルーティング条件として使用されます。
2.プロデューサーは、消費者が提供するAPIを使用してメッセージを送信します
消費者は生産者が使用するためのインターフェイスのみを提供するため、インターフェイスをインスタンス化する方法がないため、インターフェイスを直接使用する方法はありません。ここでは、動的プロキシを使用してオブジェクトを生成します。消費者が提供するAPIで、次の構成を追加して、プロデューサーを容易にして構成を直接インポートして使用します。ここでは、Javaに基づいたSpring Configを使用します。知ってください。
@configurationpublic class queueconfig {@autowired @bean public userqueresource userqueueresource(){return queueresourcefactory.createproxyqueueresource(userqueueresource.class); }}3。プロデューサーのメッセージ送信のためのキューコアのカプセル化
上記の1のすべての注釈(トピック、タグ、ボディ、キー)および2で使用されるQueueresourceFactoryクラスは、キューコアで定義する必要があります。注釈の定義は、ルールのみを定義します。実際の実装は、実際にはQueureSourceFactoryにあります。
Import java.lang.reft.invocationHandler; Import Java.lang.Reflect.Method; Import java.lang.relect.proxy; Import org.slf4j.logger; Import org.slf4j.loggeractory; com.aliyun.onservices.ons.ans.api.message; com.aliyun.openservices.ons.api.producer; import com.aliyun.openservices.ons.api.sendresult; import com.wy.queue.core.api.mqconnection; Import com.wy.wy.queue.core.utils.utils.jacksonserializer; import com.wy.key.core.core.utils.utils.utils.utils.utils com.wy.queue.core.utils.queecorespringutils; public class queueresourceFactory Impocationhandler {private static final logger logger = obgeractory.getLogger(queueresourceFactory.class);プライベート文字列トピック名; Private String ProduceRid; private jacksonserializer serializer = new Jacksonserializer(); private static final string prefix = "pid_"; public queueresourceFactory(string topicname、string producterid){this.topicname = topicname; this.producerid = producterid; } public static <t> t createproxyqueueresource(class <t> clazz){string topicname = mqutils.getopicName(clazz); string producerid = mqutils.getProducerid(clazz); tターゲット=(t)proxy.newproxyinstance(queueresourceFactory.class.getClassLoader()、new class <?ターゲットを返します。 } @OverrideパブリックオブジェクトInvoke(Object Proxy、Method Method、Object [] Args)Throws Throwable {if(args.length == 0 || args.length> 1){throw new runtimeexception( "queueresourceインターフェイスで1つのparamのみを受け入れるだけです。"); } string tagname = mqutils.getTagname(method); ProducterFactory ProducterFactory = QueueCoresPringutils.getBean(producterfactory.class); mqconnection connectioninfo = queecorespringutils.getbean(mqconnection.class);プロデューサー= producterfactory.createproducer(prefix+connectioninfo.getPrefix()+"_"+producterid); //メッセージを送信メッセージmsg = newメッセージ(// //コンソールで作成されたトピック、つまり、メッセージが属するトピック名。ConnectionInfo.getPrefix()+"_"+TopicName、//メッセージタグ、// Gmailのタグとして理解できます。データの形式、MQはいずれにも干渉しません。 sendResult sendResult = producer.send(msg); logger.info( "メッセージの成功を送信します。メッセージIDは:" + sendResult.getMessageId()); nullを返します。 }}ここでは、区別を促進するためにサードパーティが使用するカスタムパッケージとパッケージ名を特別に投稿しました。
ここで正確に何が行われていますか?
メッセージを送信するプロセスは、動的プロキシにプロキシオブジェクトを作成することです。メソッドを呼び出すときにオブジェクトは傍受されます。まず、トピック名、プロデュース、タグ、その他の重要な情報などのすべての注釈を解析し、Alibaba SDKに電話してメッセージを送信します。プロセスは非常に単純ですが、ここでメッセージを送信するときは環境に分割されることに注意してください。一般的に、企業は現在、QA、ステージング、製品の3つの環境を区別しています。その中でも、QAとステージングはテスト環境です。メッセージキューには、3つのリングもあります。ただし、環境では、QAおよびステージング環境は、同じAlibabaアカウントを使用してコストを削減することが多いため、作成されたトピックとProductIDが同じエリアに配置されます。このようにして、同じ名前のトピック名は存在することを許可されていないため、QA_TopicName、PID_STAGING_PRODUCERIDなど、それらを区別するために環境プレフィックスが追加されます。さらに、Queue-CoreはMQConnectionインターフェイスを提供して構成情報を取得し、プロデューサーサービスはこのインターフェイスのみを実装する必要があります。
4.プロデューサーはメッセージを送信します
@autowired private userqueueresource userqueueresource; @Override public void sendmessage(){usermodel usermodel = new usermodel(); usermodel.setname( "kdyzm"); usermodel.setage(25); userqueueresource.handleuserinfo(usermodel); }指定されたトピックにメッセージを送信するには、数行のコードが必要です。これは、ネイティブ送信コードよりもはるかに薄いです。
4。ニュース消費
メッセージの送信と比較して、メッセージの消費はより複雑です。
1。メッセージ消費設計
トピックと消費者はn:n関係であるため、消費者は消費者の特定の実装方法に配置されます
@controller@queueresourcepublic class userqueueresourceimpl explments userqueueresource {private logger logger = loggerfactory.getLogger(this.getClass()); @override @consumerannotation( "kdyzm_consumer")public void handleuserinfo(usermodel user){ogger.info( "メッセージ1がreceed:{}"、new gson()。tojson(user)); } @override @consumerannotation( "kdyzm_consumer1")public void handleuserinfo1(usermodel user){gger.info( "メッセージ2がreceed:{}"、new gson()。tojson(user)); }}ここに2つの新しいアノテーション@queueresourceと@consumerannotationがあります。これらの2つの注釈は、将来議論されます。名前がAliyunが提供するSDKの名前と名前が競合するため、誰かが名前の消費者の代わりにその名前のConsumerAnnotationを使用する必要がある理由を尋ねるかもしれません。 。 。 。
ここで、消費者はプロデューサーにAPIインターフェイスを提供して、プロデューサーがメッセージを送信するように促進し、消費者はプロデューサーから送信されたメッセージを消費するインターフェイスを実装します。 APIインターフェイスを実装する方法は、比較的重要なロジックである監視を実装することです。
2.キューコアは、メッセージキューリスニングのコアロジックを実装します
ステップ1:スプリングコンテナのリスニング方法を使用して、Queueresourceアノテーションを備えたすべての豆を入手します
ステップ2:加工豆を配布します
これらの豆を扱う方法は?各豆は実際にはオブジェクトです。上記の例では、userqueresourceimplオブジェクトなどのオブジェクトを使用すると、オブジェクトによって実装されたインターフェイスバイトコードオブジェクトを取得し、インターフェイスユーザーqueuerourseceとメソッドとメソッドのアノテーションでアノテーションを取得できます。もちろん、userqueueresourceimplの実装方法に関する注釈も取得できます。ここでは、ConsumerIDをキーとして使用します。残りの関連情報は、値としてカプセル化され、マップオブジェクトにキャッシュされます。コアコードは次のとおりです。
クラス<?> clazz = resourceImpl.getClass(); class <?> clazzif = clazz.getInterfaces()[0];方法[] methods = clazz.getMethods(); string topicname = mqutils.getTopicName(clazzif); for(メソッドM:メソッド){ConsumerAnnotation Consumeranno = m.getAnnotation(ConsumerAnnotation.class); if(null == consumeranno){// logger.error( "method = {}消費者アノテーションが必要です。"、m.getname());続く; } string consumerid = consumeranno.value(); if(stringutils.isempty(consuerid)){logger.error( "method = {} consumerid be null"、m.getname());続く; } class <?> [] parametertypes = m.getParametertypes();メソッドresourceifmethod = null; try {resourceifmethod = clazzif.getMethod(m.getName()、parametertypes); } catch(nosuchmethodexception | securityexception e){logger.error( "super interface = {}でmethod = {}を見つけることができない。続く; } string tagname = mqutils.getTagname(resourceifmethod); consumersmap.put(consuerid、new MethodInfo(Topicname、Tagname、M)); }ステップ3:反射による消費アクション
まず、反射アクション実行のタイミングを決定します。つまり、新しいメッセージを聞きます
第二に、反射アクションを実行する方法は?詳細は説明しません。反射関連の基礎を備えた子供用の靴は、それらの作り方を知っています。コアコードは次のとおりです。
mqconnection connectioninfo = queecorespringutils.getbean(mqconnection.class); string topicprefix = connectionInfo.getPrefix()+"_";文字列consumeridprefix = prefix+connectioninfo.getPrefix()+"_"; for(string consumerid:consumersmap.keyset()){methodinfo methodinfo = consumersmap.get(consumerid);プロパティConnectionProperties = convertToproperties(ConnectionInfo); //コンソールConnectionProperties.putで作成した消費者ID(PropertyKeyconst.Consumerid、ConsumerIdPrefix+ConsumerID); Consumer Consumer = onsfactory.createConsumer(connectionProperties); Consumer.subscribe(topicprefix+methodinfo.getTopicName()、MethodInfo.getTagname()、new MessageListener(){//複数のタグパブリックアクション消費量(メッセージメッセージ、consumercontextコンテキスト){try {string messagebody = new String(message.info( ")、" utf-8 ");" "utf-8");トピック= {}、tag = {}、consumerid = {}、message = {} "、topicprefix+methodinfo.getTopicName()、methodinfo.getTagname()、consumeridprefix+consumerid、messagebody); method method = methodinfo.getMethod(方法。 Consumer.start(); logger.info( "Consumer = {}が開始されました。"、ConsumerIdPrefix+ConsumerID); }5.完全なコードについては、以下のgitリンクを参照してください
https://github.com/kdyzm/queue-core.git
上記はこの記事のすべての内容です。みんなの学習に役立つことを願っています。誰もがwulin.comをもっとサポートすることを願っています。