preface
Reactor design pattern is a common pattern for processing concurrent I/O. it sends client requests to different processors to improve the efficiency of event processing. In the most common application scenario, java NIO, users process network requests using asynchronous non blocking IO. Including the netty framework, the model is used. As for the definition of asynchrony and non blocking, please see my article below
Layer 7 protocol and TCP, UDP, Http and Nio analysis of network programming foundation
Reactor model overview
The reason for using the Reactor model mainly comes from using NIO only, which can only solve the multi connection problem and is non blocking. However, after the connection is established, the processing of each connection is only single thread and synchronous processing; How to achieve asynchronous processing, the Reactor model is introduced here, and NIO is combined with threads to quickly process tasks.
Scalable written in doug lea IO in In Java, this book analyzes and builds scalable high-performance IO services, introduces the event driven model, and describes how nio can be combined with threads to improve the processing power of programs.
Scalable IO in Java extract code 9aru
Next, take a part of the graph and analyze the Reactor model
It includes multiple clients, sends them to the server, and uses the Reactor to allocate the Dispatch To deal with
Reactor under single thread model
In the single threaded model:
The reactor thread client sends the client request to the acceptor for processing, and the acceptor assigns the new client connection to the handler; Each handler is processed.
This is a design pattern achieved by the selector selector in the jdk to process business logic through each channel; The logical implementation here
public class SelectorDemo { public static void main(String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // Create selector Selector selector = Selector.open(); // Register selector ssc.register(selector, SelectionKey.OP_ACCEPT);// You need to set the default event here ssc.socket().bind(new InetSocketAddress(8080));// Binding port while (true) { int readyChannels = selector.select();// Will block until an event triggers if (readyChannels == 0) continue; Set<SelectionKey> selectedKeys = selector.selectedKeys();// Gets the collection of triggered events Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isAcceptable()) { SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); // ServerSocketChannel has received a new connection, which can only act on ServerSocketChannel } else if (key.isConnectable()) { // Connecting to a remote server takes effect only when the client connects asynchronously } else if (key.isReadable()) { // There is data in SocketChannel that can be read ByteBuffer requestBuffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) { // In the case of long connection, it is necessary to manually judge whether the data reading has ended (a simple judgment is made here: if it exceeds 0 bytes, the request is considered to have ended) if (requestBuffer.position() > 0) break; } requestBuffer.flip(); byte[] content = new byte[requestBuffer.limit()]; requestBuffer.get(content); System.out.println(new String(content)); socketChannel.register(selector, SelectionKey.OP_WRITE); } else if (key.isWritable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); // SocketChannel can start writing data ByteBuffer buffer = ByteBuffer.wrap("sucess".getBytes()); socketChannel.write(buffer); socketChannel.close(); } // Remove processed events keyIterator.remove(); } } } }
There will be a problem here, that is, the read and with states may be slow, so multiple clients are connected, that is, the processing logic of the server is too slow. The solution is proposed in Scalable IO in Java, using worker thread pool processing logic.
Reactor of business worker pool mode
A thread pool is added here for processing
In this mode, data processing is handed over to the thread for processing on the basis of a single Reactor. Slow business operation leads to traffic jam. But there will still be problems
For massive connections, all IOS can only be processed in one thread. Read data and write data are massive operations, but the thread cannot handle them.
Reactor of IO thread pool and business worker thread pool mode
The IO threads are separated here Allocate io for multiple connections. Also to improve performance.
Implement a nioreader multithreading model
- Create NIOReactorDemo data MainReactor main thread and SubReactor Acceptor object
- The Selector object is held in the MainReactor to run the driver that blocks waiting events. Initialize the main thread. Here, the thread is started directly instead of using the thread pool for management, because I think the main thread Reactor single thread is enough, and there is no need for multithreading. It does not involve data reading and writing, but just a transparent transmission.
public class NIOReactorDemo { private ServerSocketChannel serverSocketChannel; private static ExecutorService threadPool = Executors.newFixedThreadPool(10); private MainReactor mainReactor = new MainReactor();// Responsible for receiving client connections public static void main(String[] args) throws Exception { NIOReactorDemo demo = new NIOReactorDemo(); // initialization demo.init(); } /** * Initialize the server channel, register it in mainReactor, and start mainReactor * * @throws IOException */ public void init() throws IOException { // The main thread initializes data and starts serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); mainReactor.registor(serverSocketChannel); mainReactor.start(); } /** * MainReactor Responsible for receiving client connection registration */ class MainReactor extends Thread { private Selector selector; @Override public void run() { while (true) { try { // 5. Start the selector (housekeeper) selector.select();// Block until the event notification is returned Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept(); // Connect the client to the acceptor new Acceptor(socketChannel); } } } catch (Exception e) { e.printStackTrace(); } } } public void registor(ServerSocketChannel serverSocketChannel) throws IOException { selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } /** * Assign the client connection to a subReactor thread and start the subReactor thread * * @author * */ class Acceptor { public Acceptor(SocketChannel socketChannel) { } } } /** * It is responsible for reading data from the client, handing it to the working thread for processing, and writing data to the client */ class SubReactor implements Runnable { @Override public void run() { } } }
- Next, continue to write the Acceptor class to allocate data
/** * Assign the client connection to a subReactor thread and start the subReactor thread * * @author * */ class Acceptor { public Acceptor(SocketChannel socketChannel) throws IOException { socketChannel.configureBlocking(false); SubReactor subReactor = new SubReactor(); subReactor.register(socketChannel); subThreadPool.execute(subReactor); } }
- And create a select in the io task and register it.
/** * It is responsible for reading data from the client, handing it to the working thread for processing, and writing data to the client */ class SubReactor implements Runnable { private Selector selector; public SubReactor() throws IOException { selector = Selector.open(); } @Override public void run() { } /** * Register the client channel in the selector and register the OP_READ event * * @param socketChannel * @throws ClosedChannelException */ public void register(SocketChannel socketChannel) throws ClosedChannelException { socketChannel.register(selector, SelectionKey.OP_READ); } }
- In SubReactor Read the data and give it to the handler. The event is passed and processed.
/** * It is responsible for reading data from the client, handing it to the working thread for processing, and writing data to the client */ class SubReactor implements Runnable { private Selector selector; public SubReactor() throws IOException { selector = Selector.open(); } @Override public void run() { try { // 5. Start the selector (housekeeper) selector.select();// Block until the event notification is returned Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) {// Triggered when the client has data to read try { SocketChannel socketChannel = (SocketChannel) key.channel(); new Handler(socketChannel); } catch (Exception e) { e.printStackTrace(); key.cancel(); } } } } catch (Exception e) { e.printStackTrace(); } }
- Finally, a handler handles it and gives it to the business processing thread pool. Here, the size of the requested bytebuffer is set by yourself. It is not so complete, but can be used as an example. If optimization is required, the specific optimization
class Handler { public Handler(SocketChannel socketChannel) throws IOException { ByteBuffer requestBuffer = ByteBuffer.allocate(1024); while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) { // In the case of long connection, it is necessary to manually judge whether the data reading has ended (a simple judgment is made here: if it exceeds 0 bytes, the request is considered to have ended) if (requestBuffer.position() > 0) break; } if (requestBuffer.position() == 0) return; // If there is no data, the subsequent processing will not be continued requestBuffer.flip(); byte[] content = new byte[requestBuffer.remaining()]; requestBuffer.get(content); System.out.println(new String(content)); System.out.println("Data received,come from:" + socketChannel.getRemoteAddress()); // TODO business operation, database interface call, etc workPool.execute(new Runnable() { @Override public void run() { // Processing business } }); // Response result 200 String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World"; ByteBuffer buffer = ByteBuffer.wrap(response.getBytes()); while (buffer.hasRemaining()) { socketChannel.write(buffer); } } }
- Finally, make a binding and start the server in the mian method
public static void main(String[] args) throws Exception { NIOReactorDemo demo = new NIOReactorDemo(); // initialization demo.init(); demo.bind(); } /** * Bind port and start service * * @throws IOException */ private void bind() throws IOException { serverSocketChannel.bind(new InetSocketAddress(8080)); }
The Reactor model adopted adopts event driven to split and split the IO data flow, so as to solve the problem of non blocking batch connection and business processing