Я не буду слишком много представлять разрушитель и описать текущий бизнес -сценарий. Два приложения A, B и передача данных в приложение B. передача данных относительно быстрая. Если вы используете HTTP, чтобы напрямую натолкнуть данные, а затем введите базу данных, эффективность не высока. Это может привести к большему давлению на применение А. Использование MQ слишком тяжело, поэтому выбирается разрушитель. Вы также можете использовать реактор
BasequeueHelper.java
/*** Lmax.Disruptor Эффективный шаблон обработки очередей. Поддерживает начальные очереди, то есть публикация до init (). * * Поток фактически запускается, когда init () вызывается и запускает обработку. Система выходит и автоматически очищает ресурсы. * * @author xielongwang * @create 2018-01-18 3:49 вечера * @email [email protected] * @description */public Abstract Class BaseueHelper <D, E расширяет valuewrapper <d>, h расширяет рабочее руча <e >> {/**. List <basequeuehelper> queuehelperlist = new Arraylist <basequeuehelper> (); / *** Объект Disruptor*/ Private Disruptor <e> Disruptor; / ** * RINGBUFFER */ Private Ringbuffer <e> Ringbuffer; / ** * initqueue */ private list <d> initqueue = new ArrayList <d> (); / *** Размер очереди** @return Длина очереди, должна быть сила 2*/ Защищенный абстрактный int getQueuesize (); / ** * Фабрика событий * * @return EventFactory */ Protected Abstract EventFactory <e> EventFactory (); / ** * Потребитель событий * * @return Workhandler [] */ Защищенный абстрактный работник [] gethandler (); / *** Инициализация*/ public void init () {thinkfactory andThreadFactory = new ThinkFactoryBuilder (). SetNameFormat ("DisruptorThreadPool"). Build (); Disruptor = New Disruptor <e> (EventFactory (), getQueuesize (), NateThreadFactory, ProductType.Single, getStrategy ()); Disruptor.setDefatexceptionHandler (new MyHandLerexception ()); Disruptor.handleeventswithWorkerPool (gethandler ()); RingBuffer = Disruptor.Start (); // Инициализировать публикацию данных для (D Data: initqueue) {ringbuffer.publishevent (new EventTranslatorOnearg <e, d> () {@Override public void translateTo (E Event, Long Sequence, D Data) {Event.SetValue (data);}}, Data); } // Добавить ресурс для очистки крюка Synchronized (queueHelperList) {if (QueueHelperList.isempty ()) {runtime.getRuntime (). AddShutDownHook (new Thread () {@Override public void run () {for (basequeueHelpeueHelper: QueueHelperList) }}); } queuehelperlist.add (this); }} /*** Если вы хотите изменить приоритет выполнения потока, переопределите эту политику. Earligingwaitstrategy увеличит реакцию и займет более 70% процессора при простоя. * Используйте SleepwaitStrategy с осторожностью уменьшит ответ и уменьшит использование процессора и используется в журналах и других сценариях. * * @return Waitstrategy */ защищенная абстрактная waitstrategy getStrategy (); /*** Вставьте сообщения в очереди, поддерживайте вставку очереди перед объектом инициатором и будут опубликованы в очередь немедленно при установке очереди. */ public synchronized void publishevent (d data) {if (ringbuffer == null) {initqueue.add (data); возвращаться; } ringbuffer.publishEvent (new EventTranslatorOnearg <e, d> () {@Override public void translateTo (E Event, Long Sequence, D Data) {event.SetValue (data);}}, data); } / *** Закрыть очередь* / public void shutdown () {disruptor.shutdown (); }}
EventFactory.java
/** * @author xielongwang * @create 2018-01-18 6:24 pm * @email [email protected] * @description */public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> { @Override public SeriesDataEvent newInstance() { return new SeriesDataEvent(); }}
Myhandlerexception.java
открытый класс myHandLerexception реализует exceptionHandler {private logger logger = loggerfactory.getLogger (myhandleRexception.class); / * * (не-Javadoc) Исключение, которое происходит во время операции * * @see * com.lmax.disruptor.exceptionhandler#handleeventexception (java.lang.throwable *, long, java.lang.object) */ @@override public void handleeventexception (Throwable ex, Long Sequence, объект Event) {ex.printStactexception (); logger.error ("Последовательность ошибки данных процесса == [{}] event == [{}], ex == [{}]", последовательность, event.tostring (), ex.getmessage ()); } / * * (не Javadoc) Исключение при запуске * * @see * com.lmax.disruptor.exceptionhandler#handleonstartexception (java.lang. * throwable) * / @@override public void handleonstartexception (throwable ex) {logger.error ("start disruptor error == [{}]", ex.getmess; } / * * (не-Javadoc) Исключение, когда закрыто * * @see * com.lmax.disruptor.exceptionHandler#handleOnShutDownException (java.lang * .Throwable) * / @@override public void handleonshutdownexception (throwable ex) {logger.Error ("urddown disruptor error == [{{{}", ex); }}Seriesdata.java (представляет сообщение, отправленное в приложение B по приложению A)
public class series hardata {private String DeviceInfoStr; public seriesdata () {} public seriesdata (String DeviceInfoStr) {this.DeviceInfoStr = deviceInfoStr; } public String getDeviceInfoStr () {return DeviceInfoStr; } public void setDeviceInfoStr (String DeviceInfoStr) {this.DeviceInfoStr = deviceInfoStr; } @Override public String toString () {return "seriesData {" + "deviceInfoStr = '" + deviceInfoStr +'/'' + '}'; }}Seriesdataevent.java
Public Class Series DATAEVENT Extends ValueWrapper <seriesData> {}Seriesdataeventhandler.java
Общедоступный класс серии Dadataeventhandler реализует Workhandler <seriesdataevent> {private logger logger = loggerfactory.getlogger (seriesdataeventhandler.class); @Autowired Private DeviceInfoService DeviceInfoService; @Override public void OneVent (seriesDataEvent Event) {if (event.getValue () == null || stringUtils.isempty (event.getValue (). GetDeviceInfoStr ())) {logger.warn («Данные серии приемника пусты!»); } // бизнес -обработка DeviceInfoService.processData (event.getValue (). GetDeviceInfoStr ()); }}Seriesdataeventqueuehelper.java
@Componentpublic class series dadataeventqueuehelper extends basequeuehelper <seriesdata, seriesdataevent, seriesdataeventhandler> реализует инициализацию bean {private static final queue_size = 1024; @Autowired private list <seriesdataeventhandler> seriesdataeventhandler; @Override защищен int getQueuesize () {return queue_size; } @Override Protected com.lmax.disruptor.eventFactory eventFactory () {return new EventFactory (); } @Override защищен WorkHandler [] gethandler () {int size = seriesdataeventhandler.size (); Seriesdataeventhandler [] parameventhandlers = (seriesdataeventhandler []) series dadataeventhandler.toarray (new series dadataeventhandler [size]); вернуть Parameventhandlers; } @Override защищен WaitStrategy getStrategy () {return new BlockingWaitStrategy (); // вернуть новый eleamingwaitstrategy (); } @Override public void afterpropertiesset () бросает исключение {this.init (); }}Valuewrapper.java
открытый абстрактный класс valuewrapper <t> {private t value; public valueWrapper () {} public ValueWrapper (t value) {this.value = value; } public t getValue () {return Value; } public void setValue (t value) {this.value = value; }}DisruptorConfig.java
@Configuration@ComponentScan(value = {"com.portal.disruptor"})// Multiple instances of several consumers public class DisruptorConfig { /** * smsParamEventHandler1 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler1() { return new SeriesDataEventHandler(); } / ** * smsparameventhandler2 * * @return seriesdataeventhandler * / @bean public series dadataeventhandler smsparameventhandler2 () {return new series adataeventhandler (); } / ** * smsparameventhandler3 * * @return seriesdataeventhandler * / @bean public series dadataeventhandler smsparameventhandler3 () {return new series adataeventhandler (); } / ** * smsparameventhandler4 * * @return seriesdataeventhandler * / @bean public series dadataeventhandler smsparameventhandler4 () {return new series dadataeventhandler (); } / ** * smsparameventhandler5 * * @return seriesdataeventhandler * / @bean public series dadataeventhandler smsparameventhandler5 () {return new series dadataeventhandler (); }}тест
// Inject Series DadataEventqueueHelper Produce Produce @Autowired Private Series DadataEventQueueHelper Series DadataEventqueueHelper; @Requestmapping (value = "/data", method = requestMethod.post, производители = mediaType.application_json_value) public dataResponsevo <string> receiverDevyedata (@Requestbody String string) {long startTime1 = System.CurrentTimeMillis (); if (stringUtils.isempty (devinedata)) {logger.info ("Данные приемника пусты!"); вернуть новый dataResponsevo <string> (400, "не удастся"); } series dadataeventqueuehelper.publishevent (new seriesdata (depecedata)); long startTime2 = System.currentTimeMillis (); logger.info ("Данные приемника == [{}] millionsecond == [{}]", devedicationata, startTime2 - startTime1); вернуть новый dataResponsevo <string> (200, "успех"); } Приложение A отправляет данные в приложение B через интерфейс /data, а затем отправляет сообщение в очередь Disruptor через серию DadataEventqueueHelper. Потребители потребляют это. Весь процесс не будет блокировать приложение A. Сообщение потеряно. Вы можете расширить серию DadataEventqueueHelper, чтобы контролировать очередь на разрушитель.
Выше всего содержание этой статьи. Я надеюсь, что это будет полезно для каждого обучения, и я надеюсь, что все будут поддерживать Wulin.com больше.