When using Spring Boot with RabbitMQ, you may want to temporarily disable or enable listeners, or change the number of consumers, during development. Restarting the application every time wastes time, so I looked into how to start and stop listeners and change some of their settings without shutting the application down.
1. How the RabbitMQ listener is configured
The starting point is Spring Boot's auto-configuration class:

    @Configuration
    @ConditionalOnClass({ RabbitTemplate.class, Channel.class })
    @EnableConfigurationProperties(RabbitProperties.class)
    @Import(RabbitAnnotationDrivenConfiguration.class)
    public class RabbitAutoConfiguration {
        ...
    }

RabbitAnnotationDrivenConfiguration mainly configures the listener container factory and its configurer. It only declares beans, though; no listener is actually initialized here.
The bean class names in that configuration give a hint where to look next. A RabbitMQ listener has to be created by a listener container factory, so the next stop is SimpleRabbitListenerContainerFactory:
    @Bean
    @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

Since the auto-configuration does not initialize any listener, that must happen somewhere else. Looking inside the listener factory class, I found the method initializeContainer(SimpleMessageListenerContainer instance). I guessed that initialization must be related to it, so I checked where it is called, and found that RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) contains the code that creates and initializes the listener container:
    /**
     * Create and start a new {@link MessageListenerContainer} using the specified factory.
     * @param endpoint the endpoint to create a {@link MessageListenerContainer}.
     * @param factory the {@link RabbitListenerContainerFactory} to use.
     * @return the {@link MessageListenerContainer}.
     */
    protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
            RabbitListenerContainerFactory<?> factory) {
        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
        if (listenerContainer instanceof InitializingBean) {
            try {
                ((InitializingBean) listenerContainer).afterPropertiesSet();
            }
            catch (Exception ex) {
                throw new BeanInitializationException("Failed to initialize message listener container", ex);
            }
        }
        int containerPhase = listenerContainer.getPhase();
        if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
            if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
                throw new IllegalStateException("Encountered phase mismatch between container factory definitions: "
                        + this.phase + " vs " + containerPhase);
            }
            this.phase = listenerContainer.getPhase();
        }
        return listenerContainer;
    }

Next I looked for the callers of this method. There are quite a few call sites, and following them upward eventually leads to RabbitListenerEndpointRegistrar.afterPropertiesSet().
Now look at afterPropertiesSet(), which is declared in the InitializingBean interface. My guess was that it is the bean initialization callback, invoked after the Spring container creates the bean, so I looked for where RabbitListenerEndpointRegistrar is created. It turns out to be a private field of RabbitListenerAnnotationBeanPostProcessor, and RabbitListenerAnnotationBeanPostProcessor is itself registered in the RabbitBootstrapConfiguration configuration class. That is where the initialization of the RabbitMQ listeners originates.
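To make the call chain traced above easier to follow, here is a minimal plain-Java sketch of it. All class names (ModelContainer, ModelContainerFactory, ModelRegistry, ModelRegistrar) are hypothetical stand-ins and not the Spring AMQP classes; the sketch only models the order of events: endpoints are collected first, and containers are created and initialized only once the registrar's afterPropertiesSet() runs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a listener container (not the Spring AMQP class).
class ModelContainer {
    final String id;
    boolean initialized;

    ModelContainer(String id) { this.id = id; }

    // Mirrors the InitializingBean callback invoked right after creation.
    void afterPropertiesSet() { initialized = true; }
}

// Hypothetical stand-in for the container factory: it only creates containers.
class ModelContainerFactory {
    ModelContainer createListenerContainer(String endpointId) {
        return new ModelContainer(endpointId);
    }
}

// Hypothetical stand-in for the registry: creates, initializes and stores containers.
class ModelRegistry {
    final Map<String, ModelContainer> containers = new LinkedHashMap<>();

    void registerListenerContainer(String endpointId, ModelContainerFactory factory) {
        ModelContainer container = factory.createListenerContainer(endpointId);
        container.afterPropertiesSet();
        containers.put(endpointId, container);
    }
}

// Hypothetical stand-in for the registrar: collects endpoints, registers them later.
class ModelRegistrar {
    private final List<String> endpointIds = new ArrayList<>();
    private final ModelRegistry registry;
    private final ModelContainerFactory factory;

    ModelRegistrar(ModelRegistry registry, ModelContainerFactory factory) {
        this.registry = registry;
        this.factory = factory;
    }

    void registerEndpoint(String endpointId) { endpointIds.add(endpointId); }

    // Nothing is created until this callback runs.
    void afterPropertiesSet() {
        for (String id : endpointIds) {
            registry.registerListenerContainer(id, factory);
        }
    }
}

class InitChainDemo {
    public static void main(String[] args) {
        ModelRegistry registry = new ModelRegistry();
        ModelRegistrar registrar = new ModelRegistrar(registry, new ModelContainerFactory());
        registrar.registerEndpoint("orders");
        System.out.println(registry.containers.size()); // 0: only declared so far
        registrar.afterPropertiesSet();
        System.out.println(registry.containers.get("orders").initialized); // true
    }
}
```

The point of the model is the separation of concerns: the factory knows how to build a container, the registry owns the containers, and the registrar decides when registration happens.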
2. Dynamic management of RabbitMQ listeners
Back to the original question: I want to enable and disable the MQ listeners dynamically. Since the classes above take care of initialization, they probably offer management operations as well, so I started from them. Indeed, RabbitListenerEndpointRegistry has start() and stop() methods that operate on the listener containers. The main source code is as follows:
    /**
     * @return the managed {@link MessageListenerContainer} instance(s).
     */
    public Collection<MessageListenerContainer> getListenerContainers() {
        return Collections.unmodifiableCollection(this.listenerContainers.values());
    }

    @Override
    public void start() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            startIfNecessary(listenerContainer);
        }
    }

    /**
     * Start the specified {@link MessageListenerContainer} if it should be started
     * on startup or when start is called explicitly after startup.
     * @see MessageListenerContainer#isAutoStartup()
     */
    private void startIfNecessary(MessageListenerContainer listenerContainer) {
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            listenerContainer.start();
        }
    }

    @Override
    public void stop() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            listenerContainer.stop();
        }
    }

So the approach is to write a controller, inject RabbitListenerEndpointRegistry, and call start() and stop() to enable and disable the listeners. The RabbitListenerEndpointRegistry instance also gives access to the individual listener containers, so some listener parameters, such as the number of consumers, can be changed at runtime. The code is as follows:
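The condition in startIfNecessary() is easy to overlook, so here is a small plain-Java model of it. ToyContainer and ToyRegistry are hypothetical stand-ins, not the Spring AMQP classes; they only reproduce the `contextRefreshed || autoStartup` check from the source quoted above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a listener container (not the Spring AMQP class).
class ToyContainer {
    final boolean autoStartup;
    boolean running;

    ToyContainer(boolean autoStartup) { this.autoStartup = autoStartup; }

    void start() { running = true; }

    void stop() { running = false; }
}

// Hypothetical stand-in that copies only the start/stop logic quoted above.
class ToyRegistry {
    final List<ToyContainer> containers = new ArrayList<>();
    boolean contextRefreshed;

    void start() {
        for (ToyContainer container : containers) {
            // Same condition as startIfNecessary() in the quoted source.
            if (contextRefreshed || container.autoStartup) {
                container.start();
            }
        }
    }

    void stop() {
        for (ToyContainer container : containers) {
            container.stop();
        }
    }
}

class StartStopDemo {
    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        ToyContainer auto = new ToyContainer(true);
        ToyContainer manual = new ToyContainer(false);
        registry.containers.add(auto);
        registry.containers.add(manual);

        // During startup only auto-start containers are started.
        registry.start();
        System.out.println(auto.running + " " + manual.running); // true false

        // After the context is refreshed, an explicit start() starts every container.
        registry.contextRefreshed = true;
        registry.stop();
        registry.start();
        System.out.println(auto.running + " " + manual.running); // true true
    }
}
```

If the real registry behaves like the quoted source, calling start() from a controller after the application is up would also start listeners that were declared with auto-startup disabled, which is worth keeping in mind when only some listeners should run.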
    import java.util.Set;

    import javax.annotation.Resource;

    import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    import com.itopener.framework.ResultMap;

    /**
     * Created by fuwei.deng on July 24, 2017.
     */
    @RestController
    @RequestMapping("rabbitmq/listener")
    public class RabbitMQController {

        @Resource
        private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

        // Stop all registered listener containers
        @RequestMapping("stop")
        public ResultMap stop() {
            rabbitListenerEndpointRegistry.stop();
            return ResultMap.buildSuccess();
        }

        // Start all registered listener containers
        @RequestMapping("start")
        public ResultMap start() {
            rabbitListenerEndpointRegistry.start();
            return ResultMap.buildSuccess();
        }

        // Change the consumer counts of every registered listener container
        @RequestMapping("setup")
        public ResultMap setup(int consumer, int maxConsumer) {
            Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds();
            for (String id : containerIds) {
                MessageListenerContainer container = rabbitListenerEndpointRegistry.getListenerContainer(id);
                // Skip containers of other types instead of failing on the cast
                if (container instanceof SimpleMessageListenerContainer) {
                    SimpleMessageListenerContainer simpleContainer = (SimpleMessageListenerContainer) container;
                    simpleContainer.setConcurrentConsumers(consumer);
                    simpleContainer.setMaxConcurrentConsumers(maxConsumer);
                }
            }
            return ResultMap.buildSuccess();
        }
    }

That is all for this article. I hope it is helpful for your learning.
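The setup endpoint passes its request parameters straight to the container. As a hedged sketch, the values could be checked first. ConsumerSettings is a hypothetical helper, not part of Spring AMQP or of the controller above, and the two rules it enforces (at least one consumer, maximum not below the consumer count) are assumptions about what a sensible configuration looks like; to my understanding SimpleMessageListenerContainer's setters apply similar checks, but verify that against the version you use.

```java
// Hypothetical helper for validating the "consumer" and "maxConsumer" parameters.
class ConsumerSettings {
    final int consumers;
    final int maxConsumers;

    private ConsumerSettings(int consumers, int maxConsumers) {
        this.consumers = consumers;
        this.maxConsumers = maxConsumers;
    }

    // Returns validated settings or throws IllegalArgumentException.
    static ConsumerSettings of(int consumers, int maxConsumers) {
        if (consumers < 1) {
            throw new IllegalArgumentException("consumer must be at least 1");
        }
        if (maxConsumers < consumers) {
            throw new IllegalArgumentException("maxConsumer must not be less than consumer");
        }
        return new ConsumerSettings(consumers, maxConsumers);
    }
}

class ConsumerSettingsDemo {
    public static void main(String[] args) {
        ConsumerSettings ok = ConsumerSettings.of(2, 5);
        System.out.println(ok.consumers + "/" + ok.maxConsumers); // 2/5

        try {
            ConsumerSettings.of(3, 2);
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage()); // maxConsumer must not be less than consumer
        }
    }
}
```

In the controller, a call such as ConsumerSettings.of(consumer, maxConsumer) before the loop would let the endpoint reject bad input with a clear message before any container has been touched.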