Problem description: after the project receives a RabbitMQ message, it runs a series of processing steps and pushes the message to the front end only once all of them have finished. Every step contains code that talks to the database, so the steps add up on a single thread and the message is pushed late.
Single-threaded code model:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MqHandler implements MessageListener {

    // Message processing
    public void onMessage(Message msg) {
        // Processing A
        dealA(msg);
        // Processing B
        dealB(msg);
        // Processing C
        dealC(msg);
        // Processing D
        dealD(msg);
        // Processing E
        dealE(msg);
        // Processing F
        dealF(msg);
    }

    public void dealA(Message msg) {}
    public void dealB(Message msg) {}
    public void dealC(Message msg) {}
    public void dealD(Message msg) {}
    public void dealE(Message msg) {}
    public void dealF(Message msg) {}
}
Single-threaded processing diagram:
Solution: use multithreading. Each thread handles one or more of the processing steps, which improves CPU utilization and shortens the message response time.
Multithreading diagram:
Code implementation
1: Configure the Spring thread pool
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <!-- Number of core threads -->
    <property name="corePoolSize" value="10" />
    <!-- Maximum number of threads -->
    <property name="maxPoolSize" value="50" />
    <!-- Maximum queue length (>= mainExecutor.maxSize) -->
    <property name="queueCapacity" value="1000" />
    <!-- Seconds an idle thread above the core size is kept alive -->
    <property name="keepAliveSeconds" value="300" />
    <!-- Policy for tasks the pool rejects (no free thread and the queue is full): run them on the calling thread -->
    <property name="rejectedExecutionHandler">
        <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
    </property>
</bean>
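To see what these settings do without Spring, here is a minimal sketch using the plain JDK `java.util.concurrent.ThreadPoolExecutor`. The class name `PoolSketch`, its methods, and the deliberately tiny sizes (1 thread, 1 queue slot) are assumptions for the demo, not values from the project. It illustrates the `CallerRunsPolicy` behavior: once the only thread is busy and the queue is full, the next task runs on the thread that submitted it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original project.
class PoolSketch {

    // Same knobs as the XML bean (core size, max size, queue capacity,
    // keep-alive, rejection policy), passed in so the demo can use tiny values.
    static ThreadPoolExecutor newPool(int core, int max, int queueCapacity, int keepAliveSeconds) {
        return new ThreadPoolExecutor(
                core, max, keepAliveSeconds, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Returns true if the task that overflowed the pool ran on the submitting thread.
    static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor pool = newPool(1, 1, 1, 300);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker thread until the latch is released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Task 2 fills the single queue slot.
            pool.execute(() -> { });
            // Task 3 is rejected, so CallerRunsPolicy runs it right here, synchronously.
            pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
            return Thread.currentThread().getName().equals(overflowThread.get());
        } finally {
            release.countDown(); // let task 1 finish
            pool.shutdown();     // queued work still completes, then the worker exits
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on caller: " + overflowRunsOnCaller());
    }
}
```

One consequence worth keeping in mind: with `CallerRunsPolicy`, a saturated pool slows down the submitting thread instead of dropping work, so in this setup the listener thread itself would end up doing the processing under heavy load.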
2: Thread class
ABProcess: the logic of method A and method B is handed to this thread for processing
import java.util.ArrayList;
import java.util.List;

import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class ABProcess implements Runnable {

    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    private List<Message> ABList = new ArrayList<Message>();

    // Initialization method: start the worker thread
    public void init() {
        taskExecutor.execute(this);
    }

    // Add a message.
    // ABList is shared: the main thread adds to it and the worker thread
    // removes from it, so every access has to be synchronized.
    public void addList(Message msg) {
        synchronized (this) {
            ABList.add(msg);
        }
    }

    @Override
    public void run() {
        while (true) {
            // Read the oldest message under the lock (ArrayList is not thread-safe)
            Message msg = null;
            synchronized (this) {
                if (ABList.size() > 0) {
                    msg = ABList.get(0);
                }
            }
            if (msg != null) {
                // Process if there is a message
                dealA(msg);
                dealB(msg);
                synchronized (this) {
                    ABList.remove(0); // Delete after processing
                }
                System.out.println("dealABSuccess");
            }
            try {
                Thread.sleep(10); // Sleep briefly before polling again
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the worker
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void dealA(Message msg) {}

    public void dealB(Message msg) {}
}
Configure the bean in Spring and set init as its init-method:
<bean id="ABProcess" class="com.thread.ABProcess" init-method="init"/>
CDProcess: the logic of method C and method D is handed to this thread for processing. The implementation mirrors ABProcess.
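The list-plus-sleep polling used by these worker classes can also be written with a `java.util.concurrent.BlockingQueue`, which blocks the worker until a message arrives and takes care of the locking itself. The following is a minimal sketch of that alternative, not the original implementation: `QueueWorker`, `submit`, and the `handler` callback are hypothetical names, and a plain `String` stands in for the AMQP `Message` so the example runs without Spring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a worker such as ABProcess or CDProcess.
class QueueWorker<T> implements Runnable {

    // Thread-safe FIFO queue: no synchronized blocks needed around it.
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final Consumer<T> handler;

    QueueWorker(Consumer<T> handler) {
        this.handler = handler;
    }

    // Called by the listener thread to hand a message to the worker.
    void submit(T msg) {
        queue.add(msg);
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                T msg = queue.take(); // blocks until a message is available
                handler.accept(msg);  // e.g. dealA(msg); dealB(msg);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit
        }
    }

    // Small demo: three messages are handled in submission order.
    static List<String> demo() {
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        QueueWorker<String> worker = new QueueWorker<>(m -> {
            handled.add("AB:" + m);
            done.countDown();
        });
        Thread thread = new Thread(worker, "ab-worker");
        thread.setDaemon(true);
        thread.start();
        worker.submit("m1");
        worker.submit("m2");
        worker.submit("m3");
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        thread.interrupt(); // stop the worker
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Compared with polling every 10 ms, the worker here wakes up as soon as a message is queued and uses no CPU while idle. The queue in this sketch is unbounded, so a bounded queue may be the safer choice if messages can arrive faster than they are processed.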
3: Modify the MqHandler logic on the main thread
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;

public class MqHandler implements MessageListener {

    @Autowired
    private ABProcess abProcess;

    @Autowired
    private CDProcess cdProcess;

    // Message processing
    public void onMessage(Message msg) {
        // The main thread adds the message to the collection and hands it to the worker thread ABProcess
        abProcess.addList(msg);
        // The main thread adds the message to the collection and hands it to the worker thread CDProcess
        cdProcess.addList(msg);

        // The logic of E and F is simple, so it stays on the main thread
        dealE(msg);
        dealF(msg);
    }

    public void dealE(Message msg) {}
    public void dealF(Message msg) {}
}
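With this handler, onMessage returns as soon as E and F are done, while A/B and C/D may still be running on their worker threads. If the push to the front end has to wait until every step has finished, one option is `CompletableFuture`. The sketch below is an assumption-laden illustration rather than the original code: `ParallelStages`, `process`, and the `push:` log entry are hypothetical, the steps are assumed to be independent of each other, and a plain fixed thread pool stands in for the Spring `taskExecutor`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; the log entries stand in for dealA..dealF and the push.
class ParallelStages {

    // Runs A+B and C+D on the pool, E+F on the calling thread,
    // and "pushes" only after all of them have completed.
    static List<String> process(String msg, ExecutorService pool) {
        List<String> log = new CopyOnWriteArrayList<>();

        CompletableFuture<Void> ab = CompletableFuture.runAsync(() -> {
            log.add("A");
            log.add("B");
        }, pool);
        CompletableFuture<Void> cd = CompletableFuture.runAsync(() -> {
            log.add("C");
            log.add("D");
        }, pool);

        // Simple steps stay on the calling (listener) thread.
        log.add("E");
        log.add("F");

        // Block until both background groups have finished.
        CompletableFuture.allOf(ab, cd).join();

        // Only now is it safe to push the message onward.
        log.add("push:" + msg);
        return new ArrayList<>(log);
    }

    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return process("m1", pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The order of A..F varies between runs; the push entry is always last.
        System.out.println(demo());
    }
}
```

The listener thread still blocks in `join()`, but only for as long as the slowest group takes, instead of the sum of all six steps.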