I will not introduce the disruptor too much, and describe the current business scenario. The two applications A, B, and A pass data to the application B. The data transmission is relatively fast. If you use http to directly push the data and then enter the database, the efficiency is not high. It may lead to greater pressure on A's application. Using mq is too heavy, so disruptor is selected. You can also use Reactor
BaseQueueHelper.java
/** * lmax.disruptor Efficient queue processing template. Supports initial queues, that is, publishing before init(). * * The thread is actually started when init() is called and started processing. The system exits and automatically cleans up resources. * * @author xielongwang * @create 2018-01-18 3:49 pm * @email [email protected] * @description */public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> { /** * Record all queues, and clean up resources uniformly when the system exits*/ private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>(); /** * Disruptor object*/ private Disruptor<E> disruptor; /** * RingBuffer */ private RingBuffer<E> ringBuffer; /** * initQueue */ private List<D> initQueue = new ArrayList<D>(); /** * Queue size* * @return Queue length, must be a power of 2*/ protected abstract int getQueueSize(); /** * Event factory* * @return EventFactory */ protected abstract EventFactory<E> eventFactory(); /** * Event Consumer* * @return WorkHandler[] */ protected abstract WorkHandler[] getHandler(); /** * Initialization*/ public void init() { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build(); disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy()); disruptor.setDefaultExceptionHandler(new MyHandlerException()); disruptor.handleEventsWithWorkerPool(getHandler()); ringBuffer = disruptor.start(); //Initialize data publishing for (D data : initQueue) { ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } //Add resource cleaning hook synchronized (queueHelperList) { if (queueHelperList.isEmpty()) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for (BaseQueueHelper baseQueueHelper : queueHelperList) { baseQueueHelper.shutdown(); } } }); } queueHelperList.add(this); } } /** * If you want to change the thread execution priority, override this policy. YieldingWaitStrategy will increase the response and occupy more than 70% of the CPU when idle. * Use SleepingWaitStrategy with caution will reduce the response and reduce CPU usage, and is used in logs and other scenarios. * * @return WaitStrategy */ protected abstract WaitStrategy getStrategy(); /** * Insert queue messages, support inserting queues before object init, and will be published to the queue immediately when the queue is established. */ public synchronized void publishEvent(D data) { if (ringBuffer == null) { initQueue.add(data); return; } ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } /** * Close queue*/ public void shutdown() { disruptor.shutdown(); }}
EventFactory.java
/** * @author xielongwang * @create 2018-01-18 6:24 pm * @email [email protected] * @description */public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> { @Override public SeriesDataEvent newInstance() { return new SeriesDataEvent(); }}
MyHandlerException.java
public class MyHandlerException implements ExceptionHandler { private Logger logger = LoggerFactory.getLogger(MyHandlerException.class); /* * (non-Javadoc) Exception that occurs during operation* * @see * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable * , long, java.lang.Object) */ @Override public void handleEventException(Throwable ex, long sequence, Object event) { ex.printStackTrace(); logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage()); } /* * (non-Javadoc) Exception at startup* * @see * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang. * Throwable) */ @Override public void handleOnStartException(Throwable ex) { logger.error("start disruptor error ==[{}]!", ex.getMessage()); } /* * (non-Javadoc) Exception when closed* * @see * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang * .Throwable) */ @Override public void handleOnShutdownException(Throwable ex) { logger.error("shutdown disruptor error ==[{}]!", ex.getMessage()); }}SeriesData.java (represents the message sent to App B by Application A)
public class SeriesData { private String deviceInfoStr; public SeriesData() { } public SeriesData(String deviceInfoStr) { this.deviceInfoStr = deviceInfoStr; } public String getDeviceInfoStr() { return deviceInfoStr; } public void setDeviceInfoStr(String deviceInfoStr) { this.deviceInfoStr = deviceInfoStr; } @Override public String toString() { return "SeriesData{" + "deviceInfoStr='" + deviceInfoStr + '/'' + '}'; }}SeriesDataEvent.java
public class SeriesDataEvent extends ValueWrapper<SeriesData> {}SeriesDataEventHandler.java
public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> { private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class); @Autowired private DeviceInfoService deviceInfoService; @Override public void onEvent(SeriesDataEvent event) { if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) { logger.warn("receiver series data is empty!"); } //Business processing deviceInfoService.processData(event.getValue().getDeviceInfoStr()); }}SeriesDataEventQueueHelper.java
@Componentpublic class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean { private static final int QUEUE_SIZE = 1024; @Autowired private List<SeriesDataEventHandler> seriesDataEventHandler; @Override protected int getQueueSize() { return QUEUE_SIZE; } @Override protected com.lmax.disruptor.EventFactory eventFactory() { return new EventFactory(); } @Override protected WorkHandler[] getHandler() { int size = seriesDataEventHandler.size(); SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]); return paramEventHandlers; } @Override protected WaitStrategy getStrategy() { return new BlockingWaitStrategy(); //return new YieldingWaitStrategy(); } @Override public void afterPropertiesSet() throws Exception { this.init(); }}ValueWrapper.java
public abstract class ValueWrapper<T> { private T value; public ValueWrapper() {} public ValueWrapper(T value) { this.value = value; } public T getValue() { return value; } public void setValue(T value) { this.value = value; }}DisruptorConfig.java
@Configuration@ComponentScan(value = {"com.portal.disruptor"})// Multiple instances of several consumers public class DisruptorConfig { /** * smsParamEventHandler1 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler1() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler2 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler2() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler3 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler3() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler4 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler4() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler5 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler5() { return new SeriesDataEventHandler(); }}test
//Inject SeriesDataEventQueueHelper message producer @Autowired private SeriesDataEventQueueHelper seriesDataEventQueueHelper; @RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE) public DataResponseVo<String> receiverDeviceData(@RequestBody String deviceData) { long startTime1 = System.currentTimeMillis(); if (StringUtils.isEmpty(deviceData)) { logger.info("receiver data is empty !"); return new DataResponseVo<String>(400, "failed"); } seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData)); long startTime2 = System.currentTimeMillis(); logger.info("receiver data ==[{}] millionsecond ==[{}]", deviceData, startTime2 - startTime1); return new DataResponseVo<String>(200, "success"); } Application A sends data to Application B through the /data interface, and then sends the message to the disruptor queue through seriesDataEventQueueHelper. Consumers consume it. The entire process will not block the application A. The message is lost. You can extend the SeriesDataEventQueueHelper to monitor the disruptor queue.
The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.