나는 Disruptor를 너무 많이 소개하지 않고 현재 비즈니스 시나리오를 설명하지 않을 것입니다. 두 응용 프로그램 A, B 및 Application B 로의 패스 데이터. 데이터 전송은 비교적 빠릅니다. HTTP를 사용하여 데이터를 직접 푸시 한 다음 데이터베이스를 입력하면 효율이 높지 않습니다. A의 응용 프로그램에 더 큰 압력을 초래할 수 있습니다. MQ를 사용하는 것이 너무 무겁기 때문에 방해물이 선택됩니다. 원자로를 사용할 수도 있습니다
BasequeueHelper.java
/*** lmax.disruptor 효율적인 대기열 처리 템플릿. 초기 대기열, 즉 init () 전에 게시를 지원합니다. * * init ()가 호출되고 처리를 시작할 때 스레드가 실제로 시작됩니다. 시스템은 자원을 종료하여 자동으로 정리합니다. * * @Author Xielongwang * @Create 2018-01-18 3:49 PM * @email [email protected] * @description */public acplact class basequeuehelper <d, e는 valuewrapper <d>, h alughandler <e >> {/** * extrends alluge <e >> {/** * reclud uniformly uniformly, 그리고 정리하십시오. List <asequeuehelper> QueueHelperlist = new ArrayList <Basequeueuhelper> (); / *** irruptor object*/ private druptor <e> hruptor; / ** * Ringbuffer */ Private Ringbuffer <E> RingBuffer; / ** * initqueue */ private list <d> initqueue = new arraylist <d> (); / *** 대기열 크기** @return 큐 길이는 2*/ 보호 된 추상 int getqueuesize ()의 전력이어야합니다. / ** * 이벤트 팩토리 * * @return eventFactory */ 보호 된 초록 eventFactory <E> eventFactory (); / ** * 이벤트 소비자 * * @return workHandler [] */ Protected Abstract WorkHandler [] gethandler (); / *** 초기화*/ public void init () {ThreadFactory namedThreadFactory = new ThreadFactoryBuilder (). setNameFormat ( "ripruptorthreadpool"). build (); ripruptor = new Druptor <e> (eventFactory (), getqueuesize (), nellthreadFactory, producerType.single, getStrategy ()); ripruptor.setDefaulteXceptionHandler (new myHandlerException ()); isruptor.handleeventswithworkerpool (gethandler ()); ringbuffer = ripruptor.start (); // (d data : initqueue) {ringbuffer.publishevent (new eventtranslatoronearg <e, d> () {@override public void transeto (e event, long sequence, d data) {event.setValue (data);}, data); } // 리소스 청소 후크 동기화 (QueueHelperList) {if (queueHelperList.isempty ()) {runtime.getRuntime (). addShutdownHook (new Thread () {@OverRide public void Run () {for (basequeHelperqueueHelper : QueuHelperList) BasequeueHelper.shutdown (); } queueHelperList.Add (this); }} /*** 스레드 실행 우선 순위를 변경하려면이 정책을 무시하십시오. 수율은 유휴 상태에서 반응을 증가시키고 CPU의 70% 이상을 차지할 것입니다. * Sleefittwaitstrategy를주의해서 사용하면 응답이 줄어들고 CPU 사용량을 줄이고 로그 및 기타 시나리오에 사용됩니다. * * @return waitstrategy */ 보호 된 초록 Waitstrategy getStrategy (); /*** 대기열 메시지를 삽입하고 객체 시작하기 전에 대기열 삽입을 지원하며 대기열이 설정되면 즉시 대기열에 게시됩니다. */ public synchronized void publishevent (d data) {if (ringbuffer == null) {initqueue.add (data); 반품; } ringbuffer.publishevent (new EventTransLatorOnearg <e, d> () {@override public void transeto (e event, long sequence, d data) {event.setValue (data);}}, data); } / *** 닫기* / public void shutdown () {ripruptor.shutdown (); }}
eventFactory.java
/** * @Author Xielongwang * @Create 2018-01-18 6:24 PM * @Email [email protected] * @descript */public class eventFactory empless com.lmax.disprator.eventactory <seciredataevent> {@override publicevent NewInstance () {NewDataEvent () {) {) {); }}
myHandlerException.java
공개 클래스 MyHandlerException은 Exception Handler {private logger = loggerfactory.getLogger (myHandlerException.class)를 구현합니다. / * * (Javadoc) 예외 * * @see * com.lmax.disruptor.exceptionhandler#handleEventException (java.lang.throwable *, long, java.lang.object) */ @override public void handleEventException (throwable ex, long sequence, object event) {ex.printstacktrace (); logger.error ( "프로세스 데이터 오류 시퀀스 == [{}] event == [{}], ex == [{}]", 시퀀스, event.toString (), ex.getMessage ()); } / * * (비 Javadoc) 시작시 예외 * * @see * com.lmax.disruptor.exceptor.exceptionhandler#handleonstartexception (java.lang. * trashable) * / @override public void handleonstartexception (trownable ex) {logger.error ( "start disruptor error == [}]) } / * * (비 Javadoc) 닫을 때의 예외 * * * @see * com.lmax.disruptor.exceptor.exceptionhandler#handleOnshutdownException (java.lang * .throwable) * / @override public void handleOnshutdownException (Throwable ex) {logger.error ( "shutdown errrur == [})); }}SeriesData.java (응용 프로그램 a에 의해 App B에 전송 된 메시지를 나타냅니다)
public class seriesdata {private String deviceinfostr; public seriesdata () {} public seriesdata (String DeviceInfostr) {this.deviceinfostr = deviceinfostr; } public String getDeviceInfostr () {return deviceInfoStr; } public void setDeviceInfostr (String DeviceInfOStr) {this.deviceInfostr = DeviceInfOstr; } @override public String toString () {return "seriesdata {" + "deviceInfostr = '" + deviceInfostr +'/'' + '}'; }}SeriesDataEvent.java
Public Class SeriesDataEvent는 ValueWrapper <SearyData> {}을 확장합니다.SeriesDataEventhandler.java
public class seriesdataeventhandler는 WorkHandler <SearchDataEvent>를 구현합니다. @autowired private deviceinfoservice deviceinfoservice; @override public void onevent (seriesdataevent event) {if (event.getValue () == null || stringUtils.isempty (event.getValue (). getDeviceInfostr ())) {logger.warn ( "수신기 시리즈 데이터가 비어 있습니다!"); } // 비즈니스 프로세싱 DeviceInfoservice.processData (event.getValue (). getDeviceInfostr ()); }}SeriesDataEventqueueHelper.java
@ComponentPublic Class SeriesDataEventqueUeHelper 확장 BasequeUeHelper <SeriesData, SeriesDataEvent, SeriesDataEventhandler> impintess 초기화 {private static final queue_size = 1024; @autowired private list <seriesdataeventhandler> seriesdataeventhandler; @override protected int getqueuesize () {return queue_size; } @override protected com.lmax.disruptor.eventFactory eventFactory () {return new EventFactory (); } @override Protected WorkHandler [] gethandler () {int size = seriesdataeventhandler.size (); SeriesDataEventhandler [] ParameVenthandlers = (SeriesDataEventhandler []) SeriesDataEventhandler.toArray (New SeriesDataEventhandler [size]); Parameventhandlers를 반환합니다. } @override protected waitstrategy getstrategy () {return new blockingwaitstrategy (); // 새로운 수율을 반환합니다 .waitstrategy (); } @override public void approperTiesset ()는 예외 {this.init (); }}valuewrapper.java
공개 초록 클래스 ValueWrapper <t> {private t value; public valuewrapper () {} public valuewrapper (t value) {this.value = value; } public t getValue () {return 값; } public void setValue (t value) {this.value = value; }}Disruptorconfig.java
@configuration @componentscan (value = { "com.portal.disruptor"}) // 여러 소비자의 여러 인스턴스 공개 클래스 DisruptorConfig {/ ** * smsparameventhandler1 * * @return seriesdataeventhandler */ @bean publicdataeventhandler smsparameventhandler1 () {return newdataeventhandler (); } / ** * smsparameventhandler2 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler2 () {return new seriesdataeventhandler (); } / ** * smsparameventhandler3 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler3 () {return new seriesdataeventhandler (); } / ** * smsparameventhandler4 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler4 () {return new seriesdataeventhandler (); } / ** * smsparameventhandler5 * * @return seriesdataeventhandler * / @bean public seriesdataeventhandler smsparameventhandler5 () {return new seriesdataeventhandler (); }}시험
// 시리즈를 주입합니다. @requestmapping (value = "/data", method = requestmethod.post, produces = mediaType.application_json_value) public dataresponsevo <string> receiverdevedata (@requestbody string devicedata) {long starttime1 = system.currenttimemillis (); if (stringUtils.isempty (devicedata)) {logger.info ( "수신기 데이터가 비어 있습니다!"); 새로운 DataresponseVo <string> (400, "실패")을 반환합니다. } SeriesDataEventqueueHelper.publishevent (New SeriesData (devicedata)); Long StartTime2 = System.CurrentTimeMillis (); logger.info ( "수신기 데이터 == [{}] millionecond == [{}]", devicedata, startTime2 -StartTime1); 새로운 DataresponseVo <string> (200, "성공")을 반환합니다. } 응용 프로그램 A는 /데이터 인터페이스를 통해 응용 프로그램 B로 데이터를 전송 한 다음 SeriesDataEventqueUeHelper를 통해 메시지를 Disruptor 큐로 보냅니다. 소비자는 그것을 소비합니다. 전체 프로세스는 응용 프로그램 A를 차단하지 않습니다. 메시지가 손실됩니다. SeriesDataEventqueueHelper를 확장하여 Druptor 큐를 모니터링 할 수 있습니다.
위는이 기사의 모든 내용입니다. 모든 사람의 학습에 도움이되기를 바랍니다. 모든 사람이 wulin.com을 더 지원하기를 바랍니다.