Java non-blocking IO and asynchronous IO

Original article: https://www.javadoop.com/post/nio-and-aio

The previous article introduced the basic operations of Buffer, Channel and Selector in Java NIO. Those are mostly simple API calls.

This article covers non-blocking IO and asynchronous IO, commonly known as NIO and AIO. Many beginners cannot tell asynchronous and non-blocking apart, even though both terms come up all the time.

The article first introduces and demonstrates blocking mode. It then shows how non-blocking mode improves on blocking mode, and finally introduces the asynchronous IO added in JDK 7. Because there is relatively little material on asynchronous IO online, I will cover that part in detail.

I hope this article helps readers see through the fog around non-blocking IO and asynchronous IO, or at least clears up a few doubts for beginners.

Blocking mode IO

We have already introduced ServerSocketChannel, SocketChannel and Buffer, which are all we need to build a simple client-server program with the Java NIO package. Let's combine them into a complete working example:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Server {

    public static void main(String[] args) throws IOException {

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // Listen for TCP connections on port 8080
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));

        while (true) {

            // Blocks until a new connection comes in
            SocketChannel socketChannel = serverSocketChannel.accept();

            // Start a new thread to handle the connection, then loop back and keep listening on port 8080
            SocketHandler handler = new SocketHandler(socketChannel);
            new Thread(handler).start();
        }
    }
}

Here is SocketHandler, which does the work in each new thread:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class SocketHandler implements Runnable {

    private final SocketChannel socketChannel;

    public SocketHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // try-with-resources closes the channel when the client disconnects or an error occurs
        try (SocketChannel channel = socketChannel) {
            // Read request data into the Buffer; in blocking mode read() waits for data and returns -1 at end of stream
            int num;
            while ((num = channel.read(buffer)) > 0) {
                // flip before reading Buffer content
                buffer.flip();

                // Extract the data from the Buffer
                byte[] bytes = new byte[num];
                buffer.get(bytes);

                String re = new String(bytes, StandardCharsets.UTF_8);
                System.out.println("Request received: " + re);

                // Respond to the client
                ByteBuffer writeBuffer = ByteBuffer.wrap(("I have received your request. The content of your request is: " + re)
                        .getBytes(StandardCharsets.UTF_8));
                channel.write(writeBuffer);

                buffer.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Finally, here is the client, which uses a SocketChannel and is quite simple:

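import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class SocketChannelTest {
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the connection when we are done
        try (SocketChannel socketChannel = SocketChannel.open()) {
            socketChannel.connect(new InetSocketAddress("localhost", 8080));

            // Send the request
            ByteBuffer buffer = ByteBuffer.wrap("1234567890".getBytes(StandardCharsets.UTF_8));
            socketChannel.write(buffer);

            // Read the response (a single read is enough for this short demo reply)
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int num;
            if ((num = socketChannel.read(readBuffer)) > 0) {
                readBuffer.flip();

                byte[] re = new byte[num];
                readBuffer.get(re);

                String result = new String(re, StandardCharsets.UTF_8);
                System.out.println("Return value: " + result);
            }
        }
    }
}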

The blocking-mode code above should be easy to follow: for each new connection we start a new thread to handle it, and that thread performs all subsequent operations on the connection.

So, where are the performance bottlenecks in this mode?

  1. First, starting a new thread for every connection does not scale. It works with tens or hundreds of active connections, but with tens or hundreds of thousands, that many threads will not hold up. Each thread needs its own memory, which is quickly exhausted, and switching between threads is very expensive.
  2. Second, blocking is itself a problem here. accept() is a blocking operation. When it returns, a connection is available and we immediately create a thread to process the SocketChannel. That does not mean the peer has sent any data yet, so SocketChannel#read blocks waiting for data, tying up a thread for nothing. Likewise, write may have to wait until the channel is writable, which is another wasted blocking wait.

Non-blocking IO

Having covered blocking mode and its shortcomings, we can now introduce non-blocking IO.

The core of non-blocking IO is using a Selector to manage multiple channels, whether SocketChannel or ServerSocketChannel: each channel is registered with the Selector together with the events to listen for.

A single thread can then poll the Selector to find out which channels are ready. Once a channel is ready for reading or writing, the read or write proceeds immediately without waiting. There is no need for one thread per channel.
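To make the register-then-poll idea concrete without a network, here is a minimal sketch that uses java.nio.channels.Pipe instead of sockets. Using pipes is an assumption made only to keep the demo self-contained; a pipe's source end is a selectable channel just like SocketChannel. One thread registers several channels with one Selector, makes only some of them readable, and collects the ones the Selector reports. It also shows that a channel still in blocking mode cannot be registered. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

public class SelectorReadinessDemo {

    /**
     * Opens pipeCount pipes, registers every source end with ONE Selector for OP_READ,
     * writes one byte into the pipes listed in writeTo, and returns (sorted) the indexes
     * of the pipes the Selector reports as readable.
     */
    public static List<Integer> readyPipes(int pipeCount, int... writeTo) throws IOException {
        TreeSet<Integer> ready = new TreeSet<>();
        Pipe[] pipes = new Pipe[pipeCount];
        try (Selector selector = Selector.open()) {
            for (int i = 0; i < pipeCount; i++) {
                pipes[i] = Pipe.open();
                // A channel must be in non-blocking mode before it can be registered
                pipes[i].source().configureBlocking(false);
                // The attachment (the pipe index) tells us later which channel a key belongs to
                pipes[i].source().register(selector, SelectionKey.OP_READ, i);
            }
            for (int i : writeTo) {
                pipes[i].sink().write(ByteBuffer.wrap(new byte[] {42}));
            }
            // One thread polls all channels. Select a few times in case readiness shows up over
            // more than one round; unread data keeps a channel ready, and the set removes duplicates.
            long deadline = System.currentTimeMillis() + 2000;
            while (ready.size() < writeTo.length && System.currentTimeMillis() < deadline) {
                selector.select(200);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();   // the Selector never removes keys from the selected set itself
                    if (key.isReadable()) {
                        ready.add((Integer) key.attachment());
                    }
                }
            }
        } finally {
            for (Pipe p : pipes) {
                if (p != null) {
                    p.source().close();
                    p.sink().close();
                }
            }
        }
        return new ArrayList<>(ready);
    }

    /** Returns true if registering a channel that is still in blocking mode is rejected. */
    public static boolean blockingRegistrationRejected() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().register(selector, SelectionKey.OP_READ);
                return false;
            } catch (IllegalBlockingModeException e) {
                return true;
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readyPipes(4, 1, 3));          // only pipes 1 and 3 were written to
        System.out.println(blockingRegistrationRejected());
    }
}
```

Calling readyPipes(4, 1, 3) should report pipes 1 and 3 only. The Selector hands back just the channels that became ready, and the single polling thread never blocks on the idle ones.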

The Selector in NIO is an abstraction over a mechanism of the underlying operating system: it is the OS that actually tracks the state of the channels. Here is a brief overview of the implementations on different systems.

select: Implemented in the 1980s, it supports at most FD_SETSIZE (usually 1024) sockets. That was plenty at the time but is far too few today.

poll: Introduced in 1997 as a replacement for select. The biggest difference is that poll no longer limits the number of sockets.

select and poll share a problem: they return only the number of ready channels, not a list of them. To find out which ones are ready, you must scan every descriptor you registered. With a few channels this is fine, but with hundreds of thousands the scan takes considerable time, since it is O(n). That is why the following implementations appeared later.

epoll: Released with Linux kernel 2.5.44 in 2002, epoll returns exactly the channels that are ready, so no scan is needed to find them; this is usually described as O(1).

Besides epoll on Linux, there is kqueue, which appeared in FreeBSD in 2000, and /dev/poll on Solaris.

None of these implementations is available on Windows, where Java's non-blocking IO uses select. That does not make Windows backward: its asynchronous IO, provided by IOCP, is quite powerful.

Back to Selector. The JVM hides these underlying implementations from us, so we can simply program against the Selector API.
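A quick way to see which implementation the JVM picked on your machine is to print the provider behind Selector.open(). This is a small sketch with a hypothetical class name. The concrete class names are JDK-internal details: on Linux they usually mention EPoll and on macOS KQueue, and they vary by platform and JDK version. Treat the output as informational, not something to depend on.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

public class WhichSelector {

    /** Class name of the system-wide default SelectorProvider. */
    public static String providerName() {
        return SelectorProvider.provider().getClass().getName();
    }

    /** Class name of the Selector that Selector.open() creates through that provider. */
    public static String selectorClassName() throws IOException {
        try (Selector selector = Selector.open()) {
            return selector.getClass().getName();
        }
    }

    public static void main(String[] args) throws IOException {
        // JDK-internal names; they differ by operating system and JDK version
        System.out.println("Provider: " + providerName());
        System.out.println("Selector: " + selectorClassName());
    }
}
```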

We covered the basic usage of Selector in the previous article. Here is a runnable example:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class SelectorServer {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();

        ServerSocketChannel server = ServerSocketChannel.open();
        server.socket().bind(new InetSocketAddress(8080));

        // A channel must be non-blocking before it can be registered.
        // Register it with the Selector to listen for OP_ACCEPT events
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // Blocks until at least one registered channel is ready
            int readyChannels = selector.select();
            if (readyChannels == 0) {
                continue;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            // Iterate over the ready keys
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // The Selector does not remove handled keys from the selected set; we must do it
                iterator.remove();

                if (key.isAcceptable()) {
                    // A new connection is waiting to be accepted
                    SocketChannel socketChannel = server.accept();

                    // A new connection does not mean that the channel has data.
                    // Register the new SocketChannel with the Selector for OP_READ and wait for data
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // Data is readable
                    // This is a SocketChannel registered for OP_READ in the branch above
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int num = socketChannel.read(readBuffer);
                    if (num > 0) {
                        // Process the incoming data: decode only the bytes that were read
                        readBuffer.flip();
                        System.out.println("Data received: " + StandardCharsets.UTF_8.decode(readBuffer));
                        ByteBuffer buffer = ByteBuffer.wrap("Data returned to the client...".getBytes(StandardCharsets.UTF_8));
                        // In non-blocking mode write() may send only part of the buffer;
                        // a production server would register OP_WRITE and finish the write later
                        socketChannel.write(buffer);
                    } else if (num == -1) {
                        // -1 means the client has closed the connection; closing the channel also cancels its key
                        socketChannel.close();
                    }
                }
            }
        }
    }
}

For the client, you can keep using the blocking-mode client from the previous section for testing.

NIO.2 Asynchronous IO

More New IO, or NIO.2, was released with JDK 1.7. It includes the asynchronous IO interfaces as well as new file access APIs such as Paths.

Asynchrony is a concept I think most developers know well; we use it in many scenarios.

Usually, asynchronous tasks run on a thread pool: the submitting thread hands the task to the pool and returns immediately, without waiting for the task to finish. If it needs the result, it typically passes a callback function that is invoked once the task completes.

Asynchronous IO in Java works on the same principle: a thread pool executes the tasks, and the caller either receives the result through a callback or queries it itself.
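The general pattern (submit to a pool, return at once, get notified through a callback) can be sketched with plain java.util.concurrent classes. This is only an analogy to what the JDK does internally for asynchronous IO. computeAsync and squareViaCallback are hypothetical names, and the CountDownLatch exists solely so the demo can wait for the callback before returning its value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

public class AsyncTaskDemo {

    /** Hands the work to the pool and returns at once; the callback runs later on a pool thread. */
    static void computeAsync(ExecutorService pool, int input, IntConsumer callback) {
        pool.execute(() -> callback.accept(input * input));
    }

    /** Submits one task, waits (for demo purposes only) until its callback fires, and returns the value. */
    public static int squareViaCallback(int input) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        int[] result = new int[1];
        CountDownLatch done = new CountDownLatch(1);
        try {
            computeAsync(pool, input, value -> {
                result[0] = value;
                done.countDown();   // countDown/await also make result[0] visible to the waiting thread
            });
            // computeAsync has already returned; the submitting thread could do other work here
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback did not run in time");
            }
        } finally {
            pool.shutdown();
        }
        return result[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(squareViaCallback(7));   // prints 49
    }
}
```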

Most developers know why it is designed this way, but it is worth spelling out: the main purpose of asynchronous IO is to bound the number of threads, reducing the memory consumption and the CPU cost of thread scheduling that too many threads would cause.

On Unix/Linux and similar systems, the JDK manages these tasks with thread pools from the java.util.concurrent package; for details, see the source code of AsynchronousChannelGroup.

Windows has a facility called I/O Completion Ports, usually abbreviated IOCP, in which the operating system manages the thread pool with excellent performance. The JDK therefore uses IOCP directly on Windows. Relying on this OS support also exposes more information about our operations to the operating system, which lets it optimize our IO to some extent.

Linux does in fact have an asynchronous IO implementation, but it has many limitations and only mediocre performance, so the JDK builds its own thread pool instead.

This article is mainly practical; readers who want more background can look it up elsewhere. What follows is a practical introduction to Java asynchronous IO.

Three classes deserve our attention: AsynchronousSocketChannel, AsynchronousServerSocketChannel and AsynchronousFileChannel. Their names simply add the prefix Asynchronous to FileChannel, SocketChannel and ServerSocketChannel, introduced earlier.

Java asynchronous IO can be used in two ways: methods that return a Future instance, and methods that take a callback function.

1. Returning a Future instance

Returning a java.util.concurrent.Future instance should be familiar, since that is how JDK thread pools report results. The Future methods have the same semantics here, so here is a brief recap.

  • future.isDone();

    Returns whether the operation has completed, whether it finished normally, threw an exception, or was cancelled.

  • future.cancel(true);

    Attempts to cancel the operation. The argument true means the task is interrupted even if it is already running. For asynchronous channels, cancelling with true may interrupt the I/O operation by closing the channel.

  • future.isCancelled();

    Returns whether the operation was cancelled: true only if it was cancelled before it completed normally.

  • future.get();

    Our old friend: blocks until the operation completes, then returns its result.

  • future.get(10, TimeUnit.SECONDS);

    Like get(), but waits at most the given time and then throws TimeoutException; use it when unbounded blocking is not acceptable.
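The semantics listed above can be observed with any Future. The sketch below uses an ExecutorService rather than an asynchronous channel, on the assumption that the Future contract is the same in both cases (except that, for channels, cancel(true) may close the channel). FutureDemo and report are hypothetical names. One task completes normally, while another blocks indefinitely so that get with a timeout and cancel(true) have something to act on.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureDemo {

    /** Returns the observed Future states as a space-separated string. */
    public static String report() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // A task that finishes normally: get() blocks until the value is available
            Future<String> quick = pool.submit(() -> "done");
            String value = quick.get();
            boolean quickDone = quick.isDone();

            // A task that never finishes on its own: it waits on a latch nobody counts down
            CountDownLatch never = new CountDownLatch(1);
            Future<String> slow = pool.submit(() -> {
                never.await();   // returns only by InterruptedException
                return "unreachable";
            });

            // get with a timeout gives up instead of blocking forever
            boolean timedOut = false;
            try {
                slow.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                timedOut = true;
            }

            // cancel(true) interrupts the running task; afterwards isCancelled() and isDone() are both true
            boolean cancelled = slow.cancel(true);
            return value + " " + quickDone + " " + timedOut + " " + cancelled
                    + " " + slow.isCancelled() + " " + slow.isDone();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(report());   // prints "done true true true true true"
    }
}
```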

2. Providing a CompletionHandler callback

The java.nio.channels.CompletionHandler interface is defined as:

public interface CompletionHandler<V,A> {

    void completed(V result, A attachment);

    void failed(Throwable exc, A attachment);
}

Note the attachment parameter. It is not often needed, but every method that takes a CompletionHandler also accepts an attachment object, which is handed back unchanged to completed or failed:

AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(null);

// Any object can serve as the attachment; it is passed back to the handler unchanged
Object context = "any object you like";
// The first parameter of the accept method is the attachment
listener.accept(context, new CompletionHandler<AsynchronousSocketChannel, Object>() {
    public void completed(AsynchronousSocketChannel client, Object attachment) {
        // handle the accepted connection
    }

    public void failed(Throwable exc, Object attachment) {
        // handle the failure
    }
});
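Here is a small runnable sketch of the attachment in action. To avoid needing a server it uses AsynchronousFileChannel (covered in the next section) on a temporary file, and passes the ByteBuffer itself as the attachment so the handler knows where the data landed. The CompletableFuture is just a convenient way for the demo to wait for the callback. AttachmentDemo, readWithHandler and roundTrip are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AttachmentDemo {

    /** Reads a small file with a CompletionHandler, passing the ByteBuffer as the attachment. */
    public static String readWithHandler(Path file) throws Exception {
        CompletableFuture<String> outcome = new CompletableFuture<>();
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // Arguments: destination buffer, file position, attachment, handler
            channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer bytesRead, ByteBuffer attachment) {
                    // The attachment is the very buffer we passed in
                    attachment.flip();
                    outcome.complete(StandardCharsets.UTF_8.decode(attachment).toString());
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    outcome.completeExceptionally(exc);
                }
            });
            // Wait for the callback before try-with-resources closes the channel
            return outcome.get(5, TimeUnit.SECONDS);
        }
    }

    /** Writes text to a temporary file, reads it back through readWithHandler, and deletes the file. */
    public static String roundTrip(String text) throws Exception {
        Path tmp = Files.createTempFile("aio-attachment", ".txt");
        try {
            Files.write(tmp, text.getBytes(StandardCharsets.UTF_8));
            return readWithHandler(tmp);
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello attachment"));
    }
}
```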

AsynchronousFileChannel

There are many articles about non-blocking IO online but far fewer about asynchronous IO, so I will cover it in more detail here.

First, asynchronous file IO. As mentioned earlier, file IO does not support non-blocking mode: a FileChannel cannot be switched to non-blocking mode or registered with a Selector. We can, however, use asynchronous mode to improve file IO performance.

Next, I'll go through the important methods of AsynchronousFileChannel. They are very simple, so if you get bored, feel free to skip to the next heading.

Instantiation:

AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("/Users/hongjie/test.txt"));

With the channel open, we can read data into a Buffer:

ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> result = channel.read(buffer, 0);

Both read and write on an asynchronous file channel take the file position at which to start; here it is 0. Unlike FileChannel, an AsynchronousFileChannel has no current position, so every operation must specify one.
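Here is a sketch of the Future-returning read and write with explicit positions, using a temporary file; the file and the PositionalFileIo name are made up for the demo. The second piece is written at offset 6 and the first at offset 0, both started before either is awaited. Because each call names its own, non-overlapping position, the result is the same regardless of which write lands first.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

public class PositionalFileIo {

    /** Writes two pieces at explicit positions, then reads the file back from position 0 as "bytes:text". */
    public static String writeThenRead() throws Exception {
        Path file = Files.createTempFile("aio-position", ".txt");
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // There is no current position: every call names the offset it works on
            Future<Integer> w1 = channel.write(ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8)), 6);
            Future<Integer> w2 = channel.write(ByteBuffer.wrap("hello ".getBytes(StandardCharsets.UTF_8)), 0);
            // Block until both writes have completed
            w1.get();
            w2.get();

            ByteBuffer buffer = ByteBuffer.allocate(64);
            int n = channel.read(buffer, 0).get();   // the Future's value is the number of bytes read
            buffer.flip();
            return n + ":" + StandardCharsets.UTF_8.decode(buffer);
        } finally {
            // try-with-resources has already closed the channel when this runs
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(writeThenRead());   // prints "11:hello world"
    }
}
```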

Instead of returning a Future, read can also take a callback function:

public abstract <A> void read(ByteBuffer dst,
                              long position,
                              A attachment,
                              CompletionHandler<Integer,? super A> handler);

For completeness, here are the two versions of write:

public abstract Future<Integer> write(ByteBuffer src, long position);

public abstract <A> void write(ByteBuffer src,
                               long position,
                               A attachment,
                               CompletionHandler<Integer,? super A> handler);

As we can see, AIO reads and writes work with Buffers, consistent with NIO.

There is also a method for flushing in-memory data to disk:

public abstract void force(boolean metaData) throws IOException;

When we write to a file, the operating system does not write to disk immediately; it caches the data and flushes it to disk periodically. To get the data onto disk promptly, so that a power failure does not lose part of it, call this method. If the argument is true, the file's metadata (attribute information) is written to disk as well.
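Here is a sketch of a write followed by force, assuming the file lives on a local disk: the Javadoc only guarantees that force has reached the device when the file resides on a local storage device. DurableWrite, writeDurably and roundTrip are hypothetical names. The loop guards against a write that transfers fewer bytes than requested.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DurableWrite {

    /** Writes text at position 0, waits for the write to finish, then forces it to the storage device. */
    public static void writeDurably(Path file, String text, boolean metaData) throws Exception {
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                file, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
            ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
            long position = 0;
            // A write may transfer fewer bytes than remain, so keep going until the buffer is drained
            while (buffer.hasRemaining()) {
                position += channel.write(buffer, position).get();
            }
            // force() is synchronous; metaData = true also flushes file attributes
            channel.force(metaData);
        }
    }

    /** Writes through writeDurably into a temp file and reads it back with Files.readAllBytes. */
    public static String roundTrip(String text) throws Exception {
        Path file = Files.createTempFile("aio-force", ".txt");
        try {
            writeDurably(file, text, true);
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("persist me"));
    }
}
```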

It also supports file locking: we can lock a region of a file to gain exclusive access to it.

public abstract Future<FileLock> lock(long position, long size, boolean shared);

position is the start of the region to lock, size is the length of the region, and shared indicates whether a shared lock (true) or an exclusive lock (false) is requested.

There is also a callback version:

public abstract <A> void lock(long position,
                              long size,
                              boolean shared,
                              A attachment,
                              CompletionHandler<FileLock,? super A> handler);

File locking also offers tryLock, which returns immediately instead of waiting:

public abstract FileLock tryLock(long position, long size, boolean shared)
    throws IOException;

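It tries to acquire the lock without waiting. If another program holds an overlapping lock, it returns null immediately; otherwise it returns the FileLock. Note that if an overlapping lock is already held within the same JVM (for example, by another thread), it throws OverlappingFileLockException instead of returning null.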
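The locking rules, including the same-JVM case, can be checked with a short sketch on a temporary file. LockDemo and lockRegions are hypothetical names. The channel is opened with both READ and WRITE because an exclusive lock requires a channel opened for writing.

```java
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockDemo {

    /** Locks and releases regions of a temp file and returns a short report of what happened. */
    public static String lockRegions() throws Exception {
        Path file = Files.createTempFile("aio-lock", ".dat");
        StringBuilder report = new StringBuilder();
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Exclusive lock on bytes [0, 100); the region may extend past the end of the file
            FileLock first = channel.lock(0, 100, false).get();
            report.append("locked shared=").append(first.isShared());

            // An overlapping request from the same JVM is an error, not a null result
            try {
                channel.tryLock(50, 100, false);
            } catch (OverlappingFileLockException e) {
                report.append(", overlap rejected");
            }

            // A non-overlapping region can be locked right away
            FileLock second = channel.tryLock(100, 100, false);
            report.append(", second acquired=").append(second != null);

            second.release();
            first.release();
            report.append(", released=").append(!first.isValid());
        } finally {
            Files.deleteIfExists(file);
        }
        return report.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(lockRegions());
    }
}
```

Run as a single process, lockRegions() should return "locked shared=false, overlap rejected, second acquired=true, released=true".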

That covers the main AsynchronousFileChannel operations. They are fairly simple, so I'll keep this short.

AsynchronousServerSocketChannel

This class corresponds to ServerSocketChannel in non-blocking IO and is used in much the same way.

Enough talk; let's look at the code.

package com.javadoop.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class Server {

    public static void main(String[] args) throws IOException {

        // Instantiate the channel and listen on port 8080
        AsynchronousServerSocketChannel server =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));

        // Define an Attachment class for passing some information
        Attachment att = new Attachment();
        att.setServer(server);

        server.accept(att, new CompletionHandler<AsynchronousSocketChannel, Attachment>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Attachment att) {
                try {
                    SocketAddress clientAddr = client.getRemoteAddress();
                    System.out.println("Receive a new connection:" + clientAddr);

                    // After receiving a new connection, the server should call the accept method again and wait for the new connection to come in.
                    att.getServer().accept(att, this);

                    Attachment newAtt = new Attachment();
                    newAtt.setServer(server);
                    newAtt.setClient(client);
                    newAtt.setReadMode(true);
                    newAtt.setBuffer(ByteBuffer.allocate(2048));

                    // Anonymous implementation classes can also continue to be used here, but the code is ugly, so a class is specifically defined here.
                    client.read(newAtt.getBuffer(), newAtt, new ChannelHandler());
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable t, Attachment att) {
                System.out.println("accept failed");
            }
        });
        // accept() returns immediately and the callbacks run on pool threads, so keep the main thread alive
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
        }
    }
}

Here is the ChannelHandler class:

package com.javadoop.aio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

public class ChannelHandler implements CompletionHandler<Integer, Attachment> {

    @Override
    public void completed(Integer result, Attachment att) {
        if (att.isReadMode()) {
            if (result == -1) {
                // -1 means the client closed the connection without sending anything
                closeQuietly(att);
                return;
            }
            // Read data from the client: decode only the bytes that were actually read
            ByteBuffer buffer = att.getBuffer();
            buffer.flip();
            byte[] bytes = new byte[buffer.limit()];
            buffer.get(bytes);
            String msg = new String(bytes, StandardCharsets.UTF_8);
            System.out.println("Receive data from the client: " + msg);

            // Respond to the client request
            buffer.clear();
            buffer.put("Response from server!".getBytes(StandardCharsets.UTF_8));
            att.setReadMode(false);
            buffer.flip();
            // Writing data to the client is also asynchronous
            att.getClient().write(buffer, att, this);
        } else if (att.getBuffer().hasRemaining()) {
            // A write may complete after sending only part of the buffer; write the rest
            att.getClient().write(att.getBuffer(), att, this);
        } else {
            // Writing data to the client has finished. There are two choices:
            // 1. Continue to wait for the client to send new data.
//            att.setReadMode(true);
//            att.getBuffer().clear();
//            att.getClient().read(att.getBuffer(), att, this);
            // 2. Since the server has returned data to the client, close this connection.
            closeQuietly(att);
        }
    }

    @Override
    public void failed(Throwable t, Attachment att) {
        System.out.println("Disconnection: " + t);
        closeQuietly(att);
    }

    private static void closeQuietly(Attachment att) {
        try {
            att.getClient().close();
        } catch (IOException ignored) {
        }
    }
}

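Finally, the custom Attachment class. It lives in the com.javadoop.aio package and needs imports for java.nio.ByteBuffer, AsynchronousServerSocketChannel and AsynchronousSocketChannel: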

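public class Attachment {
    private AsynchronousServerSocketChannel server;
    private AsynchronousSocketChannel client;
    private boolean isReadMode;
    private ByteBuffer buffer;
    public AsynchronousServerSocketChannel getServer() { return server; }
    public void setServer(AsynchronousServerSocketChannel server) { this.server = server; }
    public AsynchronousSocketChannel getClient() { return client; }
    public void setClient(AsynchronousSocketChannel client) { this.client = client; }
    public boolean isReadMode() { return isReadMode; }
    public void setReadMode(boolean readMode) { this.isReadMode = readMode; }
    public ByteBuffer getBuffer() { return buffer; }
    public void setBuffer(ByteBuffer buffer) { this.buffer = buffer; }
}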

That completes a simple server that can accept client requests. Everything above uses callbacks; interested readers can try writing a version using Future.
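Taking up that suggestion, here is one possible Future-based sketch. To stay self-contained, it runs both ends in one thread on an ephemeral loopback port (port 0, chosen so it does not clash with the server on 8080), so every step is "initiate, then get()". FutureEchoDemo and echoOnce are hypothetical names. A single server-side read is assumed to be enough for a short message; real code would loop or use a framing protocol.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FutureEchoDemo {

    /** Starts a Future-based server on an ephemeral loopback port, sends one message, returns the reply. */
    public static String echoOnce(String message) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (AsynchronousServerSocketChannel server =
                     AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(loopback, 0));
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {

            // accept() and connect() both return immediately with a Future
            Future<AsynchronousSocketChannel> accepted = server.accept();
            client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);

            try (AsynchronousSocketChannel conn = accepted.get(5, TimeUnit.SECONDS)) {
                // Client sends; get() blocks until each write completes
                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    client.write(out).get(5, TimeUnit.SECONDS);
                }

                // Server reads what arrived and writes it back with a prefix
                ByteBuffer in = ByteBuffer.allocate(1024);
                conn.read(in).get(5, TimeUnit.SECONDS);
                in.flip();
                String received = StandardCharsets.UTF_8.decode(in).toString();
                ByteBuffer reply = ByteBuffer.wrap(("echo: " + received).getBytes(StandardCharsets.UTF_8));
                while (reply.hasRemaining()) {
                    conn.write(reply).get(5, TimeUnit.SECONDS);
                }
            }   // the server side closes its end here

            // Client reads until the server's close shows up as -1
            ByteBuffer response = ByteBuffer.allocate(1024);
            while (client.read(response).get(5, TimeUnit.SECONDS) != -1) {
                // keep reading
            }
            response.flip();
            return StandardCharsets.UTF_8.decode(response).toString();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(echoOnce("ping"));   // prints "echo: ping"
    }
}
```

Blocking on get() after every call throws away much of the benefit of asynchronous IO, which is why the callback style above is the more common choice for servers. The Future style shines when a thread has other work to do between initiating an operation and needing its result.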

AsynchronousSocketChannel

Having covered AsynchronousServerSocketChannel, readers will find AsynchronousSocketChannel easy to use; it works much like its non-blocking IO counterpart.

Here is a simple client you can run against the Server above.

package com.javadoop.aio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

public class Client {

    public static void main(String[] args) throws Exception {
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
        // Future style: connect() returns immediately
        Future<?> future = client.connect(new InetSocketAddress("localhost", 8080));
        // Block here until the connection succeeds
        future.get();

        Attachment att = new Attachment();
        att.setClient(client);
        att.setReadMode(false);
        att.setBuffer(ByteBuffer.allocate(2048));
        byte[] data = "I am a robot!".getBytes(StandardCharsets.UTF_8);
        att.getBuffer().put(data);
        att.getBuffer().flip();

        // Send the data to the server asynchronously
        client.write(att.getBuffer(), att, new ClientChannelHandler());

        // write() returns immediately; sleep briefly before exiting so the callbacks have time to run
        Thread.sleep(2000);
    }
}

Here is the ClientChannelHandler class:

package com.javadoop.aio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;

public class ClientChannelHandler implements CompletionHandler<Integer, Attachment> {

    @Override
    public void completed(Integer result, Attachment att) {
        ByteBuffer buffer = att.getBuffer();
        if (att.isReadMode()) {
            // Read data from the server
            buffer.flip();
            byte[] bytes = new byte[buffer.limit()];
            buffer.get(bytes);
            String msg = new String(bytes, Charset.forName("UTF-8"));
            System.out.println("Receive response data from the server: " + msg);

            // Next, there are two options:
            // 1. Send new data to the server
//            att.setReadMode(false);
//            buffer.clear();
//            String newMsg = "new message from client";
//            byte[] data = newMsg.getBytes(Charset.forName("UTF-8"));
//            buffer.put(data);
//            buffer.flip();
//            att.getClient().write(buffer, att, this);
            // 2. Close the connection
            try {
                att.getClient().close();
            } catch (IOException e) {
            }
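        } else if (buffer.hasRemaining()) {
            // The write completed only partially; send the rest before switching to read mode
            att.getClient().write(buffer, att, this);
        } else {
            // The write operation is complete: switch to read mode and wait for the server's response
            att.setReadMode(true);
            buffer.clear();
            att.getClient().read(buffer, att, this);
        }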
    }

    @Override
    public void failed(Throwable t, Attachment att) {
        System.out.println("Server no response");
    }
}

The code above can be run and debugged as-is. If you run into problems, please leave a comment.

Asynchronous Channel Groups

For completeness, we also need to introduce the group, that is, AsynchronousChannelGroup. As mentioned earlier, asynchronous IO needs a thread pool to accept tasks, handle IO events, invoke callbacks and so on. That thread pool lives inside the group; once the group shuts down, its thread pool shuts down too.

AsynchronousServerSocketChannel and AsynchronousSocketChannel instances belong to a group. When we call the no-argument open() method of AsynchronousServerSocketChannel or AsynchronousSocketChannel, the channel belongs to the default group, which the JVM creates and manages automatically.

To configure the default group, set the following system properties in the JVM startup parameters:

  • java.nio.channels.DefaultThreadPool.threadFactory

    Sets the ThreadFactory: the value is the fully qualified class name of a java.util.concurrent.ThreadFactory implementation. Once it is set, threads in the default group are created by this factory.

  • java.nio.channels.DefaultThreadPool.initialSize

    Sets the initial size of the default group's thread pool.

If you want more control over the threads, you can create your own group with one of the following methods:

  • AsynchronousChannelGroup.withCachedThreadPool(ExecutorService executor, int initialSize)
  • AsynchronousChannelGroup.withFixedThreadPool(int nThreads, ThreadFactory threadFactory)
  • AsynchronousChannelGroup.withThreadPool(ExecutorService executor)

Readers familiar with thread pools will recognize these; they are static factory methods of AsynchronousChannelGroup.

Using a group is straightforward:

AsynchronousChannelGroup group = AsynchronousChannelGroup
        .withFixedThreadPool(10, Executors.defaultThreadFactory());
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group);
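Here is a sketch of a custom group's lifecycle. It shows that shutting the group down with shutdownNow() also closes the channels in it, after which the pool threads exit. GroupLifecycleDemo is a hypothetical name, and the server binds to an ephemeral loopback port only so there is an open channel to observe.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GroupLifecycleDemo {

    /** Returns "openBefore openAfter terminated" for a channel that belongs to a custom group. */
    public static String run() throws Exception {
        AsynchronousChannelGroup group =
                AsynchronousChannelGroup.withFixedThreadPool(4, Executors.defaultThreadFactory());
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group)
                .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        boolean openBefore = server.isOpen();

        // shutdownNow() closes every open channel in the group, then lets the pool threads finish
        group.shutdownNow();
        boolean terminated = group.awaitTermination(5, TimeUnit.SECONDS);

        return openBefore + " " + server.isOpen() + " " + terminated;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());   // prints "true false true"
    }
}
```

With a plain shutdown() instead, the group would stop accepting new channels but keep running until every channel in it had been closed.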

AsynchronousFileChannel instances do not belong to a group, but they are still associated with a thread pool. If none is specified, the default thread pool is used; to use a specific pool, open the channel with the following method:

public static AsynchronousFileChannel open(Path file,
                                           Set<? extends OpenOption> options,
                                           ExecutorService executor,
                                           FileAttribute<?>... attrs)
    throws IOException {
    ...
}
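Here is a sketch of that method with a caller-supplied pool. The thread name my-file-io and the class name are hypothetical. On platforms where the JDK implements file AIO with a thread pool (Linux, for instance), the callback typically runs on one of these named threads. The Javadoc warns that shutting down the executor while the channel is still open has unspecified behavior, so the sketch closes the channel first and shuts the pool down afterwards.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FileChannelWithExecutor {

    /** Reads text back from a temp file through a channel bound to our own pool; returns "threadName:text". */
    public static String readOnOwnPool(String text) throws Exception {
        Path file = Files.createTempFile("aio-executor", ".txt");
        Files.write(file, text.getBytes(StandardCharsets.UTF_8));
        // Name the threads so the callback's thread is recognizable (name chosen for this demo)
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> new Thread(r, "my-file-io"));
        CompletableFuture<String> result = new CompletableFuture<>();
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                file, EnumSet.of(StandardOpenOption.READ), pool)) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            channel.read(buffer, 0, null, new CompletionHandler<Integer, Void>() {
                @Override
                public void completed(Integer bytesRead, Void attachment) {
                    buffer.flip();
                    result.complete(Thread.currentThread().getName() + ":"
                            + StandardCharsets.UTF_8.decode(buffer));
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    result.completeExceptionally(exc);
                }
            });
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            // try-with-resources has closed the channel by now, so shutting the pool down is safe
            pool.shutdown();
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readOnOwnPool("hello pool"));
    }
}
```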

That wraps up asynchronous IO.

Summary

I hope this article has clarified non-blocking IO and asynchronous IO. Because there is less material on asynchronous IO online, I gave it more space.

Keep in mind that understanding these concepts broadens our knowledge, but we seldom turn this knowledge directly into production code. NIO or AIO is generally used in network applications to improve performance. In real engineering work, however, understanding a few concepts and knowing a few interfaces is not enough: there are many details to handle.

That is why Netty and Mina are so popular: they encapsulate many of these details and provide user-friendly interfaces. I'll introduce Netty in a later article.

(End of the text)


Added by Nile on Sat, 10 Aug 2019 17:16:59 +0300