ฉันจะไม่แนะนำผู้ทำลายมากเกินไปและอธิบายสถานการณ์ธุรกิจปัจจุบัน แอพพลิเคชั่นทั้งสอง A, B และข้อมูลผ่านไปยังแอปพลิเคชัน B การส่งข้อมูลค่อนข้างเร็ว หากคุณใช้ HTTP เพื่อส่งข้อมูลโดยตรงจากนั้นป้อนฐานข้อมูลประสิทธิภาพไม่สูง มันอาจนำไปสู่แรงกดดันที่มากขึ้นในการใช้งานของ A การใช้ MQ นั้นหนักเกินไปดังนั้นจึงเลือก disruptor คุณยังสามารถใช้เครื่องปฏิกรณ์
Basequeuehelper.java
/*** lmax.disruptor เทมเพลตการประมวลผลคิวที่มีประสิทธิภาพ รองรับคิวเริ่มต้นนั่นคือการเผยแพร่ก่อน init () * * เธรดเริ่มต้นจริงเมื่อ init () ถูกเรียกและเริ่มการประมวลผล ระบบออกและทำความสะอาดทรัพยากรโดยอัตโนมัติ * * @author Xielongwang * @create 2018-01-18 3:49 PM * @Email [email protected] * @description */บทคัดย่อระดับสาธารณะ basequeueHelper <d, extive que extive รายการ <sasequeueHelper> queueHelperList = arrayList ใหม่ <SasequeueHelper> (); / *** disruptor object*/ private disruptor <e> disruptor; / ** * ringbuffer */ private ringbuffer <e> ringbuffer; / ** * initqueue */ รายการส่วนตัว <d> initqueue = new ArrayList <d> (); / *** ขนาดคิว** @return ความยาวคิวต้องเป็นพลังของ 2*/ การป้องกันบทคัดย่อ int getqueuesize (); / ** * Event Factory * * @return EventFactory */ Protected Abstract EventFactory <E> EventFactory (); / ** * ผู้บริโภคเหตุการณ์ * * @return workHandler [] */ การป้องกันบทคัดย่อ workhandler [] gethandler (); / *** การเริ่มต้น*/ โมฆะสาธารณะ init () {ThreadFactory NamedThreadFactory = new ThreadFactoryBuilder (). setNameFormat ("disruptorthreadPool"). build (); disruptor = new disruptor <e> (EventFactory (), getqueuesize (), namedthreadfactory, producertype.single, getStrategy ()); disruptor.setDefaultExceptionHandler (ใหม่ myhandlerexception ()); disruptor.handleeventsworkerpool (Gethandler ()); ringbuffer = disruptor.start (); // เริ่มต้นการเผยแพร่ข้อมูลสำหรับ (d data: initqueue) {ringbuffer.publisheVent (ใหม่ EventTranslatorOnearg <e, d> () {@Override โมฆะสาธารณะ translateTo (เหตุการณ์ e, ลำดับยาว, d data) {event.setValue (ข้อมูล);}}, ข้อมูล); } // เพิ่มการทำความสะอาดทรัพยากรตะขอที่ซิงโครไนซ์ (queueHelperlist) {ถ้า (queueHelperlist.isempty ()) {runtime.getRuntime (). addshutdownhook (Thread () {@Override public Void Run () - } queuehelperlist.add (นี่); }} /*** หากคุณต้องการเปลี่ยนลำดับความสำคัญของการดำเนินการเธรดให้แทนที่นโยบายนี้ Fileingwaitstrategy จะเพิ่มการตอบสนองและครอบครองมากกว่า 70% ของ CPU เมื่อไม่ได้ใช้งาน * ใช้การนอนหลับ WAITSTRATEGY ด้วยความระมัดระวังจะลดการตอบสนองและลดการใช้งาน CPU และใช้ในบันทึกและสถานการณ์อื่น ๆ * * @return Waitstrategy */ บทคัดย่อที่ได้รับการป้องกัน Waitstrategy GetStrategy (); /*** แทรกข้อความคิวรองรับการแทรกคิวก่อนวัตถุเริ่มต้นและจะถูกเผยแพร่ไปยังคิวทันทีเมื่อมีการจัดตั้งคิว */ โมฆะที่ซิงโครไนซ์สาธารณะ publisheVent (d data) {ถ้า (ringbuffer == null) {initqueue.add (data); กลับ; } ringbuffer.publisheVent (ใหม่ EventTranslatorOneArg <e, d> () {@Override โมฆะสาธารณะ translateTo (เหตุการณ์ e, ลำดับยาว, ข้อมูล d) {event.setValue (ข้อมูล);}}, ข้อมูล); } / *** ปิดคิว* / การปิดช่องว่างสาธารณะ () {disruptor.shutdown (); -
EventFactory.java
/** * @author Xielongwang * @create 2018-01-18 6:24 PM * @Email [email protected] * @description */Class Public EventFactory ดำเนินการ com.lmax.disructor.eventFactory -
myhandlerexception.java
คลาสสาธารณะ MyHandLerException ใช้ ExceptionHandler {Private Logger Logger = LoggerFactory.getLogger (myhandlerexception.class); / * * (ไม่ใช่ javadoc) ข้อยกเว้นที่เกิดขึ้นระหว่างการดำเนินการ * * @See * com.lmax.disruptor.exceptionHandler#handleEventException (java.lang.-hrowable * ยาว java.lang.Object) */ @Override logger.error ("ลำดับข้อผิดพลาดของข้อมูลกระบวนการ == [{}] เหตุการณ์ == [{}], ex == [{}]", sequence, event.toString (), ex.getMessage ()); } / * * (ไม่ใช่ javadoc) ข้อยกเว้นที่ Startup * * @See * com.lmax.disruptor.exceptionHandler#handleonstartexception (java.lang. * throwable) * / @Override void handleonstartexception (throwable ex) {logger.error ( } / * * (ไม่ใช่ javadoc) ข้อยกเว้นเมื่อปิด * * @see * com.lmax.disruptor.exceptionhandler#handleonshutdownexception (java.lang * .hrowable) * / @Override void handleonshutdownexception -seriesdata.java (แสดงข้อความที่ส่งไปยังแอพ B โดยแอปพลิเคชัน A)
คลาสสาธารณะ seriesdata {private String deviceInfOSTR; public seriesdata () {} public seriesdata (String deviceInfOSTR) {this.deviceInfOSTR = deviceInfOSTR; } สตริงสาธารณะ getDeviceInFOSTR () {return deviceInfOSTR; } โมฆะสาธารณะ setDeViceInFOSTR (String deviceInfOSTR) {this.deViceInFOSTR = deviceInfOSTR; } @Override สตริงสาธารณะ toString () {return "seriesData {" + "deviceInfOSTR = '" deviceInfOSTR +'/'' + '}'; -seriesdataevent.java
Public Class SeriesDataEvent ขยาย ValueWrapper <SeasyData> {}seriesdataeventhandler.java
Public Class SeriesDataEventHandler ใช้ WorkHandler <SeasyDataEvent> {ส่วนตัว logger logger = loggerFactory.getLogger (SeriesDataEventHandler.class); @autoWired DeviceInfoservice DeviceInfoservice; @Override โมฆะสาธารณะ oneVent (เหตุการณ์ seriesdataevent) {ถ้า (event.getValue () == null || stringutils.isempty (event.getValue (). getDeviceInfOSTR ())) {logger.warn ( } // อุปกรณ์การประมวลผลทางธุรกิจ Infoservice.processData (Event.getValue (). getDeviceInfOSTR ()); -seriesdataeventqueuehelper.java
@ComponentPublic Class SeriesDataEventqueueHelper ขยาย basequeueHelper <seriesdata, seriesdataevent, seriesdataeventhandler> ดำเนินการเริ่มต้น {ส่วนตัวคงที่ int สุดท้าย int queue_size = 1024; @AutoWired List <seriesDataEventHandler> seriesDataEventHandler; @Override ป้องกัน int getQueuesize () {return queue_size; } @Override ป้องกัน com.lmax.disruptor.eventFactory eventFactory () {ส่งคืน eventFactory ใหม่ (); } @Override Protected WorkHandler [] Gethandler () {int size = seriesDataEventHandler.size (); SeriesDataEventHandler [] parameVentHandlers = (seriesDataEventHandler []) seriesDataEventHandler.toArray (SeriesDataEventHandler [ขนาด] ใหม่); ส่งคืน parameventhandlers; } @Override Protected Waitstrategy GetStrategy () {ส่งคืนใหม่ BlockingWaitStrategy (); // ส่งคืนใหม่ DielingingWaitstrategy (); } @Override โมฆะสาธารณะ AfterPropertIesset () โยนข้อยกเว้น {this.init (); -Valuewrapper.java
ValueWrapper ระดับนามธรรมสาธารณะ <t> {ค่า T ส่วนตัว; Public ValueWrapper () {} ValueWrapper สาธารณะ (ค่า t) {this.value = value; } สาธารณะ t getValue () {ค่าคืน; } โมฆะสาธารณะ setValue (ค่า t) {this.value = value; -disruptorConfig.java
@configuration @ComponentsCan (value = {"com.portal.disruptor"}) // อินสแตนซ์หลายครั้งของผู้บริโภคหลายคนในชั้นเรียนสาธารณะ disruptorConfig {/ ** * smsparameventhandler1 * * @return seriesdataeventhandlame } / ** * smsparameventhandler2 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler2 () {return new seriesdataeventhandler (); } / ** * smsparameventhandler3 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler3 () {return new seriesdataeventhandler (); } / ** * smsparameventhandler4 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler4 () {return new seriesdataeventhandler (); } / ** * smsparameventhandler5 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler5 () {return new seriesdataeventhandler (); -ทดสอบ
// inject seriesdataeventqueuehelper ผู้ผลิตข้อความ @autowired private seriesdataeventqueuehelper seriesdataeventqueuehelper; @RequestMapping (value = "/data", method = requestMethod.post, ผลิต = mediaType.application_json_value) Public DataResponsevo <String> teadDevicedata (@requestbody String Devicedata) if (stringutils.isempty (devicedata)) {logger.info ("ข้อมูลตัวรับสัญญาณว่างเปล่า!"); ส่งคืน DataResponsevo ใหม่ <String> (400, "ล้มเหลว"); } seriesdataeventqueuehelper.publisheVent (ใหม่ seriesdata (devicedata)); Long StartTime2 = System.currentTimeMillis (); logger.info ("data ตัวรับสัญญาณ == [{}] Millionsecond == [{}]", Devicedata, StartTime2 - StartTime1); ส่งคืน DataResponsevo ใหม่ <String> (200, "Success"); - แอปพลิเคชัน A ส่งข้อมูลไปยังแอปพลิเคชัน B ผ่าน /ข้อมูลอินเทอร์เฟซจากนั้นส่งข้อความไปยังคิว disruptor ผ่าน seriesdataeventqueuehelper ผู้บริโภคกินมัน กระบวนการทั้งหมดจะไม่บล็อกแอปพลิเคชัน A ข้อความจะหายไป คุณสามารถขยาย seriesdataeventqueuehelper เพื่อตรวจสอบคิว disruptor
ข้างต้นเป็นเนื้อหาทั้งหมดของบทความนี้ ฉันหวังว่ามันจะเป็นประโยชน์ต่อการเรียนรู้ของทุกคนและฉันหวังว่าทุกคนจะสนับสนุน wulin.com มากขึ้น