Interviewer: How do you make a Java program support millions of long-lived connections on a single machine?

1. Simulating the single-machine connection bottleneck

As we know, a server is usually bound to a single port, such as port 8000, while the client has only a limited range of ports to connect from. Ports cannot go above 65535, and ports 1024 and below are reserved, so only the range 1025~65535 is usable; after subtracting a few commonly used ports, roughly 60000 ports are actually available. So how do we reach one million connections on a single machine? Suppose the server listens on the 100 ports [8000, 8100); then 100 × 60000 allows roughly 6 million connections. This is basic TCP knowledge: although the client may use the same source port number, the server sees different destination ports, because a TCP connection is identified by a four-tuple consisting of the source IP address, source port, destination IP address and destination port. When the source IP address and source port are the same but the destination ports differ, the operating system still treats them as two distinct TCP connections. Opening 100 ports on the server is therefore the preparation for one million connections on a single machine, as shown in the figure below.
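To make the four-tuple idea concrete, here is a toy sketch (the addresses and ports in it are made-up examples): each connection is modeled as a string key of source IP, source port, destination IP and destination port, and two keys that differ only in the destination port count as two connections.

import java.util.HashSet;
import java.util.Set;

/**
 * Toy illustration of the TCP four-tuple: the hypothetical endpoints below share
 * the same source IP and source port but target different destination ports.
 */
public class FourTupleDemo {
    public static void main(String[] args) {
        Set<String> connections = new HashSet<String>();
        connections.add("192.168.1.10:50000 -> 192.168.1.20:8000");
        connections.add("192.168.1.10:50000 -> 192.168.1.20:8001");
        //Prints 2: the operating system likewise tracks two distinct TCP connections
        System.out.println(connections.size());
    }
}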

Ports 1024 and below on a machine are reserved for root, so the client port range is 1025~65535. Next, let's simulate the single-machine million-connection scenario in code. First look at the server class, which opens the 100 listening ports [8000, 8100) in a loop and waits for clients to connect. The following code is written with Netty as an example.

package com.tom.netty.connection;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author Tom
 */
public final class Server {
    public static final int BEGIN_PORT = 8000;
    public static final int N_PORT = 8100;

    public static void main(String[] args) {
        new Server().start(Server.BEGIN_PORT, Server.N_PORT);
    }

    public void start(int beginPort, int nPort) {
        System.out.println("The server is starting...");

        //The boss group accepts incoming connections; the worker group handles the I/O of accepted channels
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);

        //A single shared handler that counts active connections
        bootstrap.childHandler(new ConnectionCountHandler());


        //Bind each listening port in the range and report when binding succeeds
        for (int i = 0; i <= (nPort - beginPort); i++) {
            final int port = beginPort + i;

            bootstrap.bind(port).addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        System.out.println("Successfully bound listening port: " + port);
                    }
                }
            });
        }
        System.out.println("Server started!");
    }
}

Now look at the implementation of the ConnectionCountHandler class, which tracks the current number of connections: each new connection increments a counter, each closed connection decrements it, and the value is printed every 2 seconds. The code is as follows.

package com.tom.netty.connection;


import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Tom.
 */
@ChannelHandler.Sharable
public class ConnectionCountHandler extends ChannelInboundHandlerAdapter {

    private AtomicInteger nConnection = new AtomicInteger();

    public ConnectionCountHandler() {
        //Print the current connection count every 2 seconds
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
            public void run() {
                System.out.println("Current number of client connections: " + nConnection.get());
            }
        }, 0, 2, TimeUnit.SECONDS);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //A new connection became active
        nConnection.incrementAndGet();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        //A connection was closed
        nConnection.decrementAndGet();
    }

}

Next, look at the client code. It connects to the 100 ports opened by the server in turn, round after round, until the server can no longer accept connections and the program exits. The code is as follows.

package com.tom.netty.connection;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Created by Tom.
 */
public class Client {

    private static final String SERVER_HOST = "127.0.0.1";

    public static void main(String[] args) {
        new Client().start(Server.BEGIN_PORT, Server.N_PORT);
    }

    public void start(final int beginPort, int nPort) {
        System.out.println("Client started...");
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
            }
        });


        int index = 0;
        int port;
        //Connect to the server ports in turn, round after round, until a connection fails
        while (!Thread.interrupted()) {

            port = beginPort + index;
            try {
                ChannelFuture channelFuture = bootstrap.connect(SERVER_HOST, port);
                channelFuture.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            System.out.println("Connection failed, program closed!");
                            System.exit(0);
                        }
                    }
                });
                channelFuture.get();
            } catch (Exception e) {
            }

            if (port == nPort) { index = 0; } else { index++; }
        }
    }
}

Finally, package and deploy the server program to one Linux server and the client program to another Linux server, then start the two programs. After running for a while, you will find that the connection count reported by the server stays fixed at one value and no longer changes, as shown below.

Current number of client connections: 870
Current number of client connections: 870
Current number of client connections: 870
Current number of client connections: 870
Current number of client connections: 870
Current number of client connections: 870
Current number of client connections: 870
Current number of client connections: 870
Current number of client connections: 870
...

In addition, the following exception is thrown.

Exception in thread "nioEventLoopGroup-2-1" java.lang.InternalError: java.io.FileNotFoundException: /usr/java/jdk1.8.0_121/jre/lib/ext/cldrdata.jar (Too many open files)
        at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1040)
        at sun.misc.URLClassPath.getResource(URLClassPath.java:239)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.util.ResourceBundle$RBClassLoader.loadClass(ResourceBundle.java:503)
        at java.util.ResourceBundle$Control.newBundle(ResourceBundle.java:2640)
        at java.util.ResourceBundle.loadBundle(ResourceBundle.java:1501)
        at java.util.ResourceBundle.findBundle(ResourceBundle.java:1465)
        at java.util.ResourceBundle.findBundle(ResourceBundle.java:1419)
        at java.util.ResourceBundle.getBundleImpl(ResourceBundle.java:1361)
        at java.util.ResourceBundle.getBundle(ResourceBundle.java:845)
        at java.util.logging.Level.computeLocalizedLevelName(Level.java:265)
        at java.util.logging.Level.getLocalizedLevelName(Level.java:324)
        at java.util.logging.SimpleFormatter.format(SimpleFormatter.java:165)
        at java.util.logging.StreamHandler.publish(StreamHandler.java:211)
        at java.util.logging.ConsoleHandler.publish(ConsoleHandler.java:116)
        at java.util.logging.Logger.log(Logger.java:738)
        at io.netty.util.internal.logging.JdkLogger.log(JdkLogger.java:606)
        at io.netty.util.internal.logging.JdkLogger.warn(JdkLogger.java:482)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:876)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)

At this point we know that we have hit the bottleneck for the number of client connections this server can accept: it supports at most 870 connections. The next task is to find a way to break through this bottleneck so that a single server can support one million connections, which is a very exciting thing.

2. Optimization scheme for one million connections on a single machine

2.1 Breaking through the local file handle limit

First, enter the following command on the server to see the maximum number of file handles a single process may open.

ulimit -n

The command prints 1024, the maximum number of files a single process may open on this Linux system. Since each TCP connection is represented by a file (descriptor) in Linux, the connection count is bounded by this maximum. Why did the connection count in the previous demonstration stop at 870, which is smaller than 1024? Because, in addition to the connections themselves, the files opened by the JVM itself (class files, JAR files and so on) also count toward the process's open files; subtracting those from 1024 leaves the number of TCP connections the process can actually hold. Next, let's break this limit. First, open the /etc/security/limits.conf file with the following command.

sudo vi /etc/security/limits.conf

Then add the following two lines at the end of the file.

* hard nofile 1000000
* soft nofile 1000000

The leading * means the rule applies to every user; hard and soft are the hard ceiling and the soft (effective) limit respectively; nofile stands for the maximum number of open files; and the number 1000000 at the end means any user may open up to one million files, which is also the maximum value supported by the operating system, as shown in the following figure.
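As a side note, the limit the running JVM actually sees can also be checked from inside Java. The following is a minimal sketch, assuming a HotSpot JDK on Linux, where the operating system MXBean can be cast to com.sun.management.UnixOperatingSystemMXBean.

import java.lang.management.ManagementFactory;

import com.sun.management.UnixOperatingSystemMXBean;

public class FdLimitCheck {
    public static void main(String[] args) {
        //On Linux with a HotSpot JDK, the OS MXBean exposes file descriptor counters
        UnixOperatingSystemMXBean os =
                (UnixOperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        System.out.println("Max file descriptors:  " + os.getMaxFileDescriptorCount());
        System.out.println("Open file descriptors: " + os.getOpenFileDescriptorCount());
    }
}

Run from the same shell session as the server, it should report the same maximum that ulimit -n shows.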

Next, enter the following command.

ulimit -n

At this point it still shows 1024, so restart the server, then run the server and client programs again and quietly watch the connection count. This time the count stops at 137920 and an exception is thrown, as shown below.

Current number of client connections: 137920
Current number of client connections: 137920
Current number of client connections: 137920
Current number of client connections: 137920
Current number of client connections: 137920
Exception in thread "nioEventLoopGroup-2-1" java.lang.InternalError: java.io.FileNotFoundException: /usr/java/jdk1.8.0_121/jre/lib/ext/cldrdata.jar (Too many open files)
...

Why? There is clearly still a limit being hit. To break through it, we have to raise the global file handle limit.

2.2 Breaking through the global file handle limit

First, enter the following command on the Linux command line to see how many files all user processes on the system may open in total.

cat /proc/sys/fs/file-max

The command above shows the global limit, which here is 10000. Clearly the per-process (local) file handle limit cannot usefully exceed the global one, so the global file handle limit must be raised to break through this barrier. First switch to the root user, otherwise you will not have permission.

sudo -s
echo 20000 > /proc/sys/fs/file-max
exit

Change it to 20000 for now and continue testing. Start the server and client programs again and the connection count now climbs past 20000. However, a value written to /proc/sys/fs/file-max with echo is not persistent: after the server reboots it falls back to the original 10000. So modify the configuration file directly with vi instead, using the following command.

sudo vi /etc/sysctl.conf

Add the following at the end of the /etc/sysctl.conf file.

fs.file-max=1000000

The results are shown in the figure below.

Next, restart the Linux server, and then start the server program and client program.

Current number of client connections: 9812451
Current number of client connections: 9812462
Current number of client connections: 9812489
Current number of client connections: 9812501
Current number of client connections: 9812503
...

The final connection count is about 980000, and at this point the main limit is the performance of the machine itself. Checking with the htop command shows the CPU close to 100%, as shown in the figure below.

The above covers tuning and performance improvements at the operating system level. The following introduces tuning at the Netty application level.

3. Netty application-level performance tuning

3.1 Reproducing the performance bottleneck at the Netty application level

First, let's look at the application scenario. Below is a piece of standard server-side code.


package com.tom.netty.thread;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;

/**
 * Created by Tom.
 */
public class Server {

    private static final int port = 8000;

    public static void main(String[] args) {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //A separate group with 1000 threads on which the business handler will run
        final EventLoopGroup businessGroup = new NioEventLoopGroup(1000);

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.SO_REUSEADDR, true);


        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                //Fixed-length frame decoder: each frame is one long (8 bytes)
                //carrying the system timestamp sent by the client
                ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES));
                //Run the business handler on the separate businessGroup
                ch.pipeline().addLast(businessGroup, ServerHandler.INSTANCE);
            }
        });


        ChannelFuture channelFuture = bootstrap.bind(port).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                System.out.println("The server is started successfully, and the binding port is: " + port);
            }
        });
    }

}

We focus on the server-side business logic handler, the ServerHandler class.

package com.tom.netty.thread;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.ThreadLocalRandom;

/**
 * Created by Tom.
 */
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    public static final ChannelHandler INSTANCE = new ServerHandler();


    //channelRead0 runs on the event loop thread assigned to this channel; blocking here blocks that thread
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        ByteBuf data = Unpooled.directBuffer();
        //Copy the timestamp sent by the client into a new buffer
        data.writeBytes(msg);
        //Simulate a piece of business processing, such as a database query or other logic
        Object result = getResult(data);
        //Write the result back to the client
        ctx.channel().writeAndFlush(result);
    }

    //Simulate going to the database to get a result
    protected Object getResult(ByteBuf data) {

        int level = ThreadLocalRandom.current().nextInt(1, 1000);

        //The time taken by each response serves as reference data for the QPS figures

        int time;
        if (level <= 900) {
            //90% of requests: 1ms
            time = 1;
        } else if (level <= 950) {
            //the next 5% (up to the 95th percentile): 10ms
            time = 10;
        } else if (level <= 990) {
            //the next 4% (up to the 99th percentile): 100ms
            time = 100;
        } else {
            //the slowest ~1%: 1000ms
            time = 1000;
        }

        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
        }

        return data;
    }

}

The getResult() method in the code above can be thought of as querying data from a database and returning the result of each query to the client. To simulate the performance of such a query, getResult() takes the timestamp sent by the client as its input and ultimately returns the very value the client sent; just before returning, it sleeps the thread for a random amount of time to mimic real business processing. The table below shows the latency distribution used in the simulation; a quick expected-value estimate follows the table.

Proportion of requests    Processing time
90%                       1ms
95%                       10ms
99%                       100ms
99.9%                     1000ms
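As a quick sanity check, the expected sleep time per request implied by the buckets implemented in getResult() can be computed directly; the sketch below is just that back-of-the-envelope arithmetic.

public class ExpectedLatency {
    public static void main(String[] args) {
        //Buckets in getResult(): 90% at 1ms, 5% at 10ms, 4% at 100ms, ~1% at 1000ms
        double expectedMs = 0.90 * 1 + 0.05 * 10 + 0.04 * 100 + 0.01 * 1000;
        System.out.println("Expected processing time per request: " + expectedMs + " ms"); //~15.4ms
    }
}

Keep this figure of roughly 15ms in mind; it lines up with the average response time observed later once the blocking work is moved off the I/O threads.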

Now let's look at the client, which is also standard code.

package com.tom.netty.thread;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;

/**
 * Created by Tom.
 */
public class Client {

    private static final String SERVER_HOST = "127.0.0.1";

    public static void main(String[] args) throws Exception {
        new Client().start(8000);
    }

    public void start(int port) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new FixedLengthFrameDecoder(Long.BYTES));
                        ch.pipeline().addLast(ClientHandler.INSTANCE);
                    }
        });

        //Open 1000 connections to the server; each connection sends one request per second (see ClientHandler)
        for (int i = 0; i < 1000; i++) {
            bootstrap.connect(SERVER_HOST, port).get();
        }
    }
}

As the code above shows, the client opens 1000 connections to the server, and each connection sends one request per second, about 1000 requests per second in total. Now focus on the client-side logic handler, the ClientHandler class.

package com.tom.netty.thread;


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by Tom.
 */
@ChannelHandler.Sharable
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    public static final ChannelHandler INSTANCE = new ClientHandler();

    private static AtomicLong beginTime = new AtomicLong(0);
    //Total response time
    private static AtomicLong totalResponseTime = new AtomicLong(0);
    //Total requests
    private static AtomicInteger totalRequest = new AtomicInteger(0);

    public static final Thread THREAD = new Thread(){
        @Override
        public void run() {
            try {
                while (true) {
                    long duration = System.currentTimeMillis() - beginTime.get();
                    if (duration != 0) {
                        System.out.println("QPS: " + 1000 * totalRequest.get() / duration + ", "
                                + "Average response time: " + ((float) totalResponseTime.get()) / totalRequest.get() + "ms.");
                    }
                    //Report every 2 seconds
                    Thread.sleep(2000);
                }

            } catch (InterruptedException ignored) {
            }
        }
    };

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        ctx.executor().scheduleAtFixedRate(new Runnable() {
            public void run() {
                ByteBuf byteBuf = ctx.alloc().ioBuffer();
                //Send the current system time to the server
                byteBuf.writeLong(System.currentTimeMillis());
                ctx.channel().writeAndFlush(byteBuf);
            }
        }, 0, 1, TimeUnit.SECONDS);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        //The difference between now and the echoed timestamp is this request's response time
        totalResponseTime.addAndGet(System.currentTimeMillis() - msg.readLong());
        //Count this request
        totalRequest.incrementAndGet();

        //Start the statistics thread when the first response arrives
        if (beginTime.compareAndSet(0, System.currentTimeMillis())) {
            THREAD.start();
        }
    }

}

The code above simulates the processing time of a real Netty business environment: the expected QPS is about 1000, and statistics are printed every 2 seconds. Next, start the server and the client and watch the console logs. First run the server; its console log is shown in the figure below.

Then run the client and watch its console log, as shown in the figure below. After running for a while, you will find that the QPS stays capped around 1000 while the average response time keeps growing.

Go back to the getResult() method of the server-side ServerHandler. The thread sleep inside getResult() blocks, and because channelRead0() runs on the event loop thread assigned to the channel, that blocking stalls the thread and piles requests up behind one another. If we hand the following two lines over to a thread pool instead, the effect is completely different.

Object result = getResult(data);
ctx.channel().writeAndFlush(result);

Submitting these two lines to a business thread pool lets the blocking work run in the background, and the result is written back as soon as it is ready while the I/O thread returns immediately.

3.2 Netty application-level performance tuning scheme

Let's modify the code and create a ServerThreadPoolHandler class in the server-side code.

package com.tom.netty.thread;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by Tom.
 */
@ChannelHandler.Sharable
public class ServerThreadPoolHandler extends ServerHandler {
    public static final ChannelHandler INSTANCE = new ServerThreadPoolHandler();
    //Business thread pool that runs the blocking getResult() call
    private static ExecutorService threadPool = Executors.newFixedThreadPool(1000);


    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, ByteBuf msg) {
        final ByteBuf data = Unpooled.directBuffer();
        data.writeBytes(msg);
        //Hand the blocking business logic to the thread pool so the I/O thread returns immediately
        threadPool.submit(new Runnable() {
            public void run() {
                Object result = getResult(data);
                ctx.channel().writeAndFlush(result);
            }
        });

    }
}

Then register ServerThreadPoolHandler as the server-side handler in place of the original ServerHandler. The code is as follows.

ch.pipeline().addLast(ServerThreadPoolHandler.INSTANCE);

Then, start the server and client programs to view the console log, as shown in the figure below.

The average response time now stabilizes at about 15ms and the QPS exceeds 1000. In fact this is still not the optimal state, so keep adjusting: reduce the thread count of ServerThreadPoolHandler to 20. The code is as follows.

    public static final ChannelHandler INSTANCE = new ServerThreadPoolHandler();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(20);

Then restart the program and observe that the average response time does not differ much from before, as shown in the figure below.
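This is easy to rationalize with a back-of-the-envelope estimate based on Little's law: the number of concurrently busy workers is roughly the arrival rate multiplied by the average service time, so with about 1000 requests per second and roughly 15ms of blocking per request, only around 15 threads are busy at any moment, and a pool of 1000 threads gains little over a pool of 20. A minimal sketch of the arithmetic:

public class PoolSizeEstimate {
    public static void main(String[] args) {
        //Little's law: busy workers ~= arrival rate * average service time
        double requestsPerSecond = 1000;    //1000 connections, one request each per second
        double avgServiceSeconds = 0.0154;  //expected blocking time estimated earlier (~15.4ms)
        System.out.println("Concurrently busy threads: " + requestsPerSecond * avgServiceSeconds); //~15.4
    }
}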

The conclusion is that the right number of threads has to be found by repeatedly adjusting and testing in the real environment; the goal of this chapter is to show the optimization approach, not to prescribe a final number.

If this article helped you, feel free to follow and like it; if you have any suggestions, you can also leave a comment or send me a private message. Your support is what keeps me creating.

