Não apresentarei muito o disruptor e descreverei o cenário de negócios atual. Os dois aplicativos A, B e A Pass dados para o aplicativo B. A transmissão de dados é relativamente rápida. Se você usar o HTTP para pressionar diretamente os dados e entrar no banco de dados, a eficiência não será alta. Pode levar a uma maior pressão sobre a aplicação de A. O uso do MQ é muito pesado, então o disruptor é selecionado. Você também pode usar o reator
BasequeueHelper.java
/*** Lmax.Disruptor Modelo de processamento de filas eficiente. Suporta filas iniciais, isto é, publicando antes de init (). * * O thread é realmente iniciado quando o init () é chamado e iniciado por processamento. O sistema sai e limpa automaticamente os recursos. * * @Author Xielongwang * @create 2018-01-18 15:49 * @Email [email protected] * @Description */public class BaseQueueHelper <D, e estende a STILPRAPPER <D>, HUNDERNDLING <E> {/** * * Lista <ShustQueueHelper> QueueHelPerlist = new ArrayList <SaseQueueHelper> (); / *** Disruptor Object*/ Disruptor privado <E> disruptor; / ** * RingBuffer */ RingBuffer privado <E> RingBuffer; / ** * initQueue */ private List <D> initQueue = new ArrayList <D> (); / *** Tamanho da fila** @return Comprimento da fila, deve ser uma potência de 2*/ abstrato protegido int getQueUsize (); / ** * Factory de eventos * * @return EventFactory */ Protected Abstract EventFactory <E> EventFactory (); / * * / *** Inicialização*/ public void init () {threadFactory nomeadoThreadFactory = new ThreadFactoryBuilder (). SetNameFormat ("DisruptHorthReadpool"). Build (); disruptor = novo disruptor <e> (eventFactory (), getQueUeSize (), chamadothreadFactory, ProducerType.single, getStrategy ()); disruptor.setDefaultExceptionHandler (new MyHandleRexception ()); Disruptor.HandleEventsWithWorkerpool (Gethandler ()); ringbuffer = disruptor.start (); // Inicialize a publicação de dados para (Dados d: initQueue) {ringbuffer.publishEvent (novo evento de eventTranslatoronearg <e, d> () {@Override public void translatEto (e evento, sequência longa, dados d) {event.setValue (data);}}, dados); } // Adicione o gancho de limpeza de recursos sincronizado (QueueHelPerlist) {if (QueueHelPerlist.ISEMPTY ()) {RUNTIME.GETRUNTIME (). AddShutDownHook (novo ThreadHelper: @Override public void RUN) {for BaseHelperHelperHelper: baseQueueHelper.shutdown (); } QueueHelPerlist.add (this); }} /*** Se você deseja alterar a prioridade de execução do encadeamento, substitua esta política. A SurtingWaitStrategy aumentará a resposta e ocupará mais de 70% da CPU quando ociosa. * Use a SlemewaitStrategy com cautela reduzirá a resposta e reduzirá o uso da CPU e é usado em toras e outros cenários. * * @return waitstrategy */ abstrature protegido waitstrategy getStrategy (); /*** Insira mensagens da fila, suporta inserção de filas antes do objeto init e será publicado na fila imediatamente quando a fila for estabelecida. */ public sincronizado void PublishEvent (D dados) {if (ringbuffer == null) {initQueue.add (dados); retornar; } ringbuffer.publishEvent (novo eventTranslatorOnearg <e, d> () {@Override public void tradlateTo (Evento e, sequência longa, d dados) {event.setValue (dados);}}, dados); } / *** Feche a fila* / public void Shutdown () {disruptor.shutdown (); }}
EventFactory.java
/** * @author xielongwang * @create 2018-01-18 18:24 * @email [email protected] * @Description */public classFactory implementa {@OverRide public publicAvent.EventFactory <sérieDataEVent> {@Override public public }}
MyHandlerexception.java
classe pública MyHandleRexception implementa excepção -Handler {private logger logger = LoggerFactory.getLogger (MyHandlerexception.class); / * * (não-Javadoc) Exceção que ocorre durante a operação * * @see * com.lmax.disruptor.exceptionHandler#handleEventException (java.lang.Throwable *, long, java.lang.object) */ @Override public void (Throwable Ex, long sequence, object). Logger.error ("Processa Data Error Sequence == [{}] event == [{}], ex == [{}]", sequência, event.toString (), ex.getMessage ()); } / * * (não-javadoc) Exceção na startup * * @see * com.lmax.disruptor.exceptionHandler#handleOnstarTexception (java.lang. * throwable) * / @Override public void handleOnstexception (throwable ex) {logger.error (" } / * * (não-javadoc) Exceção quando fechado * * @see * com.lmax.disruptor.exceptionHandler#handleOnshutdownException (java.lang * .TH) * / @Override Void handleOnshutDownsception (throwable ex) {Logger.error ("Shutdown Dishrutor Error == [{Logger.Estride (" Shutdown Dishrutor Error == [{) (") /", "Shutrown Dishrutor ErrorOng. }}SérieData.java (representa a mensagem enviada ao App B pelo aplicativo a)
classe pública sérieData {private String deviceInfostr; public SeriesData () {} public SeriesData (String DeviceInfostr) {this.DeviceInfostr = DeviceInfostr; } public string getDeviceInfostr () {return deviceInfostr; } public void setDeviceInfostr (String deviceInfostr) {this.DeviceInfostr = deviceInfostr; } @Override public String tostring () {return "SeriesData {" + "deviceInfostr = '" + deviceInfostr +'/'' + '}'; }}SérieDataEvent.java
classe pública sérieDataEvent estende ValueWrapper <SIDADATA> {}SérieDataEventHandler.java
classe pública sérieDataEventHandler implementa WorkHandler <SIDADATAEVENT> {private Logger Logger = LoggerFactory.getLogger (sérieDataEventHandler.class); @AUTOWIRED PRIVADO DEVEDITO PRIVADO DEVEDIDADO DEVEDIVOS DEVEMERVIDO; @Override public void Onevent (Evento da SérieDataEvent) {if (event.getValue () == null || stringUtils.isEmpty (event.getValue (). GetDeviceInfostr ())) {logger.warn ("Dados da série de receptores estão vazios!"); } // Processamento de negócios DeviceInfoservice.processdata (event.getValue (). GetDeviceInfostr ()); }}SérieDataEventQueHelper.java
@ComponentPublic Class SeriesDataEventQueHelper estende BaseQueueHelper <SérieData, SérieDataEvent, SérieDataEventHandler> implementa InitializandoBean {private Static final int queue_size = 1024; @Autowired Private List <SermDataEventHandler> SérieDataEventHandler; @Override Protected int getQueUsize () {return fileue_size; } @Override Protected com.lmax.disruptor.eventFactory EventFactory () {return new EventFactory (); } @Override Protected WorkHandler [] gethandler () {int size = sérieDataEventHandler.size (); SérieDataEventHandler [] parameventHandlers = (sérieDataEventHandler []) seriDataEventHandler.ToArray (nova sérieDataEventHandler [size]); retornar paramventhandlers; } @Override Protected waitStrategy getStrategy () {return new BlockingWaitStrategy (); // retorna new houringwaitstrategy (); } @Override public void depoisPropertiEsset () lança exceção {this.init (); }}ValueWrapper.java
classe public abstrata ValueWrapper <T> {Valor T privado; public valueWrapper () {} public valueWrapper (T valor) {this.value = value; } public t getValue () {return value; } public void SetValue (valor t) {this.value = value; }}DisruptorConfig.java
@Configuration @componentsCan (value = {"com.portal.disruptor"}) // várias instâncias de vários consumidores classe pública disruptorConfig {/ ** * smsparameventHandler1 * * @return seriDataDaeventHler */ @Bean Public SeriesDataEVentHandHandLeAndLeventHandler1 () @Bean SeriesDataEventHandAndArtHandLeventHandler1 () @Bean SeriesDataEventHandAndArtHandLeventHandler1 () @Bean SeriesDataEVentHandAndArtHandLeventHandler1 (@Bean SeriesDataDandHandandAndHandLeventHandler1 () } / ** * SMSPARAMENTHANDLER2 * * @return SeriesDataEventHandler * / @Bean Public SeriesDataEventHandler SMSPARAMEVENTHANDLER2 () {retorna new SeriesDataEventHandler (); } / ** * SMSPARAMENTHANDLER3 * * @return SeriesDataEventHandler * / @Bean Public SeriesDataEventHandler SMSPARAMEVENTHANDLER3 () {return New SeriesDataEventHandler (); } / ** * SMSPARAMENTHANDLER4 * * @RETURN SERVIDADEDATAEVENTHANDLER * / @Bean Public SeriesDataEventHandler SMSPARAMEVENTHANDLER4 () {Return New SeriesDataEventHandler (); } / ** * smsparameventHandler5 * * @RETURN SérieDataEventHandler * / @Bean Public SeriesDataEventHandler SMSPARAMEVENTHANDLER5 () {return New SeriesDataEventHandler (); }}teste
// Injetar o produtor de mensagens do SérieDataEventQueHelper @AUTOWIRED SERVIDADE PRIVADO DATAEVENTQUEHELPER SERIEDATAEVENTQUEEHELPER; @RequestMapping (Value = "/Data", Method = requestMethod.Post, Produces = MediaType.Application_Json_Value) public DataSponseVoEVo <String> ReceiverDevicedata (@RequestBody String devicedata) {Long starttime1 = System.currenttimEmillis (); if (stringUtils.isEmpty (devicedata)) {Logger.info ("Os dados do receptor estão vazios!"); retornar novos dados DataSponseVo <String> (400, "falhou"); } sérieDataEventQueHelper.publishEvent (New SeriesData (devicedata)); long startTime2 = System.currenttimemillis (); Logger.info ("Dados do receptor == [{}] MOHILionsecond == [{}]", devicedata, startTime2 - startTime1); Retornar novos Dataresponsevo <String> (200, "Sucesso"); } O aplicativo A envia dados para o aplicativo B através da interface /Data e envia a mensagem para a fila do disruptor por meio da sérieDataEventqueueHelper. Os consumidores o consomem. Todo o processo não bloqueará o aplicativo A. A mensagem é perdida. Você pode estender a sérieDataEventQueHelper para monitorar a fila do disruptor.
O exposto acima é todo o conteúdo deste artigo. Espero que seja útil para o aprendizado de todos e espero que todos apoiem mais o wulin.com.