Saya tidak akan terlalu banyak memperkenalkan pengganggu, dan menggambarkan skenario bisnis saat ini. Dua aplikasi A, B, dan data lulus ke aplikasi B. Transmisi data relatif cepat. Jika Anda menggunakan HTTP untuk secara langsung mendorong data dan kemudian memasukkan database, efisiensinya tidak tinggi. Ini dapat menyebabkan tekanan yang lebih besar pada aplikasi A. Menggunakan MQ terlalu berat, sehingga pengganggu dipilih. Anda juga dapat menggunakan reaktor
BasequeuHelper.java
/*** lmax.disruptrupruprate Efisien Antrian Pemrosesan. Mendukung antrian awal, yaitu, penerbitan sebelum init (). * * Utas sebenarnya dimulai ketika init () dipanggil dan mulai diproses. Sistem keluar dan secara otomatis membersihkan sumber daya. * * @author xielongwang * @create 2018-01-18 3:49 pm * @email [email protected] * @description */Public Abstract Class BasequeUeHelper <D, E Record All Stends <D>, HAGI PRIBEL SUMBERSIFLEKLE BAGIAN <E> {/** ** ** PRIVATION, dan HUBURAN BURNIACKLE BURSIFUL ENVERIFLY <E> {/** STENSIF PRIVEATIONAL, dan HUBURAN BURNIACLY BURSIFUL ENVERIFLY <E> {/** STENSIF PRIVEATION, dan HOLEUS ENVERIFLY BURSIFUL ENVERIFLE <E>> Daftar <BasequeUehelper> queuuHelperlist = new ArrayList <BaseQueUhelper> (); / *** objek pengganggu*/ pengganggu swasta <e> pengganggu; / ** * RingBuffer */ Private RingBuffer <E> RingBuffer; / ** * initqueue */ daftar pribadi <d> initqueue = new ArrayList <D> (); / *** Ukuran antrian** @return antrian panjang, harus berupa kekuatan 2*/ terlindungi int int getqueuesize (); / ** * Event Factory * * @return EventFactory */ Protected Abstrak EventFactory <E> EventFactory (); / ** * Konsumen Acara * * @return WorkHandler [] */ Workhandler abstrak yang dilindungi [] getHandler (); / *** Inisialisasi*/ public void init () {ThreadFactory namedThreadFactory = new ThreadFactoryBuilder (). SetNameformat ("DisruptorthReadPool"). Build (); Disruptor = Disruptor baru <E> (EventFactory (), getQueuesize (), bernamaTreadFactory, productype.single, getStrategy ()); Disruptor.setDefaultExceptionHandler (baru MyHandLerException ()); Disruptor.handleeventswithworkerpool (getHandler ()); RingBuffer = Disruptor.Start (); // Inisialisasi Penerbitan Data untuk (D Data: Initqueue) {RingBuffer.PublishEvent (EventTranslatoroneArg baru <e, d> () {@Override public void translateTo (Event, Event, Long Sequence, D Data) {event.setValue (data);}}, data); } // Tambahkan sumber daya pembersih hook disinkronkan (queueHelperlist) {if (queueHelperlist.isempty ()) {runtime.getRuntime (). Addshutdownhook (thread baru () {@override public run () {for (basis basequeUhelperperpers: basequeUhelperpers: basequeUhelperpers: basequeUhelperpers: basequeUhelperpers: basequeUhelperpers: basequeUhelperpers: basequeUhelperpers: basequeUhelperpers {quleistlper: basequeUhelperpers {quleistlperperpery: basequeueHelper: basis basequeUhelperperperper basequeUhelper.shutdown ();}}}); } queUehelperlist.add (ini); }} /*** Jika Anda ingin mengubah prioritas eksekusi utas, timpa kebijakan ini. Menghasilkan Strategi akan meningkatkan respons dan menempati lebih dari 70% CPU saat menganggur. * Gunakan Sleepingwaitstrategy dengan hati -hati akan mengurangi respons dan mengurangi penggunaan CPU, dan digunakan dalam log dan skenario lainnya. * * @return Waitstrategy */ Protected Abstract Waitstrategy GetStrategy (); /*** Masukkan pesan antrian, dukungan memasukkan antrian sebelum objek init, dan akan diterbitkan ke antrian segera ketika antrian dibuat. */ public disinkronkan void publisheVent (data d) {if (ringBuffer == null) {initqueue.add (data); kembali; } RingBuffer.PublishEvent (EventTranslatoroneArg baru <e, d> () {@Override public void TransLateTo (Event E Event, Long Sequence, D Data) {event.setValue (data);}}, data); } / *** Tutup antrian* / public void shutdown () {Disruptor.shutdown (); }}
EventFactory.java
/** * @author xielongwang * @create 2018-01-18 6:24 pm * @email [email protected] * @description */kelas publik eventfactory mengimplementasikan com.lmax.disruptor.eventfactory <deriesAevent> {@override publicer (SeriesDataEvent> {@Override publicer (SeriesDataEvent> {@Override Public (SeriesDataEvent> {@OVERRIDEDEVENCE (SeriesDataEvent> {@Override public (SeriesDataEvent> {@Override public {SeriesDataEvent> }}
Myhandlerexception.java
kelas publik MyHandLerException mengimplementasikan ExceptionHandler {private Logger Logger = LoggerFactory.getLogger (myhandleRexception.class); / * * (non-javadoc) Pengecualian yang terjadi selama operasi * * @see * com.lmax.disruprupt.ExceptionHandler#HandleEventException (java.lang.Trowable *, long, java.lang. Logger.Error ("Urutan kesalahan data proses == [{}] event == [{}], ex == [{}]", sequence, event.toString (), ex.getMessage ()); } / * * (non-javadoc) Pengecualian saat startup * * @see * com.lmax.disrupt.ExceptionHandler#handleonstartexception (java.lang. * Throwable) * / @Override void handleonstartException (throwable ex) {logger.error ("start void handleonstartException (throwable) {logger.error (" start disrupring ("Disrapporage == (log logger) {logger.error (" start disremremor ==) (Disremor == [log logger) {logger.error ("start disrapter (" disremor a) {{logger) {logger.error ("start disrapor = {logger) {logger. } / *; }}SeriesData.java (mewakili pesan yang dikirim ke aplikasi B dengan aplikasi a)
Public Class SeriesData {Private String DeviceInfostr; Public SeriesData () {} public SeriesData (String DeviceInfostr) {this.deviceInfostr = deviceInfostr; } public String getDeviceInfostr () {return deviceInfostr; } public void setDeviceInfostr (String deviceInfostr) {this.deviceInfostr = deviceInfostr; } @Override Public String ToString () {return "SeriesData {" + "deviceInfostr = '" + deviceInfostr +'/'' + '}'; }}SeriesDataEvent.java
Public Class SeriesDataEvent memperluas ValueWrapper <SeriesData> {}SeriesDataEventHandler.java
Public Class SeriesDataEventhandler mengimplementasikan Workhandler <SersionEvent> {private Logger Logger = LoggerFactory.getLogger (SeriesDataEventHandler.class); @Autowired private deviceInfoservice deviceInfoservice; @Override public void onevent (SeriesDataEvent Event) {if (event.getValue () == null || stringutils.isempty (event.getValue (). GetDeviceInfostr ())) {logger.warn ("Data seri penerima kosong!"); } // Business Processing deviceInfoservice.processdata (event.getValue (). GetDeviceInfostr ()); }}SeriesDataEventQueUhelper.java
@ComponentPublic Class SeriesDataEventQueUhelper memperluas basequeuHelper <SeriesData, SeriesDataEvent, SeriesDataEventHandler> mengimplementasikan inisialisasi {private static final int queue_size = 1024; @Autowired Private List <sersionDataEventHandler> SeriesDataEventHandler; @Override Int int getqueuesize () {return queue_size; } @Override Protected com.lmax.disruptor.eventfactory eventFactory () {return new EventFactory (); } @Override dilindungi workhandler [] getHandler () {int size = SeriesDataEventHandler.size (); SeriesDataEventHandler [] ParamEventhandlers = (SeriesDataEventHandler []) SeriesDataEventHandler.toArray (SeriesdataEventhandler baru [size]); mengembalikan ParamEventhandlers; } @Override dilindungi waitstrategy getStrategy () {return new blockingwaitstrategy (); // return new HomelDingWaitStrategy (); } @Override public void afterpropertiesset () melempar Exception {this.init (); }}Valuewrapper.java
Public Abstract Class ValueWrapper <T> {private t Value; public valuewrapper () {} public valuewrapper (nilai t) {this.value = nilai; } public t getValue () {nilai pengembalian; } public void setValue (nilai t) {this.value = nilai; }}DisruptorConfig.java
@Configuration@ComponentScan(value = {"com.portal.disruptor"})// Multiple instances of several consumers public class DisruptorConfig { /** * smsParamEventHandler1 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler1() { return new SeriesDataEventHandler(); } / ** * smsparameventhandler2 * * @return SeriesDataEventhandler * / @Bean Public SeriesDataEventHandler SMSPARAMEVENTHDANDLER2 () {return New SeriesDataEventHandler (); } / ** * smsparameventhandler3 * * @return SeriesDataEventhandler * / @Bean Public SeriesDataEventHandler SMSPARAMEVENTHDANDLER3 () {return New SeriesDataEventHandler (); } / ** * smsparameventhandler4 * * @return SeriesDataEventhandler * / @Bean Public SeriesDataEventHandler SMSPARAMEVENTHDANDLER4 () {return New SeriesDataEventHandler (); } / ** * smsparameventhandler5 * * @return SeriesDataEventhandler * / @Bean Public SeriesDataEventHandler SMSPARAMEVENTHDANDLER5 () {return New SeriesDataEventHandler (); }}tes
// inject produser pesan SeriesDataEventQueUhelper @Autowired private SeriesEventQueUhelper SeriesDataEventQueUhelper; @RequestMapping (value = "/data", Method = requestMethod.post, menghasilkan = mediatype.application_json_value) public dataresponsevo <string> receiverdevicedata (@RequestBody String Devicedata) {long starttime1 = System.currentTimeMillis (); if (stringutils.isempty (devicedata)) {logger.info ("Data penerima kosong!"); return dataresponsevo baru <string> (400, "gagal"); } SeriesDataEventQueUhelper.publishevent (Seriesdata baru (Devicedata)); Long StartTime2 = System.CurrentTimeMillis (); Logger.info ("Data Penerima == [{}] Millionsecond == [{}]", Devicedata, StartTime2 - StartTime1); return dataresponsevo baru <string> (200, "sukses"); } Aplikasi A mengirimkan data ke aplikasi B melalui antarmuka /data, dan kemudian mengirim pesan ke antrian pengganggu melalui SeriesDataEventQueUhelper. Konsumen mengkonsumsinya. Seluruh proses tidak akan memblokir aplikasi A. Pesan hilang. Anda dapat memperpanjang SeriDataEventQueUEhelper untuk memantau antrian pengganggu.
Di atas adalah semua konten artikel ini. Saya berharap ini akan membantu untuk pembelajaran semua orang dan saya harap semua orang akan lebih mendukung wulin.com.