Analysis of Reactor network programming model

preface

Reactor design pattern is a common pattern for processing concurrent I/O. it sends client requests to different processors to improve the efficiency of event processing. In the most common application scenario, java NIO, users process network requests using asynchronous non blocking IO. Including the netty framework, the model is used. As for the definition of asynchrony and non blocking, please see my article below

Layer 7 protocol and TCP, UDP, Http and Nio analysis of network programming foundation

Reactor model overview

The reason for using the Reactor model mainly comes from using NIO only, which can only solve the multi connection problem and is non blocking. However, after the connection is established, the processing of each connection is only single thread and synchronous processing; How to achieve asynchronous processing, the Reactor model is introduced here, and NIO is combined with threads to quickly process tasks.

Scalable written in doug lea   IO   in   In Java, this book analyzes and builds scalable high-performance IO services, introduces the event driven model, and describes how nio can be combined with threads to improve the processing power of programs.  

Scalable IO in Java extract code 9aru

Next, take a part of the graph and analyze the Reactor model

 

It includes multiple clients, sends them to the server, and uses the Reactor to allocate the Dispatch   To deal with

Reactor under single thread model

 

In the single threaded model:

The reactor thread client sends the client request to the acceptor for processing, and the acceptor assigns the new client connection to the handler; Each handler is processed.

This is a design pattern achieved by the selector selector in the jdk to process business logic through each channel; The logical implementation here

public class SelectorDemo {
	public static void main(String[] args) throws IOException {
		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);

		// Create selector
		Selector selector = Selector.open();

		// Register selector
		ssc.register(selector, SelectionKey.OP_ACCEPT);// You need to set the default event here
		ssc.socket().bind(new InetSocketAddress(8080));// Binding port
		while (true) {
			int readyChannels = selector.select();// Will block until an event triggers

			if (readyChannels == 0)
				continue;

			Set<SelectionKey> selectedKeys = selector.selectedKeys();// Gets the collection of triggered events

			Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
			while (keyIterator.hasNext()) {

				SelectionKey key = keyIterator.next();

				if (key.isAcceptable()) {
					SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
					socketChannel.configureBlocking(false);
					socketChannel.register(selector, SelectionKey.OP_READ);
					// ServerSocketChannel has received a new connection, which can only act on ServerSocketChannel

				} else if (key.isConnectable()) {
					// Connecting to a remote server takes effect only when the client connects asynchronously

				} else if (key.isReadable()) {
					// There is data in SocketChannel that can be read
					ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
					SocketChannel socketChannel = (SocketChannel) key.channel();
					while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
						// In the case of long connection, it is necessary to manually judge whether the data reading has ended (a simple judgment is made here: if it exceeds 0 bytes, the request is considered to have ended)
						if (requestBuffer.position() > 0)
							break;
					}
					requestBuffer.flip();
					byte[] content = new byte[requestBuffer.limit()];
					requestBuffer.get(content);
					System.out.println(new String(content));
					socketChannel.register(selector, SelectionKey.OP_WRITE);
				} else if (key.isWritable()) {
					SocketChannel socketChannel = (SocketChannel) key.channel();
					// SocketChannel can start writing data
					ByteBuffer buffer = ByteBuffer.wrap("sucess".getBytes());
					socketChannel.write(buffer);
					socketChannel.close();
				}

				// Remove processed events
				keyIterator.remove();
			}

		}
	}
}

There will be a problem here, that is, the read and with states may be slow, so multiple clients are connected, that is, the processing logic of the server is too slow. The solution is proposed in Scalable IO in Java, using worker thread pool processing logic.

Reactor of business worker pool mode

 

A thread pool is added here for processing

In this mode, data processing is handed over to the thread for processing on the basis of a single Reactor. Slow business operation leads to traffic jam. But there will still be problems

For massive connections, all IOS can only be processed in one thread. Read data and write data are massive operations, but the thread cannot handle them.

Reactor of IO thread pool and business worker thread pool mode

The IO threads are separated here   Allocate io for multiple connections. Also to improve performance.

Implement a nioreader multithreading model

  • Create NIOReactorDemo data   MainReactor main thread and   SubReactor   Acceptor object
  • The Selector object is held in the MainReactor to run the driver that blocks waiting events. Initialize the main thread. Here, the thread is started directly instead of using the thread pool for management, because I think the main thread Reactor single thread is enough, and there is no need for multithreading. It does not involve data reading and writing, but just a transparent transmission.
public class NIOReactorDemo {

	private ServerSocketChannel serverSocketChannel;

	private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

	private MainReactor mainReactor = new MainReactor();// Responsible for receiving client connections

	public static void main(String[] args) throws Exception {
		NIOReactorDemo demo = new NIOReactorDemo();
		// initialization
		demo.init();
	}

	/**
	 * Initialize the server channel, register it in mainReactor, and start mainReactor
	 * 
	 * @throws IOException
	 */
	public void init() throws IOException {
		// The main thread initializes data and starts
		serverSocketChannel = ServerSocketChannel.open();
		serverSocketChannel.configureBlocking(false);
		mainReactor.registor(serverSocketChannel);
		mainReactor.start();

	}

	/**
	 * MainReactor Responsible for receiving client connection registration
	 */
	class MainReactor extends Thread {
		private Selector selector;

		@Override
		public void run() {

			while (true) {
				try {
					// 5. Start the selector (housekeeper)
					selector.select();// Block until the event notification is returned
					Set<SelectionKey> selectionKeys = selector.selectedKeys();
					Iterator<SelectionKey> iterator = selectionKeys.iterator();
					while (iterator.hasNext()) {
						SelectionKey key = iterator.next();
						iterator.remove();

						if (key.isAcceptable()) {
							SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
							// Connect the client to the acceptor
							new Acceptor(socketChannel);
						}
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}

		public void registor(ServerSocketChannel serverSocketChannel) throws IOException {
			selector = Selector.open();
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
		}

		/**
		 * Assign the client connection to a subReactor thread and start the subReactor thread
		 * 
		 * @author
		 *
		 */
		class Acceptor {

			public Acceptor(SocketChannel socketChannel) {

			}

		}
	}

	/**
	 * It is responsible for reading data from the client, handing it to the working thread for processing, and writing data to the client
	 */
	class SubReactor implements Runnable {

		@Override
		public void run() {

		}
	}

}
  • Next, continue to write the Acceptor class to allocate data  
	/**
		 * Assign the client connection to a subReactor thread and start the subReactor thread
		 * 
		 * @author
		 *
		 */
		class Acceptor {

			public Acceptor(SocketChannel socketChannel) throws IOException {
				socketChannel.configureBlocking(false);
				SubReactor subReactor = new SubReactor();
				subReactor.register(socketChannel);
				subThreadPool.execute(subReactor);
			}

		}
  • And create a select in the io task and register it.
/**
	 * It is responsible for reading data from the client, handing it to the working thread for processing, and writing data to the client
	 */
	class SubReactor implements Runnable {
		private Selector selector;

		public SubReactor() throws IOException {
			selector = Selector.open();
		}

		@Override
		public void run() {

		}

		/**
		 * Register the client channel in the selector and register the OP_READ event
		 * 
		 * @param socketChannel
		 * @throws ClosedChannelException
		 */
		public void register(SocketChannel socketChannel) throws ClosedChannelException {
			socketChannel.register(selector, SelectionKey.OP_READ);
		}
	}
  • In SubReactor   Read the data and give it to the handler. The event is passed and processed.
/**
	 * It is responsible for reading data from the client, handing it to the working thread for processing, and writing data to the client
	 */
	class SubReactor implements Runnable {
		private Selector selector;



		public SubReactor() throws IOException {
			selector = Selector.open();
		}

		@Override
		public void run() {


				try {
					// 5. Start the selector (housekeeper)
					selector.select();// Block until the event notification is returned

					Set<SelectionKey> selectionKeys = selector.selectedKeys();
					Iterator<SelectionKey> iterator = selectionKeys.iterator();
					while (iterator.hasNext()) {
						SelectionKey key = iterator.next();
						iterator.remove();

						if (key.isReadable()) {// Triggered when the client has data to read
							try {
								SocketChannel socketChannel = (SocketChannel) key.channel();
								new Handler(socketChannel);
							} catch (Exception e) {
								e.printStackTrace();
								key.cancel();
							}
						}
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			
		}
  • Finally, a handler handles it and gives it to the business processing thread pool. Here, the size of the requested bytebuffer is set by yourself. It is not so complete, but can be used as an example. If optimization is required, the specific optimization
		class Handler {
			
			public Handler(SocketChannel socketChannel) throws IOException {
				ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
				while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
					// In the case of long connection, it is necessary to manually judge whether the data reading has ended (a simple judgment is made here: if it exceeds 0 bytes, the request is considered to have ended)
					if (requestBuffer.position() > 0)
						break;
				}
				if (requestBuffer.position() == 0)
					return; // If there is no data, the subsequent processing will not be continued
				requestBuffer.flip();
				byte[] content = new byte[requestBuffer.remaining()];
				requestBuffer.get(content);
				System.out.println(new String(content));
				System.out.println("Data received,come from:" + socketChannel.getRemoteAddress());
				// TODO business operation, database interface call, etc
				workPool.execute(new Runnable() {
					@Override
					public void run() {
						// Processing business
					}
				});

				// Response result 200
				String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World";
				ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
				while (buffer.hasRemaining()) {
					socketChannel.write(buffer);
				}
			}
		}
  • Finally, make a binding and start the server in the mian method
	public static void main(String[] args) throws Exception {
		NIOReactorDemo demo = new NIOReactorDemo();
		// initialization
		demo.init();
		demo.bind();
	}
/**
	 * Bind port and start service
	 * 
	 * @throws IOException
	 */
	private void bind() throws IOException {
		serverSocketChannel.bind(new InetSocketAddress(8080));
	}

The Reactor model adopted adopts event driven to split and split the IO data flow, so as to solve the problem of non blocking batch connection and business processing

Keywords: Java http reactor TCP/IP

Added by OopyBoo on Mon, 27 Sep 2021 08:13:46 +0300