Full handwritten NettyIO model

preface

The last chapter introduces the development process and practical application of Io. When talking about the multithreading model of multiplexer, I have gone through it. This chapter will supplement here, and then go to Netty's IO model. Finally, the idea will be realized through code.

1, Introduce

In the previous chapter, a multiplexer was implemented by using a single thread. However, even if epoll is very efficient, if there is data in 100000 connections, the event notification within the program still needs to be processed one by one, so the user experience is very poor and even useless.

The previous chapter said that if the multiplexer is used, the data packets will be disordered. In fact, there is a solution to this situation, that is, each time the client receives an event notification, the connection will be removed from the multiplexer and added to the multiplexer after processing. Think about it. This can indeed solve the disadvantages of single thread reading, but it also brings new bottlenecks. The client needs to make two system calls every time it processes the data of each connection. In this way, the overhead of the client in completing the processing of 100000 connection data is very large.

On the above question, we can change our thinking. Here, can we use multiple multiplexers, each multiplexer is serial internally, and multiple multiplexers are parallel, so that parallel processing can be achieved logically, and there are no redundant system calls.

The following figure should be familiar to those familiar with Netty. The design idea is based on the above theory. It is mainly divided into two parts:

  1. Pick up. It is mainly responsible for receiving connections and handing over the received connections to the workers according to a certain allocation algorithm;
  2. Work. Mainly responsible for reading and writing related sockets;

There are three stages in each Loop:

  1. Accept/Read/Write;
  2. Handle connection allocation and service related;
  3. It is generally used to handle the wake-up of the Selector. In this way, when multiple threads are started, the select state that all threads have entered is blocked, and there is no monitored fd in the multiplexer at this time. Therefore, when registering a new fd with the Selector, it is necessary to wake up the Selector thread and re select.

2, Experiment

Now that the theory here is clear, let's think about how to write code changes?

  • Multiple selectors, serial in each Selector. I can monopolize one thread with one Selector;
  • Since there are multiple threads, create a thread group to manage these threads;
  • The thread group needs to distinguish between the threads that receive connections and the threads that process messages. There are two thread pools in the thread group, one is responsible for receiving connections and the other is responsible for processing connection messages;
  • For the socket of the listening port itself and the socket received, the program needs to allocate it to different selectors, so here we need a dynamic allocation connection algorithm;

Based on these design ideas, let's take a look at the source code after implementation.

package com.dxg.socket;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Getter;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class SocketNIOSingleMultiplexerForNetty {

    public static void main(String[] args) {
        // Create a thread group to manage all thread loop s
        ThreadSelectorGroup threadSelectorGroup = new ThreadSelectorGroup(3,
                4);

        // Binding port
        threadSelectorGroup.bind(8381);
        threadSelectorGroup.bind(8382);
        threadSelectorGroup.bind(8383);
        threadSelectorGroup.bind(8384);
        threadSelectorGroup.bind(8385);
    }

}

/**
 * Management thread
 * Allocate fd
 */
class ThreadSelectorGroup {

    public static final String BOSS = "boss";
    public static final String WORKER = "worker";

    /**
     * Allocate thread index
     */
    private AtomicInteger nowBossIndex = new AtomicInteger(0);
    private AtomicInteger nowWorkerIndex = new AtomicInteger(0);

    /**
     * Thread collection
     */
    private SelectorThread[] bossSelectorThread = null;
    private SelectorThread[] workerSelectorThread = null;


    private ServerSocketChannel serverSocketChannel = null;

    public ThreadSelectorGroup(int bossThread, int workerThread) {
        int boss = bossThread <= 0 ? 1 : bossThread;
        int worker = workerThread <= 0 ? 1 : workerThread;
        this.bossSelectorThread = new SelectorThread[boss];
        this.workerSelectorThread = new SelectorThread[worker];

        ExecutorService bossThreadPool = Executors.newFixedThreadPool(boss,
                this.getThreadFactory(BOSS));
        ExecutorService workerThreadPool = Executors.newFixedThreadPool(worker,
                this.getThreadFactory(WORKER));

        // Initialize receive connection thread
        for (int i = 0; i < boss; i++) {
            this.bossSelectorThread[i] = new SelectorThread(this);
            bossThreadPool.execute(this.bossSelectorThread[i]);
        }
        // Initialize the message processing thread
        for (int i = 0; i < worker; i++) {
            this.workerSelectorThread[i] = new SelectorThread(this);
            workerThreadPool.execute(this.workerSelectorThread[i]);
        }
    }


    /**
     * Binding port
     *
     * @param prot
     */
    public void bind(int prot) {
        try {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocketChannel.bind(new InetSocketAddress(prot));

            this.distribute(this.serverSocketChannel);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    /**
     * Associate fd with the corresponding Selector thread for autonomous registration
     */
    public void distribute(Channel channel) {
        SelectorThread selectorThread = this.next(channel);
        try {
            // You need to add the channel to be registered to the queue first, and wake up the selector after the thread processing is completed
            selectorThread.getBlockingQueue().put(channel);
            selectorThread.getSelector().wakeup();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Poll to get the selector that needs to be registered
     *
     * @param channel
     * @return
     */
    private SelectorThread next(Channel channel) {
        SelectorThread selectorThread = null;
        if (channel instanceof ServerSocketChannel) {
            selectorThread = this.bossSelectorThread[
                    this.nowBossIndex.getAndIncrement() %
                            this.bossSelectorThread.length];
        }
        if (channel instanceof SocketChannel) {
            selectorThread = this.workerSelectorThread[
                    this.nowWorkerIndex.getAndIncrement() %
                            this.workerSelectorThread.length];
        }
        return selectorThread;
    }


    /**
     * Get thread factory
     *
     * @param type
     * @return
     */
    private ThreadFactory getThreadFactory(String type) {
        ThreadFactory threadFactory = null;
        switch (type) {
            case BOSS:
                threadFactory =
                        new ThreadFactoryBuilder().setNameFormat(BOSS +
                                "-pool-%d").build();
                break;
            case WORKER:
                threadFactory =
                        new ThreadFactoryBuilder().setNameFormat(WORKER +
                                "-pool-%d").build();
                break;
        }
        return threadFactory;
    }

}

/**
 * A thread occupies a multiplexer independently
 */
@Getter
class SelectorThread extends ThreadLocal<LinkedBlockingQueue<Channel>> implements Runnable {

    private Selector selector = null;

    /**
     * Each thread needs to hold a reference to the thread group, which maintains the connection allocation rules
     */
    private ThreadSelectorGroup threadSelectorGroup;

    private BlockingQueue<Channel> blockingQueue = get();

    public SelectorThread(ThreadSelectorGroup threadSelectorGroup) {
        // Initialize multiplexer
        try {
            this.selector = Selector.open();
            this.threadSelectorGroup = threadSelectorGroup;

        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void run() {
        System.out.println("Thread name started:" + Thread.currentThread().getName());
        while (true) {
            try {
                int n = this.selector.select();
                if (n > 0) {
                    Set<SelectionKey> keys = this.selector.selectedKeys();
                    Iterator<SelectionKey> iterator = keys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isAcceptable()) {
                            // Receive connection
                            this.acceptHandler(key);
                        } else if (key.isReadable()) {
                            // Read data
                            this.readHandler(key);
                        } else if (key.isWritable()) {
                            // Write data
                            this.writeHandler(key);
                        }
                    }
                } else if (n == 0) {
                    // When the program starts, the thread blocks to select. When it is awakened, it will receive an event of empty data.
                    // You can get the data in the queue here for registration processing
                    this.registerHandle();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * register
     */
    private void registerHandle() {
        if (this.blockingQueue.isEmpty()) {
            return;
        }
        try {
            Channel channel = this.blockingQueue.take();
            // The server focuses on receiving events
            if (channel instanceof ServerSocketChannel) {
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) channel;
                serverSocketChannel.register(this.selector,
                        SelectionKey.OP_ACCEPT);
                System.out.println("Server registration-Thread Name:" + Thread.currentThread().getName());
            }
            // The client focuses on reading events
            else if (channel instanceof SocketChannel) {
                SocketChannel socketChannel = (SocketChannel) channel;

                ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
                socketChannel.register(this.selector, SelectionKey.OP_READ,
                        buffer);
                System.out.println("Thread Name:" + Thread.currentThread().getName() +
                        ",Client received:" + socketChannel.getRemoteAddress());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * Receive connection
     */
    private void acceptHandler(SelectionKey key) {

        try {
            ServerSocketChannel channel = (ServerSocketChannel) key.channel();
            SocketChannel client = channel.accept();
            client.configureBlocking(false);

            // Assign the received connection to the corresponding Selector
            threadSelectorGroup.distribute(client);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    /**
     * Read data
     */
    private void readHandler(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        while (true) {
            try {
                int n = socketChannel.read(buffer);

                if (n > 0) {
                    // Output the read data to the console
                    buffer.flip();
                    byte[] datas = new byte[buffer.limit()];
                    buffer.get(datas);
                    String str = new String(datas);
                    System.out.println("Thread Name:" + Thread.currentThread().getName() + ",Content:" + str);

                    // Write the read data back to the client
                    socketChannel.register(this.selector, SelectionKey.OP_WRITE,
                            buffer);
                } else if (n == 0) {
                    // Null data
                    break;
                } else {
                    // Processing of disconnection
                    System.out.println("client close:" + socketChannel.getRemoteAddress());
                    key.cancel();
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Write data
     *
     * @param key
     */
    private void writeHandler(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.flip();
        while (buffer.hasRemaining()) {
            try {
                socketChannel.write(buffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        try {
            buffer.clear();
            // The multiplexer registers an event read / write / receive / connect for a single fd
            // The read event registered during reading is overwritten by the write event. Re register here
            socketChannel.register(this.selector, SelectionKey.OP_READ,
                    buffer);
        } catch (ClosedChannelException e) {
            e.printStackTrace();
        }
    }


    @Override
    protected LinkedBlockingQueue<Channel> initialValue() {
        return new LinkedBlockingQueue<>();
    }
}

It mainly includes three classes:

  1. The main method binds the port of the thread group to enable listening;
  2. Thread group;
  3. Single Selector;

Thus, the client can independently control the number of threads receiving connections and the number of threads processing messages, and receive and process data separately.

Take a brief look at the running results:

As shown in the figure:

  1. At startup, the program creates the boss thread and worker thread respectively according to the number of incoming threads, and binds the port listener to the boss thread.
  2. The client accesses and is allocated to the worker thread for processing.
  3. The data entered by the client is printed on the console and written to the client.

Such a simple version of the Netty IO model has been implemented.

The length of IO will be updated here for the time being.
Netty source code, which will be expanded in the source code chapter.

Keywords: Java Netty NIO epoll

Added by mcmuney on Sat, 22 Jan 2022 23:07:30 +0200