1. Message processing center class:
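This class holds the core message-handling logic. Messages are kept in a bounded in-memory queue (at most 3 here): produce adds a message to the queue and consume removes the oldest one. Which of the two is called depends on the text received from the MQ client: the literal string CONSUME is a consume request, and any other text is treated as a message to be produced.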
package com.richinfo.demo;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * Message processing center class
 *
 * Created by WangXJ
 * 2019-02-20 10:57
 */
public class Broker {
    // Maximum number of messages stored in the queue
    private final static int MAX_SIZE = 3;

    // Container for message data
    private static final ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);

    // Produce a message
    public static void produce(String msg) {
        // offer() does not block: it returns false when the queue is full
        if (messageQueue.offer(msg)) {
            System.out.println("Message successfully delivered to message processing center: " + msg
                    + ", the number of messages currently staged is: " + messageQueue.size());
        } else {
            System.out.println("The message processing center has reached its maximum load, cannot accept more messages!");
        }
        System.out.println("======================================");
    }

    // Consume a message
    public static String consume() {
        // poll() takes the oldest message, or returns null when the queue is empty
        String msg = messageQueue.poll();
        if (msg != null) {
            System.out.println("Message consumed: " + msg
                    + ", the number of messages currently staged is: " + messageQueue.size());
        } else {
            System.out.println("There are no messages to consume in the message processing center!");
        }
        System.out.println("======================================");
        return msg;
    }
}
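The Broker relies on two non-blocking methods of ArrayBlockingQueue: offer returns false when the queue is full, and poll returns null when it is empty. The following is a minimal sketch of that behavior in isolation. The class name BoundedQueueDemo and the helper offerAll are made up for this illustration and are not part of the project.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of the original project.
final class BoundedQueueDemo {

    // Tries to enqueue every message and returns how many were accepted.
    static int offerAll(ArrayBlockingQueue<String> queue, String... messages) {
        int accepted = 0;
        for (String msg : messages) {
            // offer() never blocks: it returns false when the queue is full.
            if (queue.offer(msg)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Same capacity as MAX_SIZE in Broker.
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        int accepted = offerAll(queue, "m1", "m2", "m3", "m4");
        System.out.println("accepted = " + accepted);        // accepted = 3
        System.out.println("first out = " + queue.poll());   // first out = m1

        queue.clear();
        // poll() on an empty queue returns null instead of waiting.
        System.out.println("empty poll = " + queue.poll());  // empty poll = null
    }
}
```

The fourth message is dropped rather than delayed, which matches the "maximum load" branch in Broker.produce. A blocking variant would use put and take instead, at the cost of stalling the calling thread.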
2. External services of message processing center:
This class implements the Runnable interface. Its main method listens on the service port (9999 here) in a loop and starts one thread per client connection. Each thread reads lines from its client and operates on the message queue according to the message type: CONSUME consumes a message, and any other text produces one.
package com.richinfo.demo;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * External service of message processing center
 *
 * Created by WangXJ
 * 2019-02-20 11:21
 */
public class BrokerServer implements Runnable {
    // Listening service port number
    public static final int SERVICE_PORT = 9999;

    private final Socket socket;

    public BrokerServer(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try (
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream());
        ) {
            while (true) {
                String str = in.readLine();
                if (str == null) {
                    // readLine() returns null once the client has closed the connection,
                    // so stop here instead of spinning in a busy loop
                    break;
                }
                System.out.println("Raw data received: " + str);
                if (str.equals("CONSUME")) {
                    // CONSUME means: take a message from the message queue and send it back
                    String message = Broker.consume();
                    out.println(message);
                    out.flush();
                } else {
                    // Any other text is a produced message and is put on the message queue
                    Broker.produce(str);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(SERVICE_PORT);
        while (true) {
            // One thread per client connection
            BrokerServer brokerServer = new BrokerServer(server.accept());
            new Thread(brokerServer).start();
        }
    }
}
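The dispatch rule inside run can be looked at separately from the socket handling. The sketch below restates it as a plain function over an in-memory queue, which makes two consequences of the protocol easier to see: a producer cannot send the literal text CONSUME as a message, and a consume request on an empty queue is answered with the text "null", because println writes a null String as those four characters. The class ProtocolSketch and its handle method are hypothetical names used only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical helper that mirrors the CONSUME / non-CONSUME rule of BrokerServer.
final class ProtocolSketch {
    static final String CONSUME = "CONSUME";

    // Handles one received line. Returns the reply line that would be written
    // back to the client, or null when no reply is sent (the produce case).
    static String handle(String line, Queue<String> queue) {
        if (CONSUME.equals(line)) {
            String msg = queue.poll();
            // An empty queue yields the text "null" on the wire,
            // matching what println does with a null String.
            return String.valueOf(msg);
        }
        // Any other text is stored as a new message; nothing is sent back.
        queue.offer(line);
        return null;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        System.out.println(handle("hello", queue));    // null (no reply for a produce)
        System.out.println(handle("CONSUME", queue));  // hello
        System.out.println(handle("CONSUME", queue));  // null (the text "null": queue is empty)
    }
}
```

A client therefore cannot tell an empty queue apart from a stored message whose text is "null". A reserved reply such as an empty line would avoid that ambiguity, but that would be a change to the protocol shown here.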
3. MQ client class:
This class simulates an MQ client. It provides the produce and consume operations used by the producer and consumer classes, and opens a new connection to the message processing center for each call.
package com.richinfo.demo;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;

/**
 * MQ Client
 *
 * Created by WangXJ
 * 2019-02-20 14:19
 */
public class MqClient {

    // Produce a message
    public static void produce(String message) throws Exception {
        // The socket is declared in try-with-resources so that it is always closed
        try (
            Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
            PrintWriter out = new PrintWriter(socket.getOutputStream());
        ) {
            out.println(message);
            out.flush();
        }
    }

    // Consume a message
    public static String consume() throws Exception {
        try (
            Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream());
        ) {
            // First send the string "CONSUME" to the message processing center to request a message
            out.println("CONSUME");
            out.flush();
            // Then read the message it sends back
            String message = in.readLine();
            return message;
        }
    }
}
4. Producers:
This class calls the MQ client to produce a message, which is then stored in the message queue.
package com.richinfo.demo;

/**
 * Producer
 *
 * Created by WangXJ
 * 2019-02-20 14:59
 */
public class ProduceClient {

    public static void main(String[] args) throws Exception {
        // produce is static, so it is called on the class rather than on an instance
        MqClient.produce("Hello Garlic King");
    }
}
5. Consumers:
This class calls the MQ client to consume a message from the message queue.
package com.richinfo.demo;

/**
 * Consumer
 *
 * Created by WangXJ
 * 2019-02-20 15:10
 */
public class ConsumeClient {

    public static void main(String[] args) throws Exception {
        // consume is static, so it is called on the class rather than on an instance
        String message = MqClient.consume();
        System.out.println("The message obtained is: " + message);
    }
}
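To try the whole flow, start BrokerServer first, then run ProduceClient, then ConsumeClient. The same round trip can also be sketched in one file, which is convenient for a quick check. The sketch below is a simplified stand-in, not the classes above: the name LoopbackDemo and its methods are hypothetical, it binds to an ephemeral loopback port instead of 9999, and it handles connections one after another instead of one thread per connection, so that the produce is guaranteed to be processed before the consume.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical single-file demo; all names here are illustrative.
final class LoopbackDemo {
    private static final ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(3);

    // Serves one client connection using the CONSUME / non-CONSUME rule.
    static void serve(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            String line;
            // readLine() returns null when the client closes the connection.
            while ((line = in.readLine()) != null) {
                if ("CONSUME".equals(line)) {
                    out.println(QUEUE.poll());
                } else {
                    QUEUE.offer(line);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Sends one line and, if requested, waits for a one-line reply.
    static String request(int port, String line, boolean expectReply) throws IOException {
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
             BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
            out.println(line);
            return expectReply ? in.readLine() : null;
        }
    }

    // Produces one message, then consumes it, against a throwaway broker.
    static String roundTrip(String message) {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            Thread acceptor = new Thread(() -> {
                try {
                    while (true) {
                        serve(server.accept()); // sequential: one connection at a time
                    }
                } catch (IOException closed) {
                    // The server socket was closed: stop accepting.
                }
            });
            acceptor.setDaemon(true);
            acceptor.start();

            int port = server.getLocalPort();
            request(port, message, false);          // what ProduceClient does
            return request(port, "CONSUME", true);  // what ConsumeClient does
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("The message obtained is: " + roundTrip("Hello Garlic King"));
    }
}
```

With the thread-per-connection server, a consume issued immediately after a produce from the same program could in principle be handled first, since produce does not wait for any acknowledgement. Running the producer and consumer as separate programs, as in sections 4 and 5, makes that unlikely in practice.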