Disruptorをあまり紹介しませんし、現在のビジネスシナリオについて説明します。 2つのアプリケーションa、b、およびアプリケーションBへのパスデータ。データ送信は比較的高速です。 HTTPを使用してデータを直接プッシュしてからデータベースを入力すると、効率は高くありません。 Aのアプリケーションに大きな圧力につながる可能性があります。 MQの使用は重すぎるため、破壊者が選択されます。リアクターを使用することもできます
basequeuehelper.java
/*** lmax.disurdur効率的なキュー処理テンプレート。初期キュー、つまりinit()の前に公開することをサポートします。 * * init()が呼び出され、処理を開始すると、スレッドは実際に開始されます。システムは出発し、リソースを自動的にクリーンアップします。 * * @author xielongwang * @create 2018-01-18 3:49 pm * @email [email protected] * @description */public Abstract class basekeuehelper <d、e extends valuewrapper <d>、hはワークハンドラー<e> List <BaseQueueHelper> QueueHelperList = new ArrayList <BaseQueueHelper>(); / *** Disruptor Object*/ Private Disruptor <e> Desruptor; / ** * ringbuffer */ private ringbuffer <e> ringbuffer; / ** * initqueue */ private list <d> initqueue = new ArrayList <d>(); / ***キューサイズ** @returnキューの長さ、2*/保護された抽象int getqueuesize()のパワーでなければなりません。 / ** * event Factory * * @return eventFactory */ protected abstract eventFactory <e> eventFactory(); / ** *イベント消費者 * * @return workhandler [] */ protected abstract workhandler [] gethandler(); / ***初期化*/ public void init(){threadfactory namedThreadFactory = new SthreadFactoryBuilder()。setNameFormat( "DesruptOrthreadPool")。build(); scruptor = new Desruptor <e>(eventFactory()、getqueuesize()、namedthreadFactory、decutertype.single、getStrategy()); durruptor.setDefaultExceptionHandler(new myhandlerexception()); druptor.handleeventswithworkerpool(gethandler()); ringbuffer = darruptor.start(); //(dデータ:initqueue){ringbuffer.publishevent(new eventTranslatorOnearg <e、d>(){@override public void translateto(e event、long sequence、d data){event.setValue(data);}}、data); } //リソースクリーニングフック同期(queuehelperlist){if(queuehelperlist.isempty()){runtime.getRuntime()。addshutdownhook(new Sthread(){@Override public void run(){for(basequeuehelper baseuehelper:queuehellist() }}}); } queuehelperlist.add(this); }} /***スレッド実行の優先順位を変更する場合は、このポリシーをオーバーライドします。 WaitStrategyを獲得すると、応答が増加し、アイドル時にCPUの70%以上を占有します。 * SleepingWaitStrategyを使用して注意を払って応答を減らし、CPU使用量を削減し、ログやその他のシナリオで使用されます。 * * @return waitstrategy */ protected abstract waitstrategy getStrategy(); /***キューメッセージを挿入し、オブジェクトinitの前にキューの挿入をサポートし、キューが確立されるとすぐにキューに公開されます。 */ public同期void publishevent(d data){if(ringbuffer == null){initqueue.add(data);戻る; } ringbuffer.publishevent(new eventTranslatorOnearg <e、d>(){@Override public void transteto(e event、long sequence、d data){event.setValue(data);}}、data); } / *** close queue* / public void shutdown(){dasraptor.shutdown(); }}
eventFactory.java
/** * @author xielongwang * @create 2018-01-18 6:24 pm * @email [email protected] * @description */public class eventactoryはcom.lmax.disuptor.eventfactory <seriesdataevent> {@override public series dataevent new-enveveveveveveveveveveveveveventance() }}
myhandlerexception.java
パブリッククラスmyhandlerexceptionを実装した例外handler {private logger logger = loggerfactory.getLogger(myhandlerexception.class); / * *(nonjavadoc)操作中に発生する例外 * * * @see * com.lmax.discuptor.exceptionhandler#handleeventexception(java.lang.rowable *、long、java.lang.object) logger.error( "プロセスデータエラーSequence == [{}] event == [{}]、ex == [{}]"、sequence、event.toString()、ex.getMessage()); } / * *(nonjavadoc)スタートアップでの例外 * * @see * com.lmax.discuptor.exceptionhandler#handleonstartexception(java.lang。 * throwable) * / @Override public void handleOnstartexception(投げ可能なex){logger.error( "asruptor error = = [{}]; } / * *(nonjavadoc)閉じた場合の例外 * * * @see * com.lmax.discturtor.exceptionhandler#handleonshutdownexception(java.lang * .throwable) * / @Override publiconshutdownexception(投げ可能なex){logger.error( "shut disrutrurtor erry = = = = [}] }}seriesdata.java(アプリケーションaによってアプリBに送信されたメッセージを表します)
public class seriesdata {private string deviceinfostr; public seriesdata(){} public seriesdata(string deviceinfostr){this.deviceInfostr = deviceInfostr; } public String getDeviceInfoStr(){return deviceInfoStr; } public void setDeviceInfostr(String deviceInfostr){this.deviceInfostr = deviceInfostr; } @Override public String toString(){return "seriesData {" + "deviceInfoStr = '" + deviceInfoStr +'/'' + '}'; }}seriesdataevent.java
Public Class SeriesDataeventはValueWrapper <seriesData> {}を拡張しますseriesdataeventhandler.java
パブリッククラスシリーズDataEventHandlerはWorkHandler <seriesDataevent> {private logger logger = loggerfactory.getLogger(seriesdataeventhandler.class); @autowired private deviceinfoservice deviceinfoservice; @Override public void onevent(seriesdataevent event){if(event.getValue()== null || stringutils.isempty(event.getValue()。getDeviceInfostr())){logger.warn( "受信シリーズデータは空!"); } // Business Processing DeviceInfoservice.ProcessData(event.getValue()。getDeviceInfoStr()); }}seriesdataeventqueuehelper.java
@componentPublic classシリーズDataEventQueueHelper拡張basequequeuehelper <seriesdataevent、seriesdataeventhandler> Intializingbean {private static final int queue_size = 1024; @Autowiredプライベートリスト<SeriesDataEventHandler>シリーズDataeventHandler; @Override Protected int getqueUesize(){return queue_size; } @Override Protected com.lmax.disurtor.eventFactory eventFactory(){return new eventFactory(); } @Override Protected WorkHandler [] gethandler(){int size = seriesdataeventhandler.size(); seriesdataeventhandler [] parameventhandlers =(seriesdataeventhandler [])seriesdataeventhandler.toarray(new seriesdataeventhandler [size]); ParameVenthandlersを返します。 } @override Protected waitstrategy getStrategy(){return new BlockockingWaitStrategy(); // new ricewingwaitstrategy();を返します。 } @Override public void afterpropertiesset()スロー例外{this.init(); }}ValueWrapper.java
パブリックアブストラクトクラスValueWrapper <t> {private t value; public ValueWrapper(){} public ValueWrapper(t value){this.value = value; } public t getValue(){return値; } public void setValue(t value){this.value = value; }}sruptororconfig.java
@configuration @componentscan(value = {"com.portal.disuptor"})//複数の消費者の複数のインスタンスpublic class dasruptorconfig {/ ** * smsparameventhandler1 * * @return dataeventhandler */ @bean public seriesdataeventhandler smsparameventler1 } / ** * smsparameventhandler2 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler2(){return new seriesdataeventhandler(); } / ** * smsparameventhandler3 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler3(){return new seriesdataeventhandler(); } / ** * smsparameventhandler4 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler4(){return new seriesdataeventhandler(); } / ** * smsparameventhandler5 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler5(){return new seriesdataeventhandler(); }}テスト
// Inject seriesdataeventqueuehelperメッセージプロデューサー@autowired private seriesdataeventqueuehelper seriesdataeventqueuehelper; @RequestMapping(value = "/data"、method = requestmethod.post、produces = mediatype.application_json_value)public dataresponsevo <string> receiverdevicedata(@requestbody string devicedata){long starttime1 = system.currenttimemillis(); if(stringutils.isempty(devicedata)){logger.info( "受信者データは空です!");新しいdataresponsevo <string>(400、 "failed")を返します。 } seriesdataeventqueuehelper.publishevent(new seriesdata(devicedata)); long starttime2 = system.currenttimemillis(); logger.info( "受信者data == [{}] millionsecond == [{}]"、devicedata、starttime2 -starttime1);新しいdataresponsevo <string>(200、 "success")を返します。 }アプリケーションAは、 /データインターフェイスを介してアプリケーションBにデータを送信し、シリーズDataEventQueueHelperを介してDisruptorキューにメッセージを送信します。消費者はそれを消費します。プロセス全体がアプリケーションAをブロックしません。メッセージは失われます。シリーズDataeventqueuehelperを拡張して、Disruptorキューを監視できます。
上記はこの記事のすべての内容です。みんなの学習に役立つことを願っています。誰もがwulin.comをもっとサポートすることを願っています。