Processing a large number of RabbitMQ messages with multithreading

Problem description: after the project receives a RabbitMQ message, it runs a series of processing steps and pushes the message to the front end only once all of them have finished. Every one of those steps contains code that talks to the database, so the steps are slow and the message reaches the front end late.

 

Single-threaded code model:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MqHandler implements MessageListener {

	//Message processing
	public void onMessage(Message msg) {
		//Processing A
		dealA(msg);
		//Processing B
		dealB(msg);
		//Processing C
		dealC(msg);
		//Processing D
		dealD(msg);
		//Processing E
		dealE(msg);
		//Processing F
		dealF(msg);
		
	}
	public void dealA(Message msg){}
	public void dealB(Message msg){}
	public void dealC(Message msg){}
	public void dealD(Message msg){}
	public void dealE(Message msg){}
	public void dealF(Message msg){}
}

 

Single-threaded processing diagram:

 

Solution: use multithreading. Each thread handles one or more of the processing steps, so the database waits of different steps overlap instead of adding up. This improves CPU utilization and shortens the message response time.

Multithreading diagram:

 

Code implementation

1: Configure the Spring thread pool

<bean id="taskExecutor"
		class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
	<!-- Number of core threads -->
	<property name="corePoolSize" value="10" />
	<!-- Maximum threads -->
	<property name="maxPoolSize" value="50" />
	<!-- Queue capacity; threads beyond corePoolSize are created only when the queue is full -->
	<property name="queueCapacity" value="1000" />
	<!-- Seconds an idle thread above the core size is kept alive -->
	<property name="keepAliveSeconds" value="300" />
	<!-- Handling strategy for tasks the pool rejects (queue full and no threads available) -->
	<property name="rejectedExecutionHandler">
		<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
	</property>
</bean>
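
The way corePoolSize, maxPoolSize and queueCapacity interact is easy to misread. Here is a minimal sketch using the JDK's ThreadPoolExecutor directly, which is the class ThreadPoolTaskExecutor wraps. The class name PoolSizingDemo and the tiny sizes (core 1, max 2, queue 1) are assumptions chosen to keep the demonstration short; they are not the article's values.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the tiny pool sizes are chosen only to keep the run short.
class PoolSizingDemo {
    static int poolSizeAfterSecondTask;
    static int poolSizeAfterThirdTask;
    static volatile String fourthTaskThread;

    static void runDemo() {
        // core=1, max=2, queue capacity=1, same rejection policy as the XML above
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 300, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // Keep the worker busy until the demo releases it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        pool.execute(blocking); // Task 1: starts the single core thread
        pool.execute(blocking); // Task 2: core thread is busy, so it waits in the queue
        poolSizeAfterSecondTask = pool.getPoolSize();
        pool.execute(blocking); // Task 3: queue is full, so a second thread is created
        poolSizeAfterThirdTask = pool.getPoolSize();
        // Task 4: queue full and max threads reached, so CallerRunsPolicy runs it right here
        pool.execute(() -> fourthTaskThread = Thread.currentThread().getName());

        release.countDown(); // Let the blocked tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        runDemo();
        System.out.println("threads after task 2: " + poolSizeAfterSecondTask);
        System.out.println("threads after task 3: " + poolSizeAfterThirdTask);
        System.out.println("task 4 ran on: " + fourthTaskThread);
    }
}
```

With the values from the XML (10 / 50 / 1000), the same rule means the pool grows past 10 threads only after 1000 tasks are already waiting. When the pool is saturated, CallerRunsPolicy makes the submitting thread run the task itself. In this article only the long-running worker threads are submitted to the pool, so these limits matter mainly if per-message tasks are submitted to it later.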

 

2: Thread class

ABProcess: the logic of methods A and B is handed to this thread for processing

import java.util.ArrayList;
import java.util.List;

import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ABProcess implements Runnable {

	@Autowired
	private ThreadPoolTaskExecutor taskExecutor;

	private List<Message> ABList = new ArrayList<Message>();

	// Initialization method: start thread
	public void init() {
		taskExecutor.execute(this);
	}

	// Add a message to the pending list
	public void addList(Message msg) {
		// ABList is shared: the main thread adds to it and the worker thread removes
		// from it, so every access must be synchronized
		synchronized (this) {
			ABList.add(msg);
		}
	}

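	@Override
	public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			Message msg = null;
			synchronized (this) { // Read the shared list under the same lock used by addList
				if (!ABList.isEmpty()) {
					msg = ABList.remove(0); // Take the oldest message off the list
				}
			}

			if (msg != null) { // Process if there is a message
				dealA(msg);
				dealB(msg);
				System.out.println("dealABSuccess");
				continue; // Check for more messages right away
			}

			try {
				Thread.sleep(10); // Nothing to do: sleep briefly before polling again
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt(); // Restore the flag so the loop exits
			}
		}
	}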

	public void dealA(Message msg) {
	}

	public void dealB(Message msg) {
	}

}

Declare the bean in the Spring configuration and set init-method so that init() starts the thread:

<bean id="ABProcess" class="com.thread.ABProcess" init-method="init"/>

 

CDProcess: the logic of methods C and D is handed to this thread for processing. Its implementation mirrors ABProcess.
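
The list-plus-sleep polling in ABProcess and CDProcess can also be written with a java.util.concurrent.BlockingQueue, which does its own locking and lets the worker block until a message arrives. The following is a sketch of that alternative, not the article's implementation. QueueWorker, submit, shutdown and demo are hypothetical names, and String stands in for the AMQP Message type so the example runs without Spring.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for ABProcess; String replaces the AMQP Message type.
class QueueWorker implements Runnable {
    // Sentinel compared by identity; tells the worker to stop after draining earlier messages
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();

    // Called by the listener thread; the queue is thread-safe, so no explicit lock is needed
    void submit(String msg) {
        queue.add(msg);
    }

    void shutdown() {
        queue.add(STOP);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String msg = queue.take(); // Blocks until a message is available
                if (msg == STOP) {
                    return;
                }
                dealA(msg);
                dealB(msg);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Treat an interrupt as a stop request
        }
    }

    // Placeholders for the real processing steps; they only record what they saw
    void dealA(String msg) { handled.add("A:" + msg); }
    void dealB(String msg) { handled.add("B:" + msg); }

    // Starts a worker, feeds it the messages, waits for it to finish and returns the record
    static List<String> demo(String... msgs) {
        QueueWorker worker = new QueueWorker();
        Thread thread = new Thread(worker, "ab-worker");
        thread.start();
        for (String m : msgs) {
            worker.submit(m);
        }
        worker.shutdown();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker.handled;
    }

    public static void main(String[] args) {
        System.out.println(demo("m1", "m2"));
    }
}
```

Because take() blocks, the worker does not wake up every 10 ms when idle, and it picks up a new message as soon as it is queued.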

 

3: Modify the MqHandler logic that runs on the main thread

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;

public class MqHandler implements MessageListener {
	
	@Autowired
	private ABProcess abProcess;
	
	@Autowired
	private CDProcess cdProcess;
	
	//Message processing
	public void onMessage(Message msg) {
		
		abProcess.addList(msg);//The main thread adds the message to the list for the ABProcess worker thread to handle
		
		cdProcess.addList(msg);//The main thread adds the message to the list for the CDProcess worker thread to handle
		
		//The logic of E and F is simple, so it runs directly on the main thread
		dealE(msg);
		dealF(msg);
	}
	public void dealE(Message msg){}
	public void dealF(Message msg){}
}
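
The problem description says the message is pushed to the front end only after all processing has finished, but the handler above returns without knowing when the A/B and C/D steps are done. One possible way to keep that ordering is sketched below with CompletableFuture. It assumes the A/B, C/D and E/F groups do not depend on each other. ParallelHandler, deal, push and demo are hypothetical names, the 20 ms sleep simulates a database call, and String stands in for the AMQP Message type.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical handler; String stands in for the AMQP Message type.
class ParallelHandler {
    final List<String> log = new CopyOnWriteArrayList<>();
    private final ExecutorService pool;

    ParallelHandler(ExecutorService pool) {
        this.pool = pool;
    }

    void onMessage(String msg) {
        // Run the two slow groups on pool threads
        CompletableFuture<Void> ab = CompletableFuture.runAsync(() -> {
            deal("A", msg);
            deal("B", msg);
        }, pool);
        CompletableFuture<Void> cd = CompletableFuture.runAsync(() -> {
            deal("C", msg);
            deal("D", msg);
        }, pool);

        // E and F stay on the calling thread, as in the article
        deal("E", msg);
        deal("F", msg);

        CompletableFuture.allOf(ab, cd).join(); // Wait until both groups have finished
        push(msg);                              // Only now is every step complete
    }

    // Placeholder for one processing step; the sleep simulates a database round trip
    private void deal(String step, String msg) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.add(step + ":" + msg);
    }

    // Placeholder for pushing the message to the front end
    private void push(String msg) {
        log.add("push:" + msg);
    }

    // Handles one message on a two-thread pool and returns what was recorded
    static List<String> demo(String msg) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            ParallelHandler handler = new ParallelHandler(pool);
            handler.onMessage(msg);
            return handler.log;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("m1"));
    }
}
```

The three groups run side by side, so the time before the push is roughly that of the slowest group rather than the sum of all six steps. The order of entries from different groups in the log can vary between runs; only the push is guaranteed to come last.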

 

 

Keywords: Java Spring RabbitMQ Database

Added by nrsh_ram on Thu, 02 Jan 2020 06:08:56 +0200