لن أقدم الكثير من العطل ، وأصف سيناريو العمل الحالي. التطبيقان A و B و A PASS إلى التطبيق B. نقل البيانات سريع نسبيا. إذا كنت تستخدم HTTP لدفع البيانات مباشرة ثم إدخال قاعدة البيانات ، فإن الكفاءة ليست عالية. قد يؤدي ذلك إلى زيادة الضغط على تطبيق A. استخدام MQ ثقيل للغاية ، لذلك يتم تحديد Disruptor. يمكنك أيضًا استخدام المفاعل
basequeuehelper.java
/*** قالب معالجة قائمة الانتظار الفعال. يدعم قوائم الانتظار الأولية ، أي النشر قبل init (). * * يتم تشغيل الخيط فعليًا عند استدعاء init () وبدأ معالجته. يخرج النظام ويقوم بتنظيف الموارد تلقائيًا. * * Author xielongwang * create 2018-01-18 3:49 PM * eMail [email protected] * description */public agruss class class baseuehelper <d ، e يمتد extits extracts <d> ، H. قائمة <CaseQueHeLper> QueUeHelperList = new ArrayList <BaseQueueHelper> () ؛ / *** كائن disruptor*/ disruptor private <e> disruptor ؛ / ** * Ringbuffer */ private Ringbuffer <e> ringbuffer ؛ / ** * initqueue */ list private <d> initqueue = new ArrayList <D> () ؛ / *** حجم قائمة الانتظار** طول قائمة الانتظار ، يجب أن يكون قوة 2*/ محمية مجردة int getQueueSize () ؛ / ** * Event Factory * * return eventFactory */ محمي Eventfactory <e> eventFactory () ؛ / ** * Event Consumer * * Regurn WorkHandler [] */ محمية مجردة WorkHandler [] Gethandler () ؛ / *** initialization*/ public void init () {threadfactory nameThReadFactory = new threadfactorybuilder (). Disruptor = new disruptor <e> (eventFactory () ، getQueueSize () ، namedThreadFactory ، producerType.single ، getTrategy ()) ؛ disruptor.setDefaultExceptionHandler (New MyhandLerexception ()) ؛ Disruptor.HandleeventswithworkerPool (Gethandler ()) ؛ Ringbuffer = disruptor.start () ؛ // تهيئة نشر البيانات لـ (d data: initqueue) {ringbuffer.publishevent (New EventTransLatorOneArg <e ، d> () {Override public void translateto (e event ، long sequence ، d data) {event.setvalue (data) ؛}} ، data) ؛ } // إضافة خطاف تنظيف الموارد متزامن (QueUeHelperList) {if (QueUeHelperList.isempty ()) {runtime.getRuntime (). addshutdownhook (new thread () {ouverride public reg () }) ؛ } QueUeHelperList.add (this) ؛ }} /*** إذا كنت تريد تغيير أولوية تنفيذ مؤشر الترابط ، فتجاوز هذه السياسة. سيؤدي العائد على الاستجابة إلى زيادة الاستجابة وسيشغل أكثر من 70 ٪ من وحدة المعالجة المركزية عند الخمول. * استخدام SleepingWaitStrategy بحذر سيقلل من الاستجابة ويقلل من استخدام وحدة المعالجة المركزية ، ويستخدم في السجلات والسيناريوهات الأخرى. * * @return waitstrategy */ محمية مجردة waitstrategy getTrategy () ؛ /*** أدخل رسائل قائمة الانتظار ، ودعم إدراج قوائم الانتظار قبل كائن init ، وسيتم نشرها إلى قائمة الانتظار على الفور عند إنشاء قائمة الانتظار. */ public publishEvent (d data) {if (ringbuffer == null) {initqueue.add (data) ؛ يعود؛ } ringbuffer.publishevent (New EventTransLatorOneArg <e ، d> () {Override public void translateto (e event ، sequence long ، d data) {event.setValue (data) ؛}} ، data) ؛ } / *** leint queue* / public void sthowddown () {disruptor.shutdown () ؛ }}
EventFactory.java
/** * Author xielongwang * create 2018-01-18 6:24 PM * @eMail [email protected] * description */public class eventfactory تنفذ com.lmax.disruptor.eventfactory <seriesdataevent> { @ @ }}
myhandlerexception.java
الطبقة العامة myhandlerexception تنفذ استثناء Handler {private logger logger = loggerFactory.getLogger (myhandlerexception.class) ؛ / * * (غير javadoc) استثناء يحدث أثناء العملية * * see * com.lmax.disreptor.exceptionHandler#GeneAdeventException (java.lang.throwable * ، long ، java.lang.Object) */ Override public void geneventexception (throw ex ، sermence ext ، Object). logger.Error ("Sequence Sequence == [{}] event == [{}] ، ex == [{}]" ، التسلسل ، event.toString () ، ex.getMessage ()) ؛ } / * * (غير javadoc) استثناء في بدء التشغيل * * see * com.lmax.disruptor.exceptionHandler#GeneatOnStartException (java.lang. * throwable) * / @Override public void handleonstartException (exable Ex) } / * * (غير javadoc) استثناء عند الإغلاق * * see * com.lmax.disruptor.exceptionHandler#HandleOnShutDownException (java.lang *. }}SeriesData.java (يمثل الرسالة المرسلة إلى التطبيق B حسب التطبيق أ)
فئة عامة SeriesData {private string deviceInfoStr ؛ public SeriesData () {} seriesData (سلسلة deviceInfoStr) {this.deviceInfoStr = deviceInfoStr ؛ } السلسلة العامة getDeviceInfoStr () {return deviceInfoStr ؛ } public void setDeviceInfoStr (سلسلة deviceInfoStr) {this.deviceInfoStr = deviceInfoStr ؛ } Override Public String ToString () {return "seriesData {" + "deviceInfoStr = '" + deviceInfoStr +'/' +'} '؛ }}SeriesDataevent.java
SeriesDataEvent من الطبقة العامة يمتد ValuewRapper <SirectData> {}SeriesDataeventHandler.java
SeriesDataEventHandler Public SeriesDataEventHandler تنفذ WorkHandler <SiredDataEvent> {private logger logger = loggerfactory.getLogger (seriesDataeventHandler.Class) ؛ Autowired private deviceInfoservice deviceInfoservice ؛ Override public void Onevent (SeriesDataevent event) {if (event.getValue () == null || stringUtils.isempty (event.getValue (). getDeviceInfoStr ())) {logger.warn ( } // معالجة الأعمال deviceInFoservice.ProcessData (event.getValue (). }}SeriesDataeventqueueHelper.java
@componentpublic class seriesdataeventqueehelper يمتد basequeuehelper <seriesData ، seriesDataevent ، seriesDataEventHandler> تنفذ initializingbean {private static int queue_size = 1024 ؛ @Autowired List <SiredDataEventHandler> SeriesDataEventHandler ؛ Override محمي int getQueUeSize () {return queue_size ؛ } Override محمي com.lmax.disruptor.eventFactory eventFactory () {return new eventFactory () ؛ } Override WorkHandler [] Gethandler () {int size = seriesDataEventHandler.size () ؛ SeriesDataEventHandler [] ParameventHandlers = (SeriesDataEventHandler []) SeriesDataEventHandler.toarray (SeriesDataEventHandler [Size]) ؛ إرجاع Parameventhandlers ؛ } Override محمي WaitStrategy getTrategy () {return New BlockingWaitStrategy () ؛ // إرجاع new givelingwaitstrategy () ؛ } Override public void بعد propertiesset () يلقي الاستثناء {this.init () ؛ }}valuewrapper.java
الفئة المجردة العامة فئة ValuewRapper <T> {private t value ؛ publuewrapper () {} publuewRapper (t value) {this.value = value ؛ } p getValue () {return value ؛ } public void setValue (t value) {this.value = value ؛ }}DisruptorConfig.java
@configuration @componentscan (value = {"com.portal.disreptor"}) // مثيلات متعددة للعديد من المستهلكين من الفئة العامة disruptorConfig {/ ** * smsparameventhandler1 * * @return endrateventhandler */ @bean publicdataeventhandler smsparameventland1 () } / ** * smsparameventhandler2 * * regurn seriesDataEventHandler * / bean public SeriesDataEventHandler SmsParameVentHandler2 () {return new SeriesDataEventHandler () ؛ } / ** * smsparameventhandler3 * * regurn seriesDataEventHandler * / bean public SeriesDataEventHandler smsparameVentHandler3 () {return new SeriesDataEventHandler () ؛ } / ** * smsparameventhandler4 * * regurn seriesDataEventHandler * / bean public SeriesDataEventHandler SmsParameVentHandler4 () {return new SeriesDataEventHandler () ؛ } / ** * smsparameventhandler5 * * regurn seriesDataEventHandler * / bean public SeriesDataEventHandler SmsParameVentHandler5 () {return new SeriesDataEventHandler () ؛ }}امتحان
// enject seriesDataeventQueueHelper ProducerAurediired private SeriesDataeventqueHelper SeriesDataeventQueueHelper ؛ @requestmapping (value = "/data" ، method = requestMethod.post ، reduces = mediaType.application_json_value) public dataResponsvo <string> requiverdevicedata (requestbody string devicedata) {long starttime1 = system.currentTimEmillis () ؛ if (stringUtils.isempty (devicedata)) {logger.info ("بيانات المتلقي فارغة!") ؛ إرجاع dataResponsevo جديد <string> (400 ، "فشل") ؛ } seriesDataeventQueueHelper.PublishEvent (New SeriesData (Devicedata)) ؛ long startTime2 = System.CurrentTimeMillis () ؛ logger.info ("بيانات المتلقي == [{}] millionsecond == [{}]" ، devicedata ، startTime2 - StartTime1) ؛ إرجاع dataResponsevo الجديد <string> (200 ، "النجاح") ؛ } التطبيق A يرسل البيانات إلى التطبيق B من خلال واجهة /البيانات ، ثم يرسل الرسالة إلى قائمة انتظار Disruptor من خلال SeriesDataeventQueueHelper. المستهلكون يستهلكونها. لن تمنع العملية بأكملها التطبيق أ. يتم فقد الرسالة. يمكنك تمديد سلسلة SeriesDataeventQueueHelper لمراقبة قائمة انتظار Disruptor.
ما سبق هو كل محتوى هذه المقالة. آمل أن يكون ذلك مفيدًا لتعلم الجميع وآمل أن يدعم الجميع wulin.com أكثر.