The message service uses the publish-subscribe pattern. A message producer sends a message to the message center, which copies it and forwards it to the message consumers. The flow can be seen as two stages of interaction: from producer to message center, and from message center to consumer.
Taking the consignment order creation business message as an example, let's implement both stages.
With the message client's receiver already implemented earlier, once the client and server have completed login authentication we can visit the following address:
http://localhost:10001/event?event=lms.transportbill.consignmentbill.create&id=WT202101210005
This simulates the business system pushing a consignment order creation request to our message client. The event code is lms.transportbill.consignmentbill.create and the order number is WT202101210005.
Producer pushes message to message center
The producer is actually a message client, and the message center is a message server.
After receiving the call from the business system, the producer looks up the corresponding message sender by event code. Because the main logic is implemented in the parent class, developing a business function is greatly simplified: here you only need to set the message topic.
```java
/**
 * Consignment order creation sender
 * @author wqliu
 * @date 2022-1-18 11:02
 **/
public class ConsignmentBillCreateSender extends RequestMessageSender {

    public ConsignmentBillCreateSender() {
        super("lms.transportbill.consignmentbill.create");
    }
}
```
The sender builds a request message from the event code and the business document ID and sends it to the message center:
{"content":"WT202101210005","id":"1489152492813930497","messageType":"REQUEST","publishAppCode":"SCS","publishTime":"2022-02-03 16:23:16","sendCount":0,"topic":"lms.transportbill.consignmentbill.create"}
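As a rough illustration of how such a request message might be assembled from the event code and the document ID, here is a minimal sketch. The class RequestMessageSketch and its build method are hypothetical stand-ins, not the framework's real RequestMessage; only the field names are taken from the JSON above, and the UUID-based ID is an assumption (the real IDs appear to be numeric).

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

/** Hypothetical stand-in for the framework's request message; field names follow the JSON above. */
class RequestMessageSketch {
    // Matches the "2022-02-03 16:23:16" style seen in publishTime
    static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    final String id;
    final String topic;
    final String content;
    final String messageType = "REQUEST";
    final String publishAppCode;
    final String publishTime;
    final int sendCount = 0;

    private RequestMessageSketch(String id, String topic, String content,
                                 String publishAppCode, String publishTime) {
        this.id = id;
        this.topic = topic;
        this.content = content;
        this.publishAppCode = publishAppCode;
        this.publishTime = publishTime;
    }

    /**
     * Builds a request message: the event code becomes the topic and the
     * business document ID becomes the content.
     */
    static RequestMessageSketch build(String eventCode, String documentId,
                                      String appCode, LocalDateTime now) {
        // Assumption: any unique ID works for the sketch
        String id = UUID.randomUUID().toString();
        return new RequestMessageSketch(id, eventCode, documentId, appCode,
                now.format(TIME_FORMAT));
    }
}
```

Passing the timestamp in as a parameter, rather than reading the clock inside build, keeps the sketch deterministic and easy to check.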
When the message center receives the message, it determines from the messageType attribute that it is a request message, then finds the corresponding message handler by the topic attribute. Again, the handler only needs to inherit from the parent class.
```java
/**
 * Consignment order creation request handler
 * @author wqliu
 * @date 2022-1-21 19:19
 **/
@Slf4j
public class ConsignmentBillCreateRequestHandler extends RequestMessageHandler {

    @Override
    protected void messageOperation(RequestMessage message, Channel channel) {
    }
}
```
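The two-step lookup described above (first by messageType, then by topic) could be sketched as a small registry. This is only an illustration under assumptions: DispatchSketch, register and dispatch are hypothetical names, and the framework's actual lookup mechanism is not shown in this article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical registry that routes a message by messageType and topic. */
class DispatchSketch {
    // Key is "<messageType>:<topic>", value is the handler for that combination
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    private static String key(String messageType, String topic) {
        return messageType + ":" + topic;
    }

    /** Registers a handler for one message type and topic. */
    void register(String messageType, String topic, Function<String, String> handler) {
        handlers.put(key(messageType, topic), handler);
    }

    /** Finds the handler for the message and applies it to the content. */
    String dispatch(String messageType, String topic, String content) {
        Function<String, String> handler = handlers.get(key(messageType, topic));
        if (handler == null) {
            throw new IllegalArgumentException(
                    "No handler for " + messageType + " / " + topic);
        }
        return handler.apply(content);
    }
}
```

Failing loudly on an unknown combination is a design choice made for the sketch; a real server might instead log the message and reply with an error response.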
The overall processing flow of the message is defined and implemented in the parent class.
```java
/**
 * Message processing
 *
 * @param requestMessage the request message
 * @param channel        the channel
 */
public void handleMessage(RequestMessage requestMessage, Channel channel) {
    // Log the request message
    apiMessageLogService.createRequestPart(requestMessage);
    // Validate at the framework level
    validateFramework(requestMessage);
    // By default, mark the request message as not needing to be sent
    apiMessageLogService.updateStatus(MessageStatusEnum.NOT_TO_REQUEST.name(), requestMessage.getId());
    // Handler-specific processing
    messageOperation(requestMessage, channel);
    // Send a response to the message sender
    sendResponse(requestMessage, channel);
    // Copy and forward the message
    if (isNeedRepost()) {
        repostMessage(requestMessage.getTopic(), requestMessage.getContent());
    }
}
```
The response to the message sender is implemented as follows:
```java
private void sendResponse(RequestMessage requestMessage, Channel channel) {
    // Get the topic of the response message
    String responseTopicCode = getResponseTopicCode(requestMessage.getTopic());
    // Build the sender according to the topic
    ResponseMessageSender responseMessageSender =
            (ResponseMessageSender) MessageSenderFactory.createSender(responseTopicCode);
    // Send the message
    responseMessageSender.sendMessage(channel, requestMessage);
}
```
For most business messages, unless there are special circumstances, the message center uniformly replies with a confirmation message telling the client that the message was received successfully. Its topic is framework.message.confirm.
```java
/**
 * Send a response to the requester with a message acknowledgement
 * @author wqliu
 * @date 2021-10-14 10:38
 **/
public class MessageConfirmResponseSender extends ResponseMessageSender {

    public MessageConfirmResponseSender() {
        super("framework.message.confirm.response");
    }
}
```
The complete message is as follows:
{"id":"1489152495347277825","messageType":"RESPONSE","publishAppCode":"MessageServer","publishTime":"2022-02-03 16:23:16","requestMessageId":"1489152492813930497","result":"SUCCESS","topic":"framework.message.confirm.response"}
After receiving the above message, the client determines from the messageType attribute that it is a response message, then finds the corresponding message handler by the topic attribute:
```java
/**
 * Message acknowledgement response handler
 * @author wqliu
 * @date 2022-1-16 8:53
 **/
@Slf4j
public class MessageConfirmResponseHandler extends ResponseMessageHandler {
}
```
This handler only needs to inherit from the parent class. The actual work to be done is verifying the data and updating the log.
Message center pushes messages to consumers
Besides receiving the business message pushed by the producer, the message center has two things to do: push a message confirmation response back to the producer, and look up all consumers (which are also message clients) subscribed to the message's topic, then copy and forward the message to them.
Here the message center acts as a broker between message producers and consumers. Producers and consumers interact only with the message center and need not know of each other's existence, which decouples them.
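The broker role can be illustrated with a minimal in-memory sketch. BrokerSketch, subscribe and publish are hypothetical names chosen for this example; the real message center works over network channels and persists a message log, none of which is modelled here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory broker: producers publish by topic, consumers subscribe by topic. */
class BrokerSketch {
    // topic -> callbacks of the consumers subscribed to it
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** Registers a consumer callback for a topic. */
    void subscribe(String topic, Consumer<String> consumer) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(consumer);
    }

    /**
     * Copies the content to every consumer subscribed to the topic.
     * Returns the number of consumers the message was forwarded to.
     */
    int publish(String topic, String content) {
        List<Consumer<String>> targets =
                subscribers.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> consumer : targets) {
            consumer.accept(content);
        }
        return targets.size();
    }
}
```

The producer only ever calls publish and a consumer only ever calls subscribe, so neither side holds a reference to the other.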
Next, let's implement the process of pushing business messages from the message center to consumers.
The following method is the message center's flow for processing request messages. The earlier steps are logging, framework validation, setting the message status, handler-specific processing, and sending the response. The focus of this section is the repostMessage method.
```java
/**
 * Message processing
 *
 * @param requestMessage the request message
 * @param channel        the channel
 */
public void handleMessage(RequestMessage requestMessage, Channel channel) {
    // Log the request message
    apiMessageLogService.createRequestPart(requestMessage);
    // Validate at the framework level
    validateFramework(requestMessage);
    // By default, mark the request message as not needing to be sent
    apiMessageLogService.updateStatus(MessageStatusEnum.NOT_TO_REQUEST.name(), requestMessage.getId());
    // Handler-specific processing
    messageOperation(requestMessage, channel);
    // Send a response to the message sender
    sendResponse(requestMessage, channel);
    // Copy and forward the message
    if (isNeedRepost()) {
        repostMessage(requestMessage.getTopic(), requestMessage.getContent());
    }
}
```
The content of the repostMessage method is as follows:
```java
/**
 * Message forwarding
 *
 * @param topic   the message topic
 * @param content the message content
 */
private void repostMessage(String topic, String content) {
    // Build the sender according to the message topic
    RequestMessageSender requestMessageSender =
            (RequestMessageSender) MessageSenderFactory.createSender(topic);
    // Pass in the content of the original request
    requestMessageSender.sendMessage(content);
}
```
The method itself is simple: finding the message clients subscribed to the topic and building the forwarded message are both implemented in the message sender.
Several message clients may subscribe to the same topic, and there are two cases. In the first, every subscribed client may receive all messages, so no data permission control is needed; for example, all transport order creation messages are sent to a service provider for in-transit tracking. In the second, data permissions must be enforced; for example, a consignment order creation message should only be sent to the specific carrier it concerns.
The consignment order creation sender is implemented as follows:
```java
/**
 * Consignment order creation sender
 * @author wqliu
 * @date 2022-2-1 8:14
 **/
public class ConsignmentBillCreateRequestSender extends RequestMessageSender {

    public ConsignmentBillCreateRequestSender() {
        super("lms.transportbill.consignmentbill.create");
    }

    @Override
    protected boolean dataPermissionFilter(String content, String appCode) {
        // Get the business document ID
        String id = content;
        // The carrier code would be fetched for the document via an API call; simulated as 001 here
        String carrierCode = "001";
        // Find the carrier data permissions owned by the current application
        List<ApiDataPermission> list =
                apiDataPermissionService.getPermissionByRoleCode(DataRoleEnum.CARRIER.name(), appCode);
        // Permission is granted if any record's business code matches the carrier code or is the wildcard
        return list.stream().anyMatch(x ->
                x.getBusinessCode().equals(carrierCode)
                        || x.getBusinessCode().equals(DATA_PERMISSION_ALL));
    }
}
```
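To make the effect of the filter concrete, the following sketch shows how a permission check of this kind might decide which subscribed applications receive a forwarded message. It is an assumption about how the parent sender uses dataPermissionFilter, which the article does not show. PermissionFanOutSketch, selectRecipients and the wildcard value "*" are hypothetical, and permissions are reduced to plain lists of carrier codes per application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: pick the subscribed applications allowed to receive a message. */
class PermissionFanOutSketch {
    // Assumption: the real value of DATA_PERMISSION_ALL is not shown in the article
    static final String DATA_PERMISSION_ALL = "*";

    /** True if any granted business code equals the carrier code or is the wildcard. */
    static boolean dataPermissionFilter(List<String> grantedCodes, String carrierCode) {
        return grantedCodes.stream()
                .anyMatch(code -> code.equals(carrierCode) || code.equals(DATA_PERMISSION_ALL));
    }

    /** Returns the app codes, in map iteration order, that pass the permission filter. */
    static List<String> selectRecipients(Map<String, List<String>> grantsByApp,
                                         String carrierCode) {
        List<String> recipients = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : grantsByApp.entrySet()) {
            if (dataPermissionFilter(entry.getValue(), carrierCode)) {
                recipients.add(entry.getKey());
            }
        }
        return recipients;
    }
}
```

With this shape, an application granted carrier 001 or the wildcard receives an order for carrier 001, while one granted only carrier 002 is skipped.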
When a consumer receives the message pushed by the message center, it processes it with the following handler:
```java
/**
 * Consignment order creation request message handler
 * @author wqliu
 * @date 2022-1-22 10:20
 **/
@Slf4j
public class ConsignmentBillCreateRequestHandler extends RequestMessageHandler {

    @Override
    protected void messageOperation(RequestMessage message, Channel channel) {
        // Conduct business processing
    }
}
```
Its parent class sends a message confirmation response to the message center, notifying it that the business message has been received.
After receiving this response, the message center updates the message log:
```java
/**
 * Message acknowledgement response handler
 * @author wqliu
 * @date 2022-1-16 8:53
 **/
@Slf4j
public class MessageConfirmResponseHandler extends ResponseMessageHandler {
}
```
That is the overall processing flow: the message center sends the business message to consumers; each consumer performs its business processing and replies with a message confirmation response; on receiving it, the message center updates the status of the message log, closing the loop.
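The closed loop can be pictured as a log entry that is created when a message is sent and updated when the matching confirmation arrives, correlated through requestMessageId. The sketch below is purely illustrative: MessageLogSketch and the statuses SENT and CONFIRMED are hypothetical, since the article only names the status NOT_TO_REQUEST and does not show the log service's implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory message log keyed by message ID. */
class MessageLogSketch {
    // Assumption: illustrative statuses, not the framework's MessageStatusEnum values
    enum Status { SENT, CONFIRMED }

    private final Map<String, Status> log = new HashMap<>();

    /** Records that a message with this ID has been sent. */
    void recordSent(String messageId) {
        log.put(messageId, Status.SENT);
    }

    /**
     * Handles a confirmation response whose requestMessageId points at a
     * previously sent message. Returns false if no such message was logged.
     */
    boolean confirm(String requestMessageId) {
        if (!log.containsKey(requestMessageId)) {
            return false;
        }
        log.put(requestMessageId, Status.CONFIRMED);
        return true;
    }

    /** Returns the current status, or null if the ID is unknown. */
    Status statusOf(String messageId) {
        return log.get(messageId);
    }
}
```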