Implementation of non blocking echo server by Kotlin NIO

Traditional blocking schemes usually use other threads to process when there are new links, and each thread is responsible for processing a socket, such as

      ServerSocket serverSocket = new ServerSocket(3000);

            while(true){
                Socket socket = serverSocket.accept();
                new Thread(()->{
                    ///socket.getInputStream()
                   /// ...
                }).start();
            }

The disadvantage of this method is that if there are many connections, creating a large number of threads will have a lot of overhead and eventually lead to resource depletion. Of course, we can optimize with thread pool. such as

         ServerSocket serverSocket = new ServerSocket(3000);
            ExecutorService executorService = Executors.newFixedThreadPool(200);
            while(true){
                Socket socket = serverSocket.accept();
                executorService.execute(()->{
                   // socket.getInputStream()
                    //...
                });
            }

After using thread pool for optimization, threads can be reused, which reduces a lot of overhead when repeatedly creating and destroying threads. However, if the number of connections exceeds the threads owned by the thread pool, the connection cannot get a timely response.

In Web projects, most of the waiting time is not because the cpu needs to process complex operations, but because it needs to wait for the response of the database or other networks. When blocking, the operating system will suspend the thread until the socket has resources.
For example, in the operation just now, we need to wait for the message from the client

          InputStream inputStream = socket.getInputStream();
                        int read = inputStream.read();  

read() is a blocking method

    /**
     * Reads the next byte of data from the input stream. The value byte is
     * returned as an <code>int</code> in the range <code>0</code> to
     * <code>255</code>. If no byte is available because the end of the stream
     * has been reached, the value <code>-1</code> is returned. This method
     * blocks until input data is available, the end of the stream is detected,
     * or an exception is thrown.
     *
     * <p> A subclass must provide an implementation of this method.
     *
     * @return     the next byte of data, or <code>-1</code> if the end of the
     *             stream is reached.
     * @exception  IOException  if an I/O error occurs.
     */
    public abstract int read() throws IOException;

This method blocks until input data is available, the end of the stream is detected, or an exception is thrown
This means that the method will be blocked until available data arrives. In the process of blocking, the thread calling this method is hung by cpu.

Threads do not consume cpu time slices when suspended, but this does not mean that threads no longer have overhead. If a large number of threads are suspended, it means that a large amount of memory and other resources need to be paid.

To solve this problem, we can use NIO. A non blocking api is implemented to service multiple connections with a small number of threads.
The knowledge about NIO is not repeated here.

Through NIO, we can process code through asynchronous callback.
The following server code is written in kotlin

class MyServer(private val port: Int) {
    val asynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newSingleThreadExecutor())
    fun start() {
        val socket =
            AsynchronousServerSocketChannel.open(asynchronousChannelGroup)
                .bind(InetSocketAddress(port))
        runBlocking {
            while (true) {
                val client = connect(socket)
                launch {
                    while (true) {
                        val message = readFromConnect(client)
                        println("${Thread.currentThread().name}From client ${client.remoteAddress} News of $message")
                        if (message == "bye\n") {
                            println("Ready to disconnect")
                            writeIntoConnect(client, "bye, too\n")
                            break
                        } else {
                            writeIntoConnect(client, "I've received it $message")
                        }
                    }
                    println("connection dropped ${client.remoteAddress}")
                }

            }
        }
    }
    // asynchronousChannelGroup.awaitTermination(0, TimeUnit.DAYS)
}

Let's look at the key connect method

suspend fun connect(socket: AsynchronousServerSocketChannel) = suspendCoroutine<AsynchronousSocketChannel> {
    socket.accept(null, object : CompletionHandler<AsynchronousSocketChannel, Any?> {
        override fun completed(client: AsynchronousSocketChannel?, attachment: Any?) {
            println("${Thread.currentThread().name} Connect to the server:${client!!.remoteAddress}")
            it.resume(client)
        }

        override fun failed(exc: Throwable?, attachment: Any?) {
            // TODO("Not yet implemented")
        }
    })
}

This is a suspend method, which means that the coroutine calling this method will be suspended.
Note that the coroutine is suspended here, not the thread. When a coroutine is suspended, its thread can be transferred to perform other tasks.
When there is a new link, the callback is executed

 it.resume(client)

To wake up the process and pass the return value.

In the launch, we open a new collaboration to handle this connection.
For ease of understanding, let's simulate this idea with java and thread

 while(true){
                Socket socket = serverSocket.accept();
                executorService.execute(()->{
                    try {
                        InputStream inputStream = socket.getInputStream();
                        while(true){
                            int read = inputStream.read();
                            //do something
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    //...
                });
            }

You can see that there are two while, of which the outer layer while is used to continuously receive the connection and the inner layer while is used to continuously process the data received in the connection. It's just that here we replace threads with coprocessors.

In the process of continuously processing the received data, we still need to wait, because the client does not necessarily send messages continuously. If we use traditional BIO and thread processing, we may need to block and wait for the client's messages. Whenever there is a message, the thread will be resumed for processing. The thread scheduling process itself brings some overhead.
We can solve this problem with collaborative process.

        while (true) {
                        val message = readFromConnect(client)
                        println("${Thread.currentThread().name}From client ${client.remoteAddress} News of $message")

We use a suspend method to process the request. Like the previous suspend method, this does not block the thread.

suspend fun readFromConnect(client: AsynchronousSocketChannel) = suspendCoroutine<String> {
    val byteBuffer1: ByteBuffer = ByteBuffer.allocateDirect(2048)
    client.read(byteBuffer1, Attachment(byteBuffer1), object : CompletionHandler<Int, Attachment> {
        override fun completed(result: Int?, attachment: Attachment?) {
            val byteBuffer = attachment!!.byteBuffer
            //At this time, there is already data in it, which was written before. Now it needs to be converted to read mode
            //This operation puts the limit to the current position to ensure that there will be no more reading
            byteBuffer.flip()
            val byteArray = ByteArray(byteBuffer.limit())
            byteBuffer.get(byteArray)
            byteBuffer.clear()
            val message = String(byteArray)
            it.resume(message)
        }

        override fun failed(exc: Throwable?, attachment: Attachment?) {
            // TODO("Not yet implemented")
        }
    }
    )
}

After receiving the message, we read it through ByteBuffer.
Next, we do some processing on the read message

                    println("${Thread.currentThread().name}From client ${client.remoteAddress} News of $message")
                        if (message == "bye\n") {
                            println("Ready to disconnect")
                            writeIntoConnect(client, "bye, too\n")
                            break
                        } else {
                            writeIntoConnect(client, "I've received it $message")
                        }

NIO is non blocking, and the write method returns immediately

fun writeIntoConnect(client: AsynchronousSocketChannel, echoMessage: String) {
    val byteBuffer1: ByteBuffer = ByteBuffer.allocateDirect(2048)
    byteBuffer1.put(echoMessage.toByteArray())
    byteBuffer1.flip()
    client.write(byteBuffer1)
}

In this way, we can handle many requests through a small number of threads - in extreme cases, one thread can handle many requests.

Next, conduct the following test. The client code is written before and is still BIO

class MyClint(val port: Int) {
    fun start() = runBlocking {
        val clientSocket = Socket("127.0.0.1", port)
        val scanner = Scanner(System.`in`)
        println("A connection has been established with the server")
        val outputStream = clientSocket.getOutputStream()
        val bufferedWriter = BufferedWriter(OutputStreamWriter(outputStream))
        println("Get output stream succeeded")
        while (scanner.hasNext()) {
            val nextLine = scanner.nextLine()
            bufferedWriter.write(nextLine + "\n")
            bufferedWriter.flush()
            val bufferedReader = BufferedReader(InputStreamReader(clientSocket.getInputStream()))
            //readLine received no newline \ n
            val message = bufferedReader.readLine() ?: break

            println("Message from server: ${message}")
            if(message=="bye, too"){
                clientSocket.close()
                println("connection dropped")
                break
            }
        }
    }
}

fun main() {
    MyClint(5000).start()
}

Let's test whether we can handle multiple connections with only one thread
Note that there is only one thread in the thread pool used by our server

 val asynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newSingleThreadExecutor())

Start two clients to connect


Next, process the message from the client


Keywords: Java kotlin server NIO tcp

Added by coelex on Thu, 03 Feb 2022 10:29:21 +0200