Je ne présenterai pas trop le perturbateur et je décrirai le scénario commercial actuel. Les deux applications A, B et A PASS Données à l'application B. La transmission des données est relativement rapide. Si vous utilisez HTTP pour pousser directement les données, puis entrez la base de données, l'efficacité n'est pas élevée. Cela peut entraîner une plus grande pression sur l'application de A. L'utilisation de MQ est trop lourde, donc le perturbateur est sélectionné. Vous pouvez également utiliser Reactor
Basequeuehelper.java
/ ** * lmax.dispusseur de traitement de file d'attente efficace. Prend en charge les files d'attente initiales, c'est-à-dire la publication avant init (). * * Le thread est en fait démarré lorsque init () est appelé et a commencé le traitement. Le système sort et nettoie automatiquement les ressources. * * @author xielongwang * @create 2018-01-18 3:49 PM * @email [email protected] * @description * / public abstrait class Basequeuehelper <D, e étend la valeur de valeur <d>, h étend un travail de travail <e>> List <baseQueueHelper> queueHelperList = new ArrayList <SebaseueHelper> (); / ** * Objet de perturbateur * / Disrupteur privé <E> Disrupteur; / ** * Ringbuffer * / private ringbuffer <e> ringbuffer; / ** * InitQueue * / Liste privée <d> InitQueue = new ArrayList <D> (); / ** * taille de file d'attente * * @return Longueur de file d'attente, doit être une puissance de 2 * / abstract protégé int getQueuesize (); / ** * Event Factory * * @return EventFactory * / Abstract Protected EventFactory <e> eventFactory (); / ** * Event Consumer * * @return WorkHandler [] * / Abstract Protected WorkHandler [] Gethandler (); / ** * Initialisation * / public void init () {ThreadFactory NamedThreadFactory = new ThreadFactoryBuilder (). SetNameFormat ("DisrupTorthReadpool"). Build (); Disruptor = new Disruptor <e> (eventFactory (), getQueueSize (), nomméThreadFactory, producerType.Single, getStrategy ()); Disruptor.SetDefaultExceptionHandler (new MyHandleRexception ()); Disruptor.Handleevents withworkerpool (Gethandler ()); Ringbuffer = Disruptor.Start (); // initialiser la publication de données pour (d data: initQueue) {ringbuffer.publisheVent (new EventTranslatorOnearg <e, d> () {@Override public void translateto (e event, long séquence, d data) {event.SetValue (data);}}, data); } // Ajouter un crochet de nettoyage des ressources synchronisé (queueHelperList) {if (queuehelperlist.isempty ()) {runtime.getRuntime (). AddShutdownHook (new Thread () {@Override public Void run () {pour (BaseQueueHelper BaseEuehelper: queueHelperList) }); } queueHelperList.add (this); }} / ** * Si vous souhaitez modifier la priorité d'exécution du thread, remplacez cette politique. Le rendement de la mesure augmentera la réponse et occupera plus de 70% du CPU au ralenti. * Utiliser SleepwaitStrategy avec prudence réduira la réponse et réduira l'utilisation du processeur, et est utilisée dans les journaux et autres scénarios. * * @return waitStrategy * / abstrait protégé waitStrategy getStrategy (); / ** * Insérer des messages de file d'attente, prendre en charge l'insertion des files d'attente avant l'objet init et sera publié immédiatement dans la file d'attente lorsque la file d'attente sera établie. * / public synchronisé void PublisheVent (d data) {if (ringBuffer == null) {initqueue.add (data); retour; } RingBuffer.PublisheVent (Nouveau EventTranslatorOneArg <e, D> () {@Override public void Translateto (E Event, Long Sequence, D Data) {event.SetValue (data);}}, data); } / ** * Fermer la file d'attente * / public void shutdown () {Disruptor.shutdown (); }}
EventFactory.java
/ ** * @Author Xielongwang * @create 2018-01-18 6:24 PM * @email [email protected] * @description * / classe publique EventFactory implémente com.lmax.disruptor.eventfactory <SeriesDataEvent> {@Override SeriesDataEvent NewInstance () {Return SeriesDataDat;); }}
MyHandleRexception.java
classe publique MyHandleRexception implémente exceptionHandler {private logger logger = loggerfactory.getLogger (myhandlerexception.class); / * * (non javadoc) Exception qui se produit pendant l'opération * * @see * com.lmax.disruptor.ExceptionHandler # handleEventException (java.lang.throwable *, long, java.lang.object) * / @Override public Void handleEventException (Throwable Ex, Long Sequence, objet événement) {Ex.PrintStackTrace (); Logger.Error ("Process Data Error Sequence == [{}] event == [{}], ex == [{}]", Sequence, event.ToString (), ex.getMessage ()); } / * * (non javadoc) Exception au démarrage * * @see * com.lmax.disruptor.exceptionhandler # handleonstartexception (java.lang. * Throwable) * / @Override public void handleonstartexception (Throwable ex) {Logger.error ("start Disruptor Error == [{}]!", ex.getMessage ()); } / * * (non javadoc) Exception lorsqu'elle est fermée * * @see * com.lmax.disruptor.exceptionhandler # handleonshutdownException (java.lang * .throwable) * / @Override public void handleOnShutDownException (Throwable ex) {Logger.error ("Nowdown Disrupor Error == [{}]!", ex.gessage ()); }}Sériedata.java (représente le message envoyé à l'application B par l'application a)
Classe publique SeriesData {Private String DeviceInfostr; public SeriesData () {} public SeriesData (String DeviceInfostr) {this.deviceInfostr = DeviceInfostr; } public String getDeviceInfostr () {return DeviceInfostr; } public void setDeviceInfostr (String DeviceInfostr) {this.deviceInfostr = DeviceInfostr; } @Override public String toString () {return "seriesData {" + "DeviceInfostr = '" + DeviceInfostr +' / '' + '}'; }}Sériedataevent.java
Classe publique SeriesDataEvent étend ValueWrapper <SeriesData> {}Sériedataeventhandler.java
classe publique SeriesDataEventHandler implémente WorkHandler <ReriesDataEvent> {private logger logger = loggerfactory.getLogger (sérieDataEventHandler.class); @Autowired Private DeviceInfoservice DeviceInfoService; @Override public void onevent (SeriesDataEvent Event) {if (event.getValue () == null || stringUtils.Isempty (event.getValue (). GetDeviceInfostr ())) {Logger.Warn ("Les données de la série de récepteurs sont vides!"); } // Business Processing DeviceInfoservice.ProcessData (event.getValue (). GetDeviceInfostr ()); }}SérieDataeventqueuehelper.java
@ComponentPublic Class SeriesDataEventQueueHelper étend BaseQueueHelper <SeriesData, SeriesDataEvent, SeriesDataEventHandler> implémente InitializationBean {private static final int queue_size = 1024; @Autowired Private List <SeriesDataEventHandler> SérieDataEventHandler; @Override Protected int getQueuesize () {return queue_size; } @Override Protected com.lmax.disruptor.eventfactory eventFactory () {return new EventFactory (); } @Override Protected WorkHandler [] Gethandler () {int size = seriesDataEventHandler.size (); SeriesDataEventHandler [] ParameventHandlers = (SeriesDataEventHandler []) SeriesDataEventHandler.toArray (new SeriesDataEventHandler [Size]); retourner parameventHandlers; } @Override Protected waitStrategy getStrategy () {return new BlockingWaitStrategy (); // Renvoie un nouveau rendementwaitStrategy (); } @Override public void afterpropertiesset () lève une exception {this.init (); }}Valuewrapper.java
Classe abstraite publique ValueWrapper <T> {Valeur T privée; public ValueWrapper () {} public ValueWrapper (t Value) {this.value = value; } public t getValue () {return Value; } public void setValue (t valeur) {this.value = value; }}Perturberconfig.java
@ Configuration @ ComponentsCan (value = {"com.portal.disruptor"}) // plusieurs instances de plusieurs consommateurs classe publique DisruptorConfig {/ ** * smsParameventHandler1 * * @return seriesdataeventHandler * / @bean public SeriesDataEventHandler;); } / ** * smsParameventHandler2 * * @return seriesdataeventHandler * / @bean public SeriesDataEventHandler smsParameventHandler2 () {return new SeriesDataEventHandler (); } / ** * smsParameventHandler3 * * @return seriesdataeventHandler * / @bean public SeriesDataEventHandler smsParameventHandler3 () {return new SeriesDataEventHandler (); } / ** * smsParameventHandler4 * * @return seriesdataeventHandler * / @bean public SeriesDataEventHandler smsParameventHandler4 () {return new SeriesDataEventHandler (); } / ** * smsParameventHandler5 * * @return seriesdataeventHandler * / @bean public SeriesDataEventHandler smsParameventHandler5 () {return new SeriesDataEventHandler (); }}test
// Inject SeriesDataEventQueueHelper Message Producteur @Autowired Private SeriesDataEventQueuehelper SeriesDataEventQueueHelper; @RequestMapping (value = "/ data", méthode = requestMethod.post, products = mediatype.application_json_value) public dataResponsevo <string> receiverDevicedata (@Requestbody String Devichedata) {Long startTime1 = System.currentImMillis (); if (StringUtils.Isempty (Devichedata)) {logger.info ("Les données du récepteur sont vides!"); return new DataResponsevo <string> (400, "échoué"); } seriesdataeventqueuehelper.publishevent (new SeriesData (devisedata)); Long startTime2 = System.CurrentTimemillis (); Logger.info ("Data du récepteur == [{}] MillionsEnond == [{}]", DevichedAdAnd, startTime2 - startTime1); renvoyer un nouveau dataResponsevo <string> (200, «succès»); } L'application A envoie des données à l'application B via l'interface / données, puis envoie le message à la file d'attente de perturbateur via SeriesDataEventQueueHelper. Les consommateurs le consomment. L'ensemble du processus ne bloquera pas l'application A. Le message est perdu. Vous pouvez étendre la série DataEventqueueHelper pour surveiller la file d'attente de perturbateur.
Ce qui précède est tout le contenu de cet article. J'espère que cela sera utile à l'apprentissage de tous et j'espère que tout le monde soutiendra davantage Wulin.com.