1。AlibabaCloud公式ウェブサイト---ヘルプ文書
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.wwtiuh
公式ウェブサイトの手順に従ってトピックを作成し、Publishing(Producer)を申請し、サブスクリプション(Consumer)を申請します
2。コード
1。構成:
public class mqconfig {/** *テストを開始する前に、次のxxxを交換してください */public static final string public_topic = "test"; // public static final string public_producer_id = "pid_scheduler"; public static final string public_consumer_id = "cid_service"; public static final string access_key = "123"; public static final string secret_key = "123"; public static final string tag = ""; public static final string thread_num = "25"; //消費者スレッドの数/*** onsaddr異なる領域に従って構成してください*パブリックネットワークテスト:http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet*パブリッククラウド生産: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * hangzhou Financial Cloud:http://jbponsaddr-internal..com:8080/RocketMQ/NSADDR4CLIENT-internal http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */public static final string onsaddr = "http://onsaddr-internal.aliyun.com:8080/RocketMQ/NSADDR4CLEIENT-INTERNAL";}Onsaddr Alibaba Cloudはパブリッククラウドの生産を使用し、テストはパブリックネットワークを使用します
さまざまなサービスが異なるタグを設定できますが、メッセージボリュームが大きい場合は、新しいトピックを作成することをお勧めします。
2。プロデューサー
方法1:
構成ファイル:producer.xml
<?xml version = "1.0" encoding = "utf-8"?> <!doctype beans public " - // spring // dtd bean // en" "" http://www.springframework.org/dtd/spring-beans.dtd "> <beans =" init-method "" shut-method " name = "Properties"> <map> <entry key = "producterid" value = "" /> <! - pid、pid、> <entry key = "accesskey" value = "" /> <! - access_key - > <entry key = "secretkey" " /> <! - secret_key、> <!<! http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internetパブリッククラウド生産:http://onsaddr-internal.aliyun.com:8080/rocketmq/NSADDR4CLIENT-INTERNAL HANGZHOU FINASIONクラウド: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal shenzhen Financial Cloud:http://mq4finance-sz.addr.aliyun.com:8080/RocketMQ/NSADDR4CLINTDR4CLIENT-INTERNAL- <ENTRITENTRENAL- value = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/> </map> </property> </bean> </beans>
スタートアップ方法1、クラスのグローバル設定に設定されています。
//プロデューサーのプライベートアプリケーションContext CTXを初期化します。民間生産者。 @value( "$ {producterconfig.enabled}")// switch、spring configurationアイテム、true is on、falseターンオフprivate boolean produceconfigenabled; @postconstruct public void init(){if(true == derduconfigenabled){ctx = new classpathxmlapplicationContext( "producer.xml"); Producer =(Producerbean)ctx.getBean( "Producer"); }}PS:最近ピットを発見しました。プロデューサーが上記の方法で開始された場合、より多く開始すると、FullGCを引き起こします。したがって、次の注釈方法に変更して手動で開始し、使用する場所でシャットダウンできます。
方法2:クラスを構成する(XML不要)
@configurationpublic class producerbeanconfig {@value( "$ {openservices.ons.producerbean.producerid}")private string producerid; @value( "$ {openservices.ons.producerbean.accesskey}")private string accesskey; @value( "$ {openservices.ons.producerbean.secretkey}")private string secretkey;民間の生産者生産者。 @value( "$ {openservices.ons.producerbean.onsaddr}")private string onsaddr; @Bean Public Processerbean OneProducer(){producerbean producerbean = new processerbean();プロパティプロパティ= new Properties(); Properties.SetProperty(PropertyKeyconst.Producerid、processerid); properties.setProperty(PropertyKeyconst.AccessKey、AccessKey); Properties.SetProperty(PropertyKeyconst.Secretkey、SecretKey); properties.setProperty(PropertyKeyconst.onsaddr、onsaddr); Processerbean.setProperties(プロパティ); Return Brocesserbean; }}PS:このダブル11の後、上記の2つの方法は大きなデータボリュームとマルチスレッドの状況にはあまり適しておらず、パフォーマンスが非常に低いため、3を使用することをお勧めします。
方法3 :( XMLは必要ありません)
@componentPublic Class ProducerBeansingleton {@value( "$ {openservices.ons.producerbean.producerid}")private string producelid; @value( "$ {openservices.ons.producerbean.accesskey}")private string accesskey; @value( "$ {openservices.ons.producerbean.secretkey}")private string secretkey; @value( "$ {openservices.ons.producerbean.onsaddr}")private string onsaddr;民間の静的プロデューサープロデューサー。 Private Static Class SingletonHolder {private static finalbeansingleton instance = new producerbeansingleton(); } private producerbeansingleton(){} public static finalbencerbeansingleton getInstance(){return singletonholder.instance; } @PostConstruct public void init(){//プロデューサーインスタンス構成初期化プロパティ= new Properties(); //プロデューサーID Properties.setProperty(PropertyKeyconst.ProducerID、ProducterID); // AccessKey Alibaba Cloud Authentication、Create Properties.SetProperty(PropertyKeyConst.AccessKey、AccessKey); // SecretKey Alibaba Cloud Authentication、Create Properties.setProperty(PropertyKeyconst.Secretkey、SecretKey); // SecretKey Alibaba Cloud Authentication、Create Properties.setProperty(PropertyKeyconst.Secretkey、SecretKey); //送信タイムアウト時間、Unit MilliseCond Properties.setProperty(PropertyKeyconst.sendmsgtimeoutMillis、 "3000")を設定します。 // TCPアクセスドメイン名を設定します(パブリッククラウド生産環境をここに例として参照)Properties.SetProperty(PropertyKeyconst.Onsaddr、onsaddr);プロデューサー= onsfactory.createproducer(Properties); //メッセージを送信する前に、プロデューサーを起動するためにSTARTメソッドを呼び出す必要があります。また、生産者に1回電話する必要があります。Start(); }パブリックプロデューサーgetProducer(){return producer; }}スプリング構成
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.mysql5dialectconsumerconfig. enabled = trueproducercerconfig.enabled = true #method 1:スケジュール= enabled = false#method 2、3:rocketmq /u516c/u7f51/u914d/u7f6eopenservices.ons.producerbean.producerid = pidopenservices.ons.producerbean.accesskey = openservices.ons.producerbean.secretkey = openservices.ons.Ons.Producerbean.secretkey = openservices.ons.producerbean.onsaddr = public network、hangzhouパブリッククラウド生産
方法1メッセージコードを配信します。
try {string jsonc = jsonutils.tojson(elevenmessage);メッセージメッセージ= newメッセージ(mqconfig.topic、mqconfig.tag、jsonc.getBytes()); sendResult sendResult = producer.send(message); if(sendResult!= null){logger.info( "。mqメッセージの成功!"を送信! ";} else {ogger.warn("。sendResult is null ...... ");}} catch(expergger.warn(" doubleElevenallpreservice "); sweep(1000);方法2配信メッセージコード:( 1000回ごとに開始/閉じることができます)
processerbean.start(); try {string jsonc = jsonutils.tojson(evelenmessage);メッセージメッセージ= newメッセージ(mqconfig.topic、mqconfig.tag、jsonc.getBytes()); sendResult sendResult = producer.send(message); if(sendResult!= null){logger.info( "。mqメッセージの成功!"を送信! ";} else {ogger.warn("。sendResult is null ...... ");}} catch(expergger.warn(" doubleElevenallpreservice "); sweep(1000); Processerbean.shutdown();方法3:メッセージを配信します
try {string jsonc = jsonutils.tojson(elevenmessage);メッセージメッセージ= newメッセージ(mqconfig.topic、mqconfig.tag、jsonc.getBytes());プロデューサー= producerbeansingleton.getInstance()。getProducer(); sendResult sendResult = producer.send(message); if(sendResult!= null){logger.info( "doubleElevenmidservice.send mq message success!topic is:";} else {ogger.warn( "doubleElevenmidservice.sendresult is null ......");}} catch(Exception e){logelevenservice( "+e.getmessage()、e); swree.sleep(1000); //例外がある場合、1秒間睡眠}メッセージを送信するコードは例外をキャッチする必要があります。そうしないと、繰り返し送信されます。
ここでのトピックは自分で作成されます。 evelenmessageは送信されるコンテンツです。私は自分で作成したオブジェクトです。
3。消費者
スタートアップクラスを構成します:
@configuration@conditionalonproperty(value = "consumerconfig.enabled"、have balue = "true"、matchifmissing = true)public class consumerconfig {private logger logger = loggerfactory.getLogger(loggerAppendertype.smsdist.name(); @Bean Public Consumer ConsumerFactory(){//異なる消費者はプロパティの名前を変更できません。 ConsumerProperties.setProperty(PropertyKeyconst.Consumerid、mqconfig.consumer_id); ConsumerProperties.setProperty(PropertyKeyconst.AccessKey、mqconfig.access_key); ConsumerProperties.setProperty(PropertyKeyconst.Secretkey、mqconfig.secret_key); //CONSUMERPROPERTIES.SETPROPERTY(PropertyKeyconst.ConsumethReadNums,MQConfig.Thread_Num); ConsumerProperties.setProperty(PropertyKeyconst.onsaddr、mqconfig.onsaddr); Consumer Consumer = onsfactory.createConsumer(ConsumerProperties); consumer.subscribe(mqconfig.topic、mqconfig.tag、new doubleElevenmessageListener()); //新しい対応するリスナーConsumer.start(); logger.info( "consumerconfig start success。");消費者を返す; }}適切なCIDとONSADDRを選択する必要があります。あなた自身の、消費者スレッドカウントなどを使用して、ここで構成できます。
メッセージリスナークラスを作成し、メッセージを消費します。
@componentPublic Class MessageListenerはMessageListener {private logger logger = loggerfactory.getLogger( "Remind");保護された静的evelenReposit evhenReposit; @Resource public void setelevenReposit(evelenReposit evhenreposit){mesageListener .ElevenReposit = ElevenReposit; } @Override public Action Consumption(message message、consumercontext consumentContext){if(message.getTopic()。equals( "own topic")){//他のメッセージの消費を避けますjson変換エラーtry {byte [] body = message.getbody();文字列res = new String(body); // resはプロデューサーによって送信されたメッセージコンテンツ//ビジネスコード} else {logger.warn( "!"); }} catch(Exception e){logger.error( "mesageListener.consume error:" + e.getmessage()、e); } logger.info( "mesageListener.receiveメッセージ"); //メッセージの再投稿の関数をテストする場合は、action.commitmessageをAction.ReconSumeLater return action.commitMessageに置き換えることができます。 } else {logger.warn(); action.reconsumelaterを返します。 }}消費者はマルチスレッドされているため、オブジェクトを静的+セットで注入する必要があるため、オブジェクトレベルをプロセスに上げて、複数のスレッドを共有できますが、親クラスのメソッドと変数を呼び出すことはできません。
消費者のステータスは、消費者が遅れているかどうか、消費速度など、消費者が正常に接続されているかどうかを確認できます。
消費サイトをリセットすると、すべてのメッセージがクリアできます
3。注意すべきこと
1.送信される最大メッセージ本文は256kbです
2。メッセージは最大3日間存在します
3.消費者側のデフォルトのスレッド数は20です
4. Javaがハングアップするか、CPUが実行中に非常に高い量を占有している場合、送信時に1,000個のメッセージの1秒ごとにスレッドを送信できます。
5.ローカルテストまたはスタートアップの場合、Onsaddrをパブリックネットワークに置き換えます。そうしないと、エラーは開始されません。
上記はこの記事のすべての内容です。みんなの学習に役立つことを願っています。誰もがwulin.comをもっとサポートすることを願っています。