No presentaré demasiado al disruptor y describiré el escenario comercial actual. Las dos aplicaciones A, B y A Pase Datos a la aplicación B. La transmisión de datos es relativamente rápida. Si usa HTTP para presionar directamente los datos y luego ingrese la base de datos, la eficiencia no es alta. Puede conducir a una mayor presión sobre la aplicación de A. Usar MQ es demasiado pesado, por lo que se selecciona el disruptor. También puedes usar reactor
Buesqueuehelper.java
/*** LMAX. Plantilla de procesamiento de cola eficiente de disruptor. Admite colas iniciales, es decir, publicar antes de init (). * * El hilo realmente se inicia cuando se llama a init () y se inicia procesando. El sistema sale y limpia automáticamente los recursos. * * @author xielongwang * @create 2018-01-18 3:49 pm * @email [email protected] * @Description */public abstract class BaseSqueueHelper <d, e extiende valuewrapper <d>, h extiende WorkHandler <e>> {/** * registrar todas Lista <BasequeueHelper> queueHelperList = new ArrayList <BasequeueHelper> (); / *** Objeto disruptor*/ disruptor privado <E> disruptor; / ** * ringBuffer */ private ringbuffer <E> ringbuffer; / ** * Initqueue */ Private List <d> Initqueue = new ArrayList <d> (); / *** Tamaño de la cola** @return Longitud de la cola, debe ser un poder de 2*/ protegido abstracto int getqueuSize (); / ** * Event Factory * * @Return EventFactory */ protegido Abstract EventFactory <E> EventFactory (); / ** * Consumidor de eventos * * @return workhandler [] */ protegido abstracto workhandler [] gethandler (); / *** Inicialización*/ public void init () {ThreadFactory namedThreadFactory = new ThreadFactoryBuilder (). SetNameFormat ("DisruptorthreadPool"). Build (); disruptor = new Disruptor <E> (eventFactory (), getqueuSize (), namedthreadFactory, producertype.single, getTrategy ()); disruptor.setDefaultExceptionHandler (new MyHandLerException ()); disruptor.handleeventswithworkerpool (gethandler ()); ringBuffer = disruptor.start (); // Inicializar la publicación de datos para (D Datos: Initqueue) {ringBuffer.PublisheVent (new EventTranslatorOnearg <e, d> () {@Override public void Translateto (E Event, Long Sequence, D Data) {Event.SetValue (datos);}}, datos); } //Add resource cleaning hook synchronized (queueHelperList) { if (queueHelperList.isEmpty()) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for (BaseQueueHelper baseQueueHelper : queueHelperList) { BuesqueueHelper.shutdown (); } queuehelperlist.add (this); }} /*** Si desea cambiar la prioridad de ejecución del hilo, anule esta política. El cero de cero aumentará la respuesta y ocupará más del 70% de la CPU cuando está inactiva. * Use SleepingWaitStrategy con precaución reducirá la respuesta y reducirá el uso de la CPU, y se usa en registros y otros escenarios. * * @return waitstrategy */ protegido abstracto WaitStrategy getStrategy (); /*** Inserte los mensajes de la cola, admite la inserción de colas antes del objeto init, y se publicará en la cola inmediatamente cuando se establezca la cola. */ public Synchronized void PublisheVent (d data) {if (ringBuffer == null) {initqueue.add (data); devolver; } ringBuffer.PublisheVent (New EventTranslatorOreng <e, d> () {@Override public void Translateto (E event, Long secuence, d data) {event.setValue (data);}}, data); } / *** Cerrar cola* / public void shutdown () {disruptor.shutdown (); }}
EventFactory.java
/** * @author Xielongwang * @Create 2018-01-18 6:24 pm * @email [email protected] * @Description */public class EventFactory Implements com.lmax.disruptor.eventFactory <SeriesDataEvent> {@Override PublicDataEvent NewInstance () {Return New SereEvent () (); }}
Myhandlerexception.java
clase pública myHandLerException implementa ExceptionHandler {private logger logger = loggerFactory.getLogger (myHandLerException.class); / * * (no javadoc) Excepción que ocurre durante la operación * * @see * com.lmax.disruptor.ExceptionHandler#handleEventException (java.lang.throwable *, long, java.lang.object) */ @override public void manyEventException (showeable ex, long secuence, object Event) {ex.printstacktRace (); logger.error ("Secuencia de error de datos de proceso == [{}] event == [{}], ex == [{}]", secuencia, event.ToString (), ex.getMessage ()); } / * * (no javadoc) Excepción al inicio * * @see * com.lmax.disruptor.ExceptionHandler#HandleOnStARTException (java.lang. * showable) * / @Override public void HandleSttarTexception (showleable Ex) {Logger.error ("Error de inicio de inicio == [{}]! } / * * (no javadoc) Excepción cuando se está cerrando * * @see * com.lmax.disruptor.exceptionHandler#handleonShutdownException (java.lang * .throwable) * / @Override public void (manejo de la mano (showable ex) {logger.error ("Error de interruptor de cierre == [{}]!",). }}SeriesData.java (representa el mensaje enviado a la aplicación B por la aplicación a)
Public Class SeriesData {private String DeviceInfoStr; Public seriesData () {} public seriesData (String DeviceInfoStr) {this.deviceInfoStr = DeviceInfoStr; } public String getDeviceInfoStr () {return DeviceInfoStr; } public void setDeviceInfoStr (String DeviceInfoStr) {this.deviceInfoStr = DeviceInfoStr; } @Override public string toString () {return "seriesData {" + "dispositivosinfostr = '" + DeviceInfoStr +'/' +'} '; }}SeriesDataEvent.java
Public Class SeriesDataEvent extiende ValueWrapper <SeriesData> {}SeriesDataEventHandler.java
Public Class SeriesDataEventHandler implementa WorkHandler <SeriesDataEvent> {private logger logger = loggerFactory.getLogger (serieDataEventHandler.class); @AUTOWIREDEDIREDED ENVICTOINFOSERVICE DeviceInfoservice; @Override public void OneVent (serie de EventDataEvent) {if (event.getValue () == null || stringUtils.isEmpty (event.getValue (). GetDeviceInfoStr ()) {logger.warn ("Los datos de la serie de receptores están vacíos!"); } // Deviceinfoservice.processData (event.getValue (). GetDeviceInfoStr ()); }}SeriesDataEventQueueHelper.java
@ComponentPublic Class SeriesDataEventQueueHelper extiende BuesqueueHelper <SeriesData, SeriesDataEvent, SeriesDataEventHandler> Implementa InicializingBean {private Static final int queue_size = 1024; @AUTOWired Lista privada <serieDataEventHandler> SeriesDataEventHandler; @Override protegido int getqueuSize () {return queue_size; } @Override Protected com.lmax.disruptor.eventFactory EventFactory () {return new EventFactory (); } @Override WorkHandler [] gethandler () {int size = seriesDataEventHandler.size (); SeriesDataEventHandler [] parameventhandlers = (serieDataEventHandler []) SeriesDataEventHandler.toarray (nueva serieDataEventHandler [tamaño]); Devuelve los paramevestores; } @Override WaitStrategy protegido getStrategy () {return new BloquingWaitStrategy (); // devolver nuevo cerotingwaitStrategy (); } @Override public void AfterPropertiesSet () lanza la excepción {this.init (); }}Vallewrapper.java
Public Abstract Class ValueWrapper <T> {valor t privado; public valuewraper () {} public valuewraper (t value) {this.value = value; } public t getValue () {Valor de retorno; } public void setValue (t value) {this.value = value; }}DisruptorConfig.java
@Configuration @ComponentScan (valor = {"com.portal.disrupror"}) // Múltiples instancias de varios consumidores clase pública disruptorCig {/ ** * smsparameventhandler1 * * @return seriesdataventhandler */ @Bean ser serie publicadaventhandler smsParameVenthandler1 () {return NewdataEventhAventhandler ();););););););); } / ** * smsParAMEventHandler2 * * @return seriesDataEventHandler * / @Bean Public SeriesDataEventHandler SMSPARAMEVENTHANDLER2 () {return New SeriesDataEventHandler (); } / ** * smsParAMEventHandler3 * * @return seriesDataEventHandler * / @Bean Public SeriesDataEventHandler SMSPARAMEVENTHANDLER3 () {return New SeriesDataEventHandler (); } / ** * smsParAMEventHandler4 * * @return seriesDataEventHandler * / @Bean Public SeriesDataEventHandler SMSPARAMEVENTHANDLER4 () {return New SeriesDataEventHandler (); } / ** * smsParAMEventHandler5 * * @return seriesDataEventHandler * / @Bean public SerieDataEventHandler smsParameVentHandler5 () {return new SeriesDataEventHandler (); }}prueba
// inyectar la serie DataEventQueueHelper Mensaje productor @autewired series privadas DataEventQueueHelper SeriesDataEventQueueHelper; @RequestMapping (value = "/data", método = requestmethod.post, produce = mediatype.application_json_value) public dataSponsevo <String> receperDevicedata (@RequestBody String DeviceData) if (StringUtils.isEmpty (DeviceTata)) {logger.info ("¡Los datos del receptor están vacíos!"); devolver nuevo datAREpSonsevo <String> (400, "fallido"); } SeriesDataEventQueueHelper.PublisheVent (nueva serieData (Devicedata)); Long StartTime2 = System.CurrentTimemillis (); logger.info ("Datos de receptor == [{}] Millionsecond == [{}]", Devicedata, inicioTtime2 - starttime1); devolver nuevo DatoPonseVo <String> (200, "éxito"); } Aplicación A envía datos a la aplicación B a través de la interfaz /datos, y luego envía el mensaje a la cola de disruptor a través de la serieDataEventQueueHelper. Los consumidores lo consumen. Todo el proceso no bloqueará la aplicación A. El mensaje se pierde. Puede extender la serie DataEventQueueHelper para monitorear la cola de disruptor.
Lo anterior es todo el contenido de este artículo. Espero que sea útil para el aprendizaje de todos y espero que todos apoyen más a Wulin.com.