A simple implementation of a message queue (MQ) middleware

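1. Message processing center class:
   This class holds the core message-handling logic: it keeps messages in a bounded in-memory queue and exposes static methods to produce (enqueue) and consume (dequeue) them. Which operation is performed is decided by the server in section 2: if the line received from the MQ client is CONSUME, a message is consumed; any other line is treated as a message to be produced.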

package com.richinfo.demo;
/**
 * Message processing center class
 * 
* Created by WangXJ
* 2019-02-20 10:57
*/

import java.util.concurrent.ArrayBlockingQueue;

/**
 * Message processing center: a bounded, in-memory FIFO store for messages.
 *
 * <p>All state is static, so every caller in the JVM shares one queue holding
 * at most {@code MAX_SIZE} messages. Neither operation blocks: {@link #produce}
 * drops the message when the queue is full and {@link #consume} returns
 * {@code null} when it is empty. Every call also prints a status line followed
 * by a separator line to standard output.
 */
public class Broker {
	
	// Maximum number of messages stored in the queue
	private static final int MAX_SIZE = 3;
	
	// Container for message data; created once and never reassigned
	private static final ArrayBlockingQueue<String> messageQueue =
			new ArrayBlockingQueue<>(MAX_SIZE);
	
	/**
	 * Tries to append a message to the queue without blocking.
	 *
	 * <p>If the queue already holds {@code MAX_SIZE} messages the message is
	 * discarded and only a notice is printed; the caller gets no other signal.
	 *
	 * @param msg the message to store; must not be {@code null}
	 *            ({@code ArrayBlockingQueue.offer} rejects {@code null} with a
	 *            {@code NullPointerException})
	 */
	public static void produce(String msg) {
		if (messageQueue.offer(msg)) {
			System.out.println("Message successfully delivered to message processing center: " + msg
					+ ", the number of messages currently staged is: " + messageQueue.size());
		} else {
			System.out.println("The message temporarily held in the message processing center reaches the maximum load, Cannot continue to put message!");
		}
		
		System.out.println("======================================");
	}
	
	/**
	 * Removes and returns the oldest queued message without blocking.
	 *
	 * @return the message at the head of the queue, or {@code null} if the
	 *         queue is empty
	 */
	public static String consume() {
		String msg = messageQueue.poll();
		if (msg != null) {
			// A message was available and has already been removed from the queue
			System.out.println("Message consumed: " + msg + ",The number of messages currently staged is: " + 
					messageQueue.size());
		} else {
			System.out.println("There are no messages to consume in the message processing center!");
		}
		
		System.out.println("======================================");
		
		return msg;
	}
		
}

2. External service of the message processing center:
   This class implements the Runnable interface. Its main method listens on the service port (9999 here) in a loop and starts one handler thread per client connection; each handler operates on the message queue according to the line it receives: CONSUME consumes a message, and any other line is produced as a new message.

package com.richinfo.demo;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * External service of the message processing center.
 *
 * <p>{@link #main} accepts TCP connections on {@link #SERVICE_PORT} and starts
 * one thread per connection. Each thread reads the connection line by line:
 * the line {@code CONSUME} takes a message from {@code Broker} and writes it
 * back to the client; any other line is stored in {@code Broker} as a new
 * message.
 *
 * Created by WangXJ
 * 2019-02-20 11:21
 */
public class BrokerServer implements Runnable{
	
	// Listening service port number
	public static int SERVICE_PORT = 9999;
	
	// Connection to the single client this handler serves
	private final Socket socket;
	
	/**
	 * Creates a handler for one accepted client connection.
	 *
	 * @param socket the connected client socket; it is closed when {@link #run} ends
	 */
	public BrokerServer(Socket socket) {
		this.socket = socket;
	}

	/**
	 * Serves the client until it closes its side of the connection.
	 *
	 * <p>Any exception ends the handler and is printed to standard error; it is
	 * not propagated, so one failing connection does not affect the others.
	 */
	@Override
	public void run() {
		try (
			// Listing the socket first guarantees it is closed even if opening a stream fails
			Socket client = socket;
			BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
			PrintWriter out = new PrintWriter(client.getOutputStream());
		){
			String str;
			// readLine() returns null once the client has closed the connection.
			// Stop there: retrying would spin forever on a dead socket.
			while ((str = in.readLine()) != null) {
				System.out.println("Raw data received: " + str);
				
				if (str.equals("CONSUME")) { // CONSUME means to consume a message
					// Consume a message from the message queue
					String message = Broker.consume();
					// When the queue is empty, message is null and println sends the text "null"
					out.println(message);
					out.flush();
				} else {
					// Any other line is a message to put on the message queue
					Broker.produce(str);
				}
			}
		} catch (Exception e) {
			// Thread boundary: report the failure and let this handler terminate
			e.printStackTrace();
		}
		
	}

	/**
	 * Starts the server: accepts connections forever, one handler thread each.
	 *
	 * @param args unused
	 * @throws IOException if the port cannot be bound or accepting a connection fails
	 */
	public static void main(String[] args) throws IOException {
		// try-with-resources releases the port if accept() fails
		try (ServerSocket server = new ServerSocket(SERVICE_PORT)) {
			while (true) {
				BrokerServer brokerServer = new BrokerServer(server.accept());
				new Thread(brokerServer).start();
			}
		}
	}
}

3. MQ client class:
   This class simulates an MQ client; it provides the produce-message and consume-message operations used by the producer and consumer classes.

package com.richinfo.demo;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;

/**
 * MQ client: talks to the broker server over TCP on the local host.
 *
 * <p>Each call opens a new connection to {@code BrokerServer.SERVICE_PORT},
 * performs one request and closes the connection again.
 *
 * Created by WangXJ
 * 2019-02-20 14:19
 */
public class MqClient {

	/**
	 * Sends one message to the broker to be queued.
	 *
	 * <p>The broker sends no acknowledgement, so this method cannot tell
	 * whether the message was actually stored.
	 *
	 * @param message the text to send as a single line; the line
	 *                {@code CONSUME} would be read by the server as a
	 *                consume request rather than as a message
	 * @throws Exception if the local host cannot be resolved or the connection fails
	 */
	public static void produce(String message) throws Exception {
		try (
			// The socket is a managed resource so it is closed even if getOutputStream() throws
			Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
			PrintWriter out = new PrintWriter(socket.getOutputStream());
		) {
			out.println(message);
			out.flush();
		}
	}
	
	/**
	 * Requests one message from the broker and waits for the reply line.
	 *
	 * @return the line sent back by the broker, or {@code null} if the
	 *         connection ended before a line arrived
	 * @throws Exception if the local host cannot be resolved or the connection fails
	 */
	public static String consume() throws Exception {
		try (
			// The socket is a managed resource so it is closed even if opening a stream throws
			Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
			BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			PrintWriter out = new PrintWriter(socket.getOutputStream());
		) {
			// Send the string "CONSUME" first to tell the broker this is a consume request
			out.println("CONSUME");
			out.flush();
			// Then read the message the broker sends back
			return in.readLine();
		}
	}
}

4. Producer:
   This class calls the MQ client to produce a message, which the broker stores in the message queue.

package com.richinfo.demo;
/**
 * Producer
 * 
* Created by WangXJ
* 2019-02-20 14:59
*/
public class ProduceClient {

	public static void main(String[] args) throws Exception {
		MqClient client = new MqClient();
		client.produce("Hello Garlic King");
	}

}

5. Consumer:
   This class calls the MQ client to consume a message from the message queue.

package com.richinfo.demo;
/**
 * Consumer
 * 
* Created by WangXJ
* 2019-02-20 15:10
*/
public class ConsumeClient {

	public static void main(String[] args) throws Exception {
		MqClient client = new MqClient();
		String message = client.consume();
		System.out.println("The message obtained is: " + message);
	}

}

Conclusion:
                              .

Keywords: socket Java

Added by fire_cracker on Tue, 10 Dec 2019 08:59:55 +0200