From the source code: why consumers stop consuming after RabbitMQ is restarted, and how to fix it

Some time ago, one queue on our RabbitMQ broker built up a large message backlog. Early one morning, the operations team expanded the capacity of the MQ server pod and restarted RabbitMQ. Later that morning we found that our service had been throwing exceptions ever since the restart and had stopped consuming, which disrupted the business: RabbitMQ itself had restarted successfully, but the consumers never reconnected. This article walks through the spring-rabbit source code to find the cause of the problem and a fix.

Contents

1. The problem

2. Analyzing the spring-rabbit consumer source code

3. Fixing consumers that stop consuming

1. The problem

First, let's look at the exceptions that were reported. Below are the most relevant parts of the stack traces:

o.s.a.r.l.BlockingQueueConsumer - Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException
Failed to declare queue(s):[work_queue]

...............................................

Consumer received fatal=false exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.handleDeclarationException(BlockingQueueConsumer.java:661)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:601)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:581)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1196)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1041)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[work_queue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:710)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:594)
    ... 4 common frames omitted
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1006)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52)
    at sun.reflect.GeneratedMethodAccessor175.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1110)
    at com.sun.proxy.$Proxy285.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:689)
    ... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost 'work_platform', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
    ... 13 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost 'work_platform', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597)
    ... 1 common frames omitted

...........................

o.s.a.r.l.SimpleMessageListenerContainer  message:  Stopping container from aborted consumer

The key error messages are:

  1. Failed to declare queue(s):[work_queue]
  2. Consumer received fatal=false exception on startup org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
  3. Stopping container from aborted consumer

2. Analyzing the spring-rabbit consumer source code

Searching the spring-rabbit source shows that the "Consumer received fatal=false exception on startup" message is logged in the run() method of AsyncMessageProcessingConsumer, an inner class of SimpleMessageListenerContainer.

The following is a brief walk-through of how spring-rabbit runs consumers:

Spring Rabbit creates a SimpleMessageListenerContainer for each listener (which may consume one or more queues). SimpleMessageListenerContainer extends AbstractMessageListenerContainer, which implements start() from the Lifecycle interface; start() is called when the application starts.

The start() method of AbstractMessageListenerContainer is as follows:

	/**
	 * Start this container.
	 * @see #doStart
	 */
	@Override
	public void start() {
		if (isRunning()) {
			return;
		}
		if (!this.initialized) {
			synchronized (this.lifecycleMonitor) {
				if (!this.initialized) {
					afterPropertiesSet();
				}
			}
		}
		try {
			if (logger.isDebugEnabled()) {
				logger.debug("Starting Rabbit listener container.");
			}
			configureAdminIfNeeded();
			checkMismatchedQueues();
			doStart();
		}
		catch (Exception ex) {
			throw convertRabbitAccessException(ex);
		}
	}

start() in turn calls doStart(). The base implementation in AbstractMessageListenerContainer is:

/**
 * Start this container, and notify all invoker tasks.
 */
protected void doStart() {
	// Reschedule paused tasks, if any.
	synchronized (this.lifecycleMonitor) {
		this.active = true;
		this.running = true;
		this.lifecycleMonitor.notifyAll();
	}
}

At runtime, though, it is the override in the subclass SimpleMessageListenerContainer that runs; it calls super.doStart() and then creates and launches the consumers:

 /**
 * Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer
 * to this container's task executor.
 */
@Override
protected void doStart() {
	checkListenerContainerAware();
	super.doStart();
	synchronized (this.consumersMonitor) {
		if (this.consumers != null) {
			throw new IllegalStateException("A stopped container should not have consumers");
		}
		int newConsumers = initializeConsumers();
		if (this.consumers == null) {
			logger.info("Consumers were initialized and then cleared " +
					"(presumably the container was stopped concurrently)");
			return;
		}
		if (newConsumers <= 0) {
			if (logger.isInfoEnabled()) {
				logger.info("Consumers are already running");
			}
			return;
		}
		Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
		for (BlockingQueueConsumer consumer : this.consumers) {
			AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
			processors.add(processor);
			getTaskExecutor().execute(processor);
			if (getApplicationEventPublisher() != null) {
				getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
			}
		}
		waitForConsumersToStart(processors);
	}
}

initializeConsumers() creates as many BlockingQueueConsumer instances as the configured number of concurrent consumers (concurrentConsumers). The actual consumption logic lives in BlockingQueueConsumer.

doStart() then iterates over the BlockingQueueConsumer set and wraps each one in an AsyncMessageProcessingConsumer, which implements Runnable.

getTaskExecutor().execute(processor) runs each of these tasks on the container's task executor, and an AsyncConsumerStartedEvent is published for each consumer. initializeConsumers() looks like this:

protected int initializeConsumers() {
	int count = 0;
	synchronized (this.consumersMonitor) {
		if (this.consumers == null) {
			this.cancellationLock.reset();
			this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
			for (int i = 0; i < this.concurrentConsumers; i++) {
				BlockingQueueConsumer consumer = createBlockingQueueConsumer();
				this.consumers.add(consumer);
				count++;
			}
		}
	}
	return count;
}
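The fan-out performed by doStart() and initializeConsumers() can be modeled with plain JDK types. The sketch below is a simplified, hypothetical model (StartupFanOutSketch and startConsumers are illustrative names, not Spring AMQP API): one Runnable per concurrent consumer is handed to an executor, and the caller then waits on a latch, roughly as waitForConsumersToStart() does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical, JDK-only sketch of the start-up fan-out in
 * SimpleMessageListenerContainer.doStart(). Not Spring AMQP API.
 */
class StartupFanOutSketch {

    /**
     * Submits one task per concurrent consumer, waits up to timeoutMs for them to
     * report that they started, and returns how many had initialized by then
     * (-1 if the waiting thread was interrupted).
     */
    static int startConsumers(int concurrentConsumers, Executor executor, long timeoutMs) {
        CountDownLatch started = new CountDownLatch(concurrentConsumers);
        AtomicInteger initialized = new AtomicInteger();
        for (int i = 0; i < concurrentConsumers; i++) {
            // Plays the role of new AsyncMessageProcessingConsumer(consumer)
            Runnable processor = () -> {
                initialized.incrementAndGet(); // stands in for initialize(): declare queues, basicConsume
                started.countDown();           // tell the container this consumer is up
                // a real consumer would now loop in mainLoop() until it is stopped
            };
            executor.execute(processor);
        }
        try {
            // Plays the role of waitForConsumersToStart(processors)
            started.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return -1;
        }
        return initialized.get();
    }
}
```

For example, startConsumers(3, Executors.newFixedThreadPool(3), 5000) returns 3 once all three tasks have run their start-up step.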

Now let's look at the core consumption logic, the run() method of AsyncMessageProcessingConsumer (not reproduced in full here).

Inside its try block, a while loop repeatedly calls mainLoop(), which pulls messages and dispatches them to the listener. The try block is followed by many catch blocks. A boolean variable aborted, initially false, is declared before the try, and some of the catch blocks set it to true. After the catch blocks, run() calls killOrRestart(aborted), and the value of aborted decides what that method does.
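Because run() is not reproduced here, the following hypothetical, JDK-only sketch models just the part that matters: how an exception is turned into the aborted flag. Only two of the real catch blocks are modeled; RunMethodSketch and its nested QueuesNotAvailableException are illustrative stand-ins, not Spring AMQP classes.

```java
/**
 * Hypothetical, JDK-only sketch of the control flow in AsyncMessageProcessingConsumer.run().
 * Returns the "aborted" value that the real method would pass to killOrRestart().
 */
class RunMethodSketch {

    /** Stand-in for org.springframework.amqp.rabbit.listener.QueuesNotAvailableException. */
    static class QueuesNotAvailableException extends RuntimeException {
        QueuesNotAvailableException(String message) { super(message); }
    }

    /**
     * @param initialize         start-up step (passive queue declaration, basicConsume, ...)
     * @param mainLoop           one receive-and-dispatch iteration
     * @param iterations         how many iterations to run before the consumer stops being active
     * @param missingQueuesFatal the container's missingQueuesFatal setting
     */
    static boolean run(Runnable initialize, Runnable mainLoop, int iterations, boolean missingQueuesFatal) {
        boolean aborted = false;
        try {
            initialize.run();
            for (int i = 0; i < iterations; i++) { // real code: while (isActive(consumer) || ...)
                mainLoop.run();
            }
        } catch (QueuesNotAvailableException ex) {
            // Same rule as the real catch block: abort only if missing queues are fatal
            if (missingQueuesFatal) {
                aborted = true;
            }
        } catch (RuntimeException ex) {
            // Modeled as a recoverable failure (e.g. a dropped connection): aborted stays false
        }
        return aborted;
    }
}
```

With missingQueuesFatal set to true, a start-up step that throws QueuesNotAvailableException yields aborted == true, which is exactly the path our consumers took after the restart.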

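Now look at the catch block for the QueuesNotAvailableException seen in our logs. If isMissingQueuesFatal() returns true (the default), it sets aborted to true; in either case it then calls publishConsumerFailedEvent(). Note that the log message prints isMismatchedQueuesFatal() (false by default), not isMissingQueuesFatal(), which is why our log said fatal=false even though the consumer was aborted.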

catch (QueuesNotAvailableException ex) {
	logger.error("Consumer received fatal=" + isMismatchedQueuesFatal() + " exception on startup", ex);
	if (isMissingQueuesFatal()) {
		this.startupException = ex;
		// Fatal, but no point re-throwing, so just abort.
		aborted = true;
	}
	publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
}

aborted (true at this point) is passed to publishConsumerFailedEvent() as the fatal argument. Because fatal is true and the container is still running, the event is not published immediately; instead, a ListenerContainerConsumerFailedEvent is put on abortEvents, a BlockingQueue.

private final BlockingQueue<ListenerContainerConsumerFailedEvent> abortEvents = new LinkedBlockingQueue<>();
......
protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) {
	if (!fatal || !isRunning()) {
		super.publishConsumerFailedEvent(reason, fatal, t);
	}
	else {
		try {
			this.abortEvents.put(new ListenerContainerConsumerFailedEvent(this, reason, t, fatal));
		}
		catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}
}
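The deferral above can be modeled without Spring. In this hypothetical sketch (AbortEventsSketch is an illustrative name; the published list stands in for the ApplicationEventPublisher), a fatal event raised while the container is running is parked in the queue, and it is only published after the container has been stopped and the queue is drained, which is what the aborted branch of killOrRestart() does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical, JDK-only sketch of the abortEvents hand-off in SimpleMessageListenerContainer.
 * "published" stands in for events sent through the ApplicationEventPublisher.
 */
class AbortEventsSketch {
    private final BlockingQueue<String> abortEvents = new LinkedBlockingQueue<>();
    final List<String> published = new ArrayList<>();
    private volatile boolean running = true;

    /** Mirrors publishConsumerFailedEvent(reason, fatal, t). */
    void publishConsumerFailedEvent(String reason, boolean fatal) {
        if (!fatal || !running) {
            published.add(reason + " (fatal=" + fatal + ")"); // super.publishConsumerFailedEvent(...)
        } else {
            abortEvents.add(reason); // defer until the container has stopped
        }
    }

    /** Mirrors the aborted branch of killOrRestart(): stop, then drain and re-publish. */
    void stopForAbort() {
        running = false; // stop()
        String reason;
        // The real code polls with a timeout (ABORT_EVENT_WAIT_SECONDS) instead of a plain poll()
        while ((reason = abortEvents.poll()) != null) {
            publishConsumerFailedEvent(reason, true); // now !running, so the event is published
        }
    }
}
```

The design keeps listeners from seeing a fatal event while the container still reports itself as running, so a handler that checks isRunning() gets a consistent answer.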

killOrRestart() is shown below, with comments at the key points. If aborted is true, the consumer is cancelled, the container is stopped (only by the first aborting consumer thread, guarded by containerStoppingForAbort), and the ListenerContainerConsumerFailedEvent events are taken from the abortEvents queue and published; because the container is no longer running at that point, publishConsumerFailedEvent() now publishes them instead of queueing them again. If aborted is false and the consumer is still active, restart() is called to restart the consumer.

Note: when one of the catch blocks sets aborted to true, or the consumer is no longer active, the consumer is not restarted automatically. In the aborted case, the container only publishes a ListenerContainerConsumerFailedEvent. In all other cases, the consumer restarts automatically.

So our fix for consumers that stop consuming can start from the ListenerContainerConsumerFailedEvent.

private void killOrRestart(boolean aborted) {
	//Judge whether the consumer is closed | aborted==true
	if (!isActive(this.consumer) || aborted) {
		logger.debug("Cancelling " + this.consumer);
		try {
			this.consumer.stop();
			SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
			if (getApplicationEventPublisher() != null) {
				getApplicationEventPublisher().publishEvent(
						new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
			}
		}
		catch (AmqpException e) {
			logger.info("Could not cancel message consumer", e);
		}
		if (aborted && SimpleMessageListenerContainer.this.containerStoppingForAbort
				.compareAndSet(null, Thread.currentThread())) {
			logger.error("Stopping container from aborted consumer");
			stop();
			SimpleMessageListenerContainer.this.containerStoppingForAbort.set(null);
			ListenerContainerConsumerFailedEvent event = null;
			do {
				try {
					//Get the ListenerContainerConsumerFailedEvent event from the blocking queue abortEvents
					event = SimpleMessageListenerContainer.this.abortEvents.poll(ABORT_EVENT_WAIT_SECONDS,
							TimeUnit.SECONDS);
					if (event != null) {
						//If the ListenerContainerConsumerFailedEvent is not empty, publish and broadcast the event
						SimpleMessageListenerContainer.this.publishConsumerFailedEvent(
								event.getReason(), event.isFatal(), event.getThrowable());
					}
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
			while (event != null);
		}
	}
	else {
		logger.info("Restarting " + this.consumer);
		//Call the restart method to restart the consumer container
		restart(this.consumer);
	}
}
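The branching in killOrRestart() reduces to a small decision table. The hypothetical sketch below (KillOrRestartSketch and its Outcome enum are illustrative names) reproduces the condition !isActive(consumer) || aborted and the compareAndSet guard that lets only one aborting thread stop the container.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical, JDK-only sketch of the decision made by killOrRestart(aborted).
 * Not Spring AMQP API; it only mirrors the branching shown above.
 */
class KillOrRestartSketch {

    enum Outcome { RESTART_CONSUMER, CANCEL_CONSUMER, CANCEL_AND_STOP_CONTAINER }

    // Mirrors containerStoppingForAbort: only one aborting thread stops the container
    private final AtomicReference<Thread> containerStoppingForAbort = new AtomicReference<>();

    Outcome decide(boolean consumerActive, boolean aborted) {
        if (!consumerActive || aborted) {
            // The consumer is cancelled and NOT restarted
            if (aborted && containerStoppingForAbort.compareAndSet(null, Thread.currentThread())) {
                // Real code: stop(), reset the guard, then drain abortEvents and publish each event
                containerStoppingForAbort.set(null);
                return Outcome.CANCEL_AND_STOP_CONTAINER;
            }
            return Outcome.CANCEL_CONSUMER;
        }
        // Active and not aborted: restart(this.consumer)
        return Outcome.RESTART_CONSUMER;
    }
}
```

Reading the table: only the (active, not aborted) combination leads to an automatic restart, which is why an aborted consumer stays down until something outside the container starts it again.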

restart() stops the old consumer, creates a new BlockingQueueConsumer that takes over the old one's BackOffExecution, publishes an AsyncConsumerRestartedEvent, and submits a new AsyncMessageProcessingConsumer to the task executor.

private void restart(BlockingQueueConsumer oldConsumer) {
	BlockingQueueConsumer consumer = oldConsumer;
	synchronized (this.consumersMonitor) {
		if (this.consumers != null) {
			try {
				// Need to recycle the channel in this consumer
				consumer.stop();
				// Ensure consumer counts are correct (another is going
				// to start because of the exception, but
				// we haven't counted down yet)
				this.cancellationLock.release(consumer);
				this.consumers.remove(consumer);
				if (!isActive()) {
					// Do not restart - container is stopping
					return;
				}
				//Recreate consumer BlockingQueueConsumer
				BlockingQueueConsumer newConsumer = createBlockingQueueConsumer();
				newConsumer.setBackOffExecution(consumer.getBackOffExecution());
				consumer = newConsumer;
				this.consumers.add(consumer);
				if (getApplicationEventPublisher() != null) {
					//Publish consumer restart event AsyncConsumerRestartedEvent
					getApplicationEventPublisher()
							.publishEvent(new AsyncConsumerRestartedEvent(this, oldConsumer, newConsumer));
				}
			}
			catch (RuntimeException e) {
				logger.warn("Consumer failed irretrievably on restart. " + e.getClass() + ": " + e.getMessage());
				// Re-throw and have it logged properly by the caller.
				throw e;
			}
			//AsyncMessageProcessingConsumer executes tasks asynchronously through the thread pool
			getTaskExecutor()
					.execute(new AsyncMessageProcessingConsumer(consumer));
		}
	}
}
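The line newConsumer.setBackOffExecution(consumer.getBackOffExecution()) means, as we read it, that back-off state survives a restart instead of being reset. The hypothetical sketch below illustrates that effect with a stand-in exponential back-off; RestartBackOffSketch, its BackOffExecution and Consumer classes are illustrative only and are not Spring's org.springframework.util.backoff types.

```java
/**
 * Hypothetical, JDK-only sketch: restart() hands the old consumer's back-off state
 * to the new consumer, so intervals keep growing across restarts.
 */
class RestartBackOffSketch {

    /** Stand-in exponential back-off: each call returns the current interval, then doubles it up to a cap. */
    static class BackOffExecution {
        private long next;
        private final long max;

        BackOffExecution(long initialMs, long maxMs) {
            this.next = initialMs;
            this.max = maxMs;
        }

        long nextBackOff() {
            long current = next;
            next = Math.min(next * 2, max);
            return current;
        }
    }

    /** Stand-in for BlockingQueueConsumer: only the back-off field matters here. */
    static class Consumer {
        BackOffExecution backOffExecution;
    }

    /** Mirrors: newConsumer.setBackOffExecution(consumer.getBackOffExecution()) */
    static Consumer restart(Consumer old) {
        Consumer fresh = new Consumer();
        fresh.backOffExecution = old.backOffExecution;
        return fresh;
    }
}
```

Starting at 1000 ms with a 5000 ms cap, three successive consumers see 1000, 2000 and 4000 ms rather than 1000 ms each time.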

3. Fixing consumers that stop consuming

The analysis above shows that when certain exceptions occur during consumption (such as QueuesNotAvailableException), the consumer is aborted and a ListenerContainerConsumerFailedEvent with fatal=true is published. We can listen for this event and restart the listener container ourselves.

All RabbitMQ-related events in Spring AMQP are subclasses of AmqpEvent.

Spring notifies observers (event listeners) of consumer lifecycle changes by publishing events. The consumer-related events are:

  • AsyncConsumerStartedEvent: a new consumer has started
  • AsyncConsumerStoppedEvent: a consumer has stopped
  • AsyncConsumerRestartedEvent: a consumer has been restarted
  • ListenerContainerConsumerFailedEvent: a listener container consumer has failed

We listen for ListenerContainerConsumerFailedEvent, whose definition is shown below. Its fatal attribute is the flag discussed above. When fatal is true, the consumer hit a fatal error and the container will not retry or restart it, so our event handler has to restart it. When fatal is false, the event can be ignored, because the container restarts the consumer by itself.

public class ListenerContainerConsumerFailedEvent extends AmqpEvent {
	private final String reason;
	private final boolean fatal;
	private final Throwable throwable;
	// constructor and getters (getReason(), isFatal(), getThrowable()) omitted
}

The handler: when the event is fatal, it waits, checks whether the container is already running, starts it if not, and then sends an alert.

import java.util.Arrays;
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {

    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        log.error("Consumer failure event: {}", event);
        if (!event.isFatal()) {
            // Non-fatal: the container restarts the consumer by itself
            return;
        }
        log.error("Stopping container from aborted consumer. Reason: {}", event.getReason(), event.getThrowable());
        if (!(event.getSource() instanceof SimpleMessageListenerContainer)) {
            return;
        }
        SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
        String queueNames = Arrays.toString(container.getQueueNames());
        try {
            // Give the broker time to recover before restarting.
            // Note: this blocks the thread that published the event.
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status and give up
            log.error("Interrupted before restarting listener for queues {}", queueNames);
            return;
        }
        try {
            // Only start the container if nobody else has restarted it in the meantime
            if (container.isRunning()) {
                log.info("Listener container for queues {} is already running", queueNames);
            } else {
                container.start();
                log.info("Restarted listener for queues {}", queueNames);
            }
        } catch (Exception e) {
            log.error("Failed to restart listener for queues {}", queueNames, e);
        }
        // TODO SMS / email / DingTalk alert: queue names, reason for the disconnection,
        // the exception at disconnection time, whether the restart succeeded, etc.
    }
}
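A single fixed 30-second sleep gives up after one attempt if the queue is still missing. One possible alternative, sketched here as a hypothetical helper (RestartWithBackoffSketch and its Sleeper interface are illustrative names, not part of Spring AMQP), retries with exponential back-off and abstracts the container as two callbacks so the logic can be exercised without a broker.

```java
import java.util.function.BooleanSupplier;

/**
 * Hypothetical helper: retry a container start with exponential back-off
 * instead of one fixed Thread.sleep(30000). JDK-only.
 */
class RestartWithBackoffSketch {

    /** Abstracts Thread.sleep so delays can be observed or skipped in tests. */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    /** @return true if the container is running after some attempt, false otherwise */
    static boolean restart(BooleanSupplier isRunning, Runnable start,
                           long initialDelayMs, long maxDelayMs, int maxAttempts, Sleeper sleeper) {
        long delay = initialDelayMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sleeper.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status and give up
                return false;
            }
            if (isRunning.getAsBoolean()) {
                return true; // already restarted elsewhere; do not start twice
            }
            try {
                start.run();
                if (isRunning.getAsBoolean()) {
                    return true;
                }
            } catch (RuntimeException e) {
                // start failed (e.g. queue still missing); retry after a longer delay
            }
            delay = Math.min(delay * 2, maxDelayMs);
        }
        return false;
    }
}
```

Inside the listener above, it might be called as restart(container::isRunning, container::start, 5_000, 60_000, 10, Thread::sleep). Like the original, this still blocks the thread that published the event, and a failed start may itself publish another fatal event, which is why the helper re-checks isRunning() before each attempt.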

Reference: https://blog.csdn.net/u011424653/article/details/79824538


Added by Tucker1337 on Tue, 18 Jan 2022 20:11:12 +0200