JEP early adopter series 3 - How synchronous network I/O avoids blocking when run on virtual threads

Relevant JEP:

Use virtual threads for network IO

The main goal of Project Loom is to provide JVM features and APIs that support easy-to-use, high-throughput, lightweight concurrency and a new programming model on the Java platform. This opens up many interesting prospects, one of which is simplifying network code without sacrificing performance. Today's servers can keep far more socket connections open than the number of threads they can support, which brings both opportunities and challenges.

Unfortunately, writing scalable code that interacts with the network is hard. We usually code against synchronous APIs, but beyond a certain load synchronous code hits a bottleneck and stops scaling. Such APIs block when performing I/O, and a blocked I/O operation ties up its thread until the operation is ready, for example when reading from a socket that has no data yet. Threads are currently an expensive resource on the Java platform, too expensive to leave idle while waiting for I/O to complete. To work around this limitation we usually turn to asynchronous I/O or Reactor frameworks, which let us write code that does not tie up a thread during I/O and instead uses callbacks or events to notify a thread when the operation completes or becomes ready.

Asynchronous and non-blocking APIs are harder to use than synchronous ones, partly because code written against them does not match the way people naturally think. Synchronous APIs are much easier to work with: the code is easier to write, read, and debug, and most of the information in a stack trace is useful when debugging. However, as noted above, synchronous code does not scale the way asynchronous code does, so we face a difficult choice: pick simpler synchronous code and accept that it will not scale, or pick more scalable asynchronous code and deal with all its complexity. Neither is a good choice! The main purpose of Project Loom is to make synchronous code scalable.

In this article, we will look at how the Java platform's network API works when invoked on a virtual thread. Only by understanding the underlying details can we use virtual threads better and more confidently.

Virtual thread (fiber)

Before going further, we need to introduce the new kind of thread in Project Loom: the virtual thread (also known as a fiber).

Virtual threads are user-mode threads managed by the JVM rather than the operating system. They consume very few system resources, so a single JVM can host millions of them. They are especially well suited to tasks that spend most of their time blocked, often waiting for I/O.

Platform threads (the threads of today's Java platform) map one-to-one to operating system kernel threads. A platform thread usually has a very large stack and other resources maintained by the operating system. Virtual threads run on a small group of platform threads used as carrier threads, and code running in a virtual thread is usually unaware of the underlying carrier. Locks and I/O operations are the scheduling points at which a carrier thread is rescheduled from one virtual thread to another. A virtual thread may be parked (for example, with LockSupport.park()), which makes it ineligible for scheduling. A parked virtual thread may be unparked (for example, with LockSupport.unpark(Thread)), which makes it eligible for scheduling again.
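
The park/unpark cycle described above can be tried out directly. The following is a minimal sketch, assuming a released JDK 21 or later (where Thread.ofVirtual() is available) rather than the early-access build used in this article; the class name ParkUnparkSketch and the method parkThenUnpark are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkSketch {

    // Parks a virtual thread, unparks it from the caller, and reports
    // whether the virtual thread resumed and ran to completion.
    static boolean parkThenUnpark() {
        AtomicBoolean permitted = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        Thread vt = Thread.ofVirtual().start(() -> {
            started.countDown();
            // park() may return spuriously, so re-check the condition in a loop.
            while (!permitted.get()) {
                LockSupport.park();
            }
            resumed.set(true);
        });

        try {
            started.await();          // the virtual thread is running (or already parked)
            permitted.set(true);
            LockSupport.unpark(vt);   // make it eligible for scheduling again
            vt.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("resumed = " + parkThenUnpark()); // prints "resumed = true"
    }
}
```

While the virtual thread is parked it holds no carrier thread, so the carrier is free to run other virtual threads.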

Network API

There are two main kinds of network API on the Java platform:

  1. Asynchronous - java.nio.channels.AsynchronousServerSocketChannel, java.nio.channels.AsynchronousSocketChannel
  2. Synchronous - java.net.Socket, java.net.ServerSocket, java.net.DatagramSocket, java.nio.channels.SocketChannel, java.nio.channels.ServerSocketChannel, java.nio.channels.DatagramChannel

The first kind, the asynchronous APIs, initiate an I/O operation that completes at some later time, possibly on a thread other than the one that initiated it. By definition these APIs do not make blocking system calls, so they need no special treatment when run in a virtual thread.
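
To make "completes later, possibly on another thread" concrete, here is a small sketch that starts an asynchronous read on a loopback connection and records which thread runs the completion handler. The class name AsyncReadSketch and the method completionThreadName are made up for illustration, and the sketch assumes loopback networking is available.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class AsyncReadSketch {

    // Starts an asynchronous read on a loopback connection and returns the
    // name of the thread on which the completion handler was invoked.
    static String completionThreadName() {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            Future<AsynchronousSocketChannel> pendingAccept = server.accept();
            client.connect(server.getLocalAddress()).get();

            try (AsynchronousSocketChannel accepted = pendingAccept.get()) {
                CompletableFuture<String> completedOn = new CompletableFuture<>();
                // read() returns immediately; the handler runs once data has arrived.
                accepted.read(ByteBuffer.allocate(16), null, new CompletionHandler<Integer, Void>() {
                    @Override
                    public void completed(Integer bytesRead, Void attachment) {
                        completedOn.complete(Thread.currentThread().getName());
                    }

                    @Override
                    public void failed(Throwable cause, Void attachment) {
                        completedOn.completeExceptionally(cause);
                    }
                });
                client.write(ByteBuffer.wrap(new byte[] {42})).get();
                return completedOn.get();
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("initiated on:  " + Thread.currentThread().getName());
        System.out.println("completed on:  " + completionThreadName());
    }
}
```

The initiating thread never waits inside read(), which is why this kind of API needs no special handling on a virtual thread.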

The second kind, the synchronous APIs, behave more interestingly when run in a virtual thread. Within this group, the NIO channels can be configured in non-blocking mode. Such channels are usually used with an I/O event notification mechanism, for example by registering them with a Selector to listen for events. As with the asynchronous network APIs, they need no extra treatment in a virtual thread, because the I/O operations themselves do not make blocking system calls; the waiting is left to the Selector. That leaves channels configured in blocking mode and the java.net APIs (which we will call the synchronous blocking APIs here). The semantics of a synchronous API require that once an I/O operation is started, it completes or fails in the calling thread before control returns to the caller. But what if the I/O operation is "not ready", for example because there is currently no data to read?
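
The "not ready" case is easy to observe with a channel in non-blocking mode. In this sketch (class and method names are made up for illustration, and loopback networking is assumed), a read is issued before the peer has sent anything:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingReadSketch {

    // Reads from a freshly accepted loopback connection in non-blocking mode
    // before the peer has written anything, and returns what read() reports.
    static int readBeforeDataArrives() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                // Nothing has been sent, so the call returns 0 right away
                // instead of blocking the calling thread.
                return accepted.read(ByteBuffer.allocate(16));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes read = " + readBeforeDataArrives()); // prints "bytes read = 0"
    }
}
```

In blocking mode the same read() would not return until data arrived, which is exactly the situation the rest of this article is about.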

Synchronous blocking API

When the synchronous network APIs run in a virtual thread, they switch the underlying native socket to non-blocking mode. When Java code initiates an I/O request that cannot complete immediately (the native socket returns EAGAIN, meaning "not ready" / "would block"), the underlying socket is registered with a JVM-internal event notification mechanism (the Poller) and the virtual thread is parked. When the underlying I/O operation is ready (the corresponding event arrives at the Poller), the virtual thread is unparked and the underlying socket operation is retried.
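
The try, wait, retry pattern can be imitated in user code. The sketch below is not the JDK's internal implementation: it uses a public Selector, and the blocking Selector.select() call stands in for both "register with the Poller" and "park the virtual thread". The names RetryWhenReadySketch, readWhenReady and demo are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class RetryWhenReadySketch {

    // Looks like a blocking read to the caller, but is built from a
    // non-blocking read that is retried after each readiness event.
    static int readWhenReady(SocketChannel ch, Selector selector, ByteBuffer dst) throws IOException {
        int n;
        while ((n = ch.read(dst)) == 0) {      // 0 means "not ready"
            selector.select();                 // stand-in for "register with the poller and park"
            selector.selectedKeys().clear();   // consume the event, then retry the read
        }
        return n;
    }

    static int demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                accepted.register(selector, SelectionKey.OP_READ);

                // The writer sends data a little later, so the first read is "not ready".
                Thread writer = new Thread(() -> {
                    try {
                        Thread.sleep(100);
                        client.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));
                    } catch (IOException | InterruptedException e) {
                        throw new IllegalStateException(e);
                    }
                });
                writer.start();
                int n = readWhenReady(accepted, selector, ByteBuffer.allocate(16));
                writer.join();
                return n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes read after retry = " + demo());
    }
}
```

The difference in the real mechanism is what waits: here a whole thread sits in select(), whereas under Loom only the virtual thread is parked and its carrier thread moves on to other work.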

Let's take a closer look with an example. First, download a Project Loom JDK build (address: http://jdk.java.net/loom/ ) and extract it.

Next, write the code:

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LoomThreadMain {

  // A record (Java 16+) is a compact class; this one has two final components, url and response
  static record URLData (URL url, byte[] response) { }

  static List<URLData> retrieveURLs(URL... urls) throws Exception {
    // Create an executor that runs each task on its own virtual thread
    // (this is the API of the Loom early-access build used in this article)
    try (var executor = Executors.newVirtualThreadExecutor()) {
      // Build one task per URL; each task calls getURL
      var tasks = Arrays.stream(urls)
              .map(url -> (Callable<URLData>)() -> getURL(url))
              .toList();
      // Submit the tasks, wait for them, and keep the results of those that completed normally
      return executor.submit(tasks)
              .filter(Future::isCompletedNormally)
              .map(Future::join)
              .toList();
    }
  }

  // Read the full content behind the URL
  static URLData getURL(URL url) throws IOException {
    try (InputStream in = url.openStream()) {
      return new URLData(url, in.readAllBytes());
    }
  }

  public static void main(String[] args) throws Exception {
    // Access Google, which is slow from here, so there is time to capture the thread stacks
    List<URLData> urlData = retrieveURLs(new URL("https://www.google.com/"));
  }
}
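
The executor calls in the listing above come from the Loom early-access build. For readers on a released JDK, the following sketch shows roughly the same "run every task on a virtual thread and keep the successful results" flow, assuming JDK 21 or later. The class name VirtualExecutorSketch and the helper runAll are made up for illustration, and plain in-memory tasks replace the URL fetches so that the example runs without network access.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class VirtualExecutorSketch {

    // Runs every task on its own virtual thread and returns the results of
    // the tasks that completed normally, skipping the ones that threw.
    static <T> List<T> runAll(List<Callable<T>> tasks) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // invokeAll waits for all tasks and returns futures in task order.
            return executor.invokeAll(tasks).stream()
                    .filter(f -> f.state() == Future.State.SUCCESS)
                    .map(Future::resultNow)
                    .toList();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<Callable<Integer>> tasks = List.of(
                () -> 1,
                () -> { throw new IllegalStateException("boom"); },
                () -> 3);
        System.out.println(runAll(tasks)); // prints "[1, 3]"
    }
}
```

Replacing the in-memory tasks with calls such as getURL(url) gives the shape of the original example.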

Accessing Google through retrieveURLs is slow enough to give us time to capture the thread stacks. We cannot use jstack for this (at present jstack does not show virtual thread stacks, only the stacks of the carrier threads); instead we use the JavaThread.dump command of jcmd. Capturing exactly the stacks we want also takes a little extra work.

First, set a breakpoint on the first line of the getURL(URL) method and run the program in debug mode until it pauses there. Then run:

> jps
25496 LoomThreadMain
12512 Jps

> jcmd 25496 JavaThread.dump threads.txt -overwrite

Then resume the program and run the command again to capture the stacks while the virtual thread is performing I/O:

> jcmd 25496 JavaThread.dump threads2.txt -overwrite

Let's look at threads.txt first. The thread information we care about is:

"main" #1
      java.base@17-loom/jdk.internal.misc.Unsafe.park(Native Method)
      java.base@17-loom/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue$Node.block(LinkedTransferQueue.java:470)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3470)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3441)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:669)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:616)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1286)
      java.base@17-loom/java.util.concurrent.ExecutorServiceHelper$BlockingQueueSpliterator.tryAdvance(ExecutorServiceHelper.java:197)
      java.base@17-loom/java.util.Spliterator.forEachRemaining(Spliterator.java:326)
      java.base@17-loom/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
      java.base@17-loom/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
      java.base@17-loom/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
      java.base@17-loom/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
      java.base@17-loom/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616)
      java.base@17-loom/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:622)
      java.base@17-loom/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:627)
      app//com.github.hashjang.LoomThreadMain.retrieveURLs(LoomThreadMain.java:43)
      app//com.github.hashjang.LoomThreadMain.main(LoomThreadMain.java:29)

"ForkJoinPool-1-worker-1" #27
      java.base@17-loom/java.lang.Continuation.run(Continuation.java:300)
      java.base@17-loom/java.lang.VirtualThread.runContinuation(VirtualThread.java:240)
      java.base@17-loom/java.lang.VirtualThread$$Lambda$25/0x0000000801053fc0.run(Unknown Source)
      java.base@17-loom/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
      java.base@17-loom/java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:373)
      java.base@17-loom/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
      java.base@17-loom/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1177)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1648)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1615)
      java.base@17-loom/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
      
"<unnamed>" #26 virtual
      java.base/java.util.concurrent.ConcurrentHashMap.transfer(ConcurrentHashMap.java:2431)
      java.base/java.util.concurrent.ConcurrentHashMap.addCount(ConcurrentHashMap.java:2354)
      java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1075)
      java.base/java.util.concurrent.ConcurrentHashMap.putIfAbsent(ConcurrentHashMap.java:1541)
      java.base/sun.util.locale.LocaleObjectCache.get(LocaleObjectCache.java:68)
      java.base/java.util.Locale.getInstance(Locale.java:841)
      java.base/java.util.Locale.forLanguageTag(Locale.java:1736)
      java.base/sun.util.locale.provider.LocaleProviderAdapter.toLocaleArray(LocaleProviderAdapter.java:323)
      java.base/sun.util.locale.provider.CalendarDataProviderImpl.getAvailableLocales(CalendarDataProviderImpl.java:63)
      java.base/java.util.spi.LocaleServiceProvider.isSupportedLocale(LocaleServiceProvider.java:217)
      java.base/sun.util.locale.provider.LocaleServiceProviderPool.findProviders(LocaleServiceProviderPool.java:306)
      java.base/sun.util.locale.provider.LocaleServiceProviderPool.getLocalizedObjectImpl(LocaleServiceProviderPool.java:274)
      java.base/sun.util.locale.provider.LocaleServiceProviderPool.getLocalizedObject(LocaleServiceProviderPool.java:256)
      java.base/sun.util.locale.provider.CalendarDataUtility.retrieveFirstDayOfWeek(CalendarDataUtility.java:76)
      java.base/java.util.Calendar.setWeekCountData(Calendar.java:3419)
      java.base/java.util.Calendar.<init>(Calendar.java:1612)
      java.base/java.util.GregorianCalendar.<init>(GregorianCalendar.java:738)
      java.base/java.util.Calendar$Builder.build(Calendar.java:1494)
      java.base/sun.util.locale.provider.CalendarProviderImpl.getInstance(CalendarProviderImpl.java:87)
      java.base/java.util.Calendar.createCalendar(Calendar.java:1697)
      java.base/java.util.Calendar.getInstance(Calendar.java:1661)
      java.base/java.text.SimpleDateFormat.initializeCalendar(SimpleDateFormat.java:680)
      java.base/java.text.SimpleDateFormat.<init>(SimpleDateFormat.java:624)
      java.base/java.text.SimpleDateFormat.<init>(SimpleDateFormat.java:603)
      java.base/sun.security.util.DisabledAlgorithmConstraints$DenyAfterConstraint.<clinit>(DisabledAlgorithmConstraints.java:695)
      java.base/sun.security.util.DisabledAlgorithmConstraints$Constraints.<init>(DisabledAlgorithmConstraints.java:424)
      java.base/sun.security.util.DisabledAlgorithmConstraints.<init>(DisabledAlgorithmConstraints.java:149)
      java.base/sun.security.ssl.SSLAlgorithmConstraints.<clinit>(SSLAlgorithmConstraints.java:49)
      java.base/sun.security.ssl.ProtocolVersion.<init>(ProtocolVersion.java:158)
      java.base/sun.security.ssl.ProtocolVersion.<clinit>(ProtocolVersion.java:41)
      java.base/sun.security.ssl.SSLContextImpl$AbstractTLSContext.<clinit>(SSLContextImpl.java:539)
      java.base/java.lang.Class.forName0(Native Method)
      java.base/java.lang.Class.forName(Class.java:375)
      java.base/java.security.Provider$Service.getImplClass(Provider.java:1937)
      java.base/java.security.Provider$Service.getDefaultConstructor(Provider.java:1968)
      java.base/java.security.Provider$Service.newInstanceOf(Provider.java:1882)
      java.base/java.security.Provider$Service.newInstanceUtil(Provider.java:1890)
      java.base/java.security.Provider$Service.newInstance(Provider.java:1865)
      java.base/sun.security.jca.GetInstance.getInstance(GetInstance.java:236)
      java.base/sun.security.jca.GetInstance.getInstance(GetInstance.java:164)
      java.base/javax.net.ssl.SSLContext.getInstance(SSLContext.java:184)
      java.base/javax.net.ssl.SSLContext.getDefault(SSLContext.java:110)
      java.base/javax.net.ssl.SSLSocketFactory.getDefault(SSLSocketFactory.java:83)
      java.base/javax.net.ssl.HttpsURLConnection.getDefaultSSLSocketFactory(HttpsURLConnection.java:334)
      java.base/javax.net.ssl.HttpsURLConnection.<init>(HttpsURLConnection.java:291)
      java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.<init>(HttpsURLConnectionImpl.java:81)
      java.base/sun.net.www.protocol.https.Handler.openConnection(Handler.java:62)
      java.base/sun.net.www.protocol.https.Handler.openConnection(Handler.java:57)
      java.base/java.net.URL.openConnection(URL.java:1093)
      java.base/java.net.URL.openStream(URL.java:1159)
      com.github.hashjang.LoomThreadMain.getURL(LoomThreadMain.java:48)
      com.github.hashjang.LoomThreadMain.lambda$retrieveURLs$0(LoomThreadMain.java:38)
      java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:295)
      java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
      java.base/java.util.concurrent.ThreadExecutor$TaskRunner.run(ThreadExecutor.java:385)
      java.base/java.lang.VirtualThread.run(VirtualThread.java:295)
      java.base/java.lang.VirtualThread$VThreadContinuation.lambda$new$0(VirtualThread.java:172)
      java.base/java.lang.Continuation.enter0(Continuation.java:372)
      java.base/java.lang.Continuation.enter(Continuation.java:365)

"<unnamed>" #26 virtual is the virtual thread created by our program, and its stack shows that it is not yet performing I/O. The stack of "ForkJoinPool-1-worker-1" #27 is inside VirtualThread.runContinuation, which tells us that it is the carrier thread running this virtual thread. So by default the carrier threads of virtual threads are ForkJoinPool workers. Judging by the thread name, this is a dedicated pool rather than the common ForkJoinPool introduced in Java 8, whose workers are named ForkJoinPool.commonPool-worker-N. The carrier executes the virtual thread's work through the Continuation class.

Now look at threads2.txt:

"main" #1
      java.base@17-loom/jdk.internal.misc.Unsafe.park(Native Method)
      java.base@17-loom/java.util.concurrent.locks.LockSupport.park(LockSupport.java:371)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue$Node.block(LinkedTransferQueue.java:470)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3470)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3441)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:669)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:616)
      java.base@17-loom/java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1286)
      java.base@17-loom/java.util.concurrent.ExecutorServiceHelper$BlockingQueueSpliterator.tryAdvance(ExecutorServiceHelper.java:197)
      java.base@17-loom/java.util.Spliterator.forEachRemaining(Spliterator.java:326)
      java.base@17-loom/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
      java.base@17-loom/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
      java.base@17-loom/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
      java.base@17-loom/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
      java.base@17-loom/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616)
      java.base@17-loom/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:622)
      java.base@17-loom/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:627)
      app//com.github.hashjang.LoomThreadMain.retrieveURLs(LoomThreadMain.java:43)
      app//com.github.hashjang.LoomThreadMain.main(LoomThreadMain.java:29)
      
"ForkJoinPool-1-worker-1" #25
      java.base@17-loom/jdk.internal.misc.Unsafe.park(Native Method)
      java.base@17-loom/java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:449)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.awaitWork(ForkJoinPool.java:1719)
      java.base@17-loom/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1616)
      java.base@17-loom/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

"Read-Poller" #41
      java.base@17-loom/sun.nio.ch.WEPoll.wait(Native Method)
      java.base@17-loom/sun.nio.ch.WEPollPoller.poll(WEPollPoller.java:64)
      java.base@17-loom/sun.nio.ch.Poller.poll(Poller.java:196)
      java.base@17-loom/sun.nio.ch.Poller.lambda$startPollerThread$0(Poller.java:66)
      java.base@17-loom/sun.nio.ch.Poller$$Lambda$89/0x00000008010e5168.run(Unknown Source)
      java.base@17-loom/java.lang.Thread.run(Thread.java:1521)
      java.base@17-loom/jdk.internal.misc.InnocuousThread.run(InnocuousThread.java:161)
      
"<unnamed>" #24 virtual
      java.base/java.lang.Continuation.yield(Continuation.java:402)
      java.base/java.lang.VirtualThread.yieldContinuation(VirtualThread.java:367)
      java.base/java.lang.VirtualThread.park(VirtualThread.java:534)
      java.base/java.lang.System$2.parkVirtualThread(System.java:2373)
      java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:60)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:184)
      java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:212)
      java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:607)
      java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:331)
      java.base/java.net.Socket.connect(Socket.java:642)
      java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:299)
      java.base/sun.security.ssl.BaseSSLSocketImpl.connect(BaseSSLSocketImpl.java:174)
      java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:182)
      java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:497)
      java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:600)
      java.base/sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:266)
      java.base/sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:380)
      java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:189)
      java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1232)
      java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1120)
      java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:175)
      java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1653)
      java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1577)
      java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(HttpsURLConnectionImpl.java:224)
      java.base/java.net.URL.openStream(URL.java:1159)
      com.github.hashjang.LoomThreadMain.getURL(LoomThreadMain.java:48)
      com.github.hashjang.LoomThreadMain.lambda$retrieveURLs$0(LoomThreadMain.java:38)
      java.base/java.util.concurrent.FutureTask.run(FutureTask.java:295)
      java.base/java.util.concurrent.ThreadExecutor$TaskRunner.run(ThreadExecutor.java:385)
      java.base/java.lang.VirtualThread.run(VirtualThread.java:295)
      java.base/java.lang.VirtualThread$VThreadContinuation.lambda$new$0(VirtualThread.java:172)
      java.base/java.lang.Continuation.enter0(Continuation.java:372)
      java.base/java.lang.Continuation.enter(Continuation.java:365)

This stack shows that when the virtual thread performs an I/O operation, it calls java.lang.Continuation.yield to give up its carrier thread (this is the park). The former carrier thread, "ForkJoinPool-1-worker-1" #25, is indeed idle. So where is the I/O operation being waited on? This brings us to the thread "Read-Poller" #41.

This thread is a read poller shared across the JVM. Its core logic is a basic event loop that listens for all synchronous network read, connect, and accept operations. When one of these I/O operations becomes ready, the poller is notified and unparks the corresponding virtual thread, which then continues executing. Besides the read poller, there is also a write poller for write events.

I tested on Windows, where the poller is implemented on top of wepoll, which is why WEPoll appears in the stack. On macOS the implementation uses kqueue, and on Linux it uses epoll.

The poller maintains a map keyed by file descriptor. When a virtual thread registers the file descriptor of its socket with the poller, an entry is added to the map with the file descriptor as the key and the virtual thread itself as the value. When an event becomes ready in the poller's event loop, the poller uses the file descriptor carried by the event to look up the corresponding virtual thread in the map and unparks it.
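
The bookkeeping described above can be sketched in a few lines. This is an illustration only, not the JDK's internal Poller: the class PollerMapSketch, its methods, and the descriptor number 7 are all made up, no real socket or OS event queue is involved, and Thread.ofVirtual() assumes JDK 21 or later.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;

class PollerMapSketch {

    // File descriptor -> thread parked until that descriptor is ready.
    private final Map<Integer, Thread> waiters = new ConcurrentHashMap<>();

    // Called by a thread just before it parks.
    void register(int fd) {
        waiters.put(fd, Thread.currentThread());
    }

    boolean isWaiting(int fd) {
        return waiters.containsKey(fd);
    }

    // Called by the event loop when fd is reported ready: look up the
    // waiter, drop the entry, and unpark it. Returns false if nobody waits.
    boolean ready(int fd) {
        Thread waiter = waiters.remove(fd);
        if (waiter == null) {
            return false;
        }
        LockSupport.unpark(waiter);
        return true;
    }

    // Parks a virtual thread on a made-up descriptor, then delivers a fake
    // readiness event. Returns true if a waiter was found and it finished.
    static boolean demo() {
        PollerMapSketch poller = new PollerMapSketch();
        int fd = 7; // made-up descriptor number, not a real socket
        CountDownLatch registered = new CountDownLatch(1);

        Thread vt = Thread.ofVirtual().start(() -> {
            poller.register(fd);
            registered.countDown();
            while (poller.isWaiting(fd)) {   // guard against spurious wakeups
                LockSupport.park();
            }
        });

        try {
            registered.await();
            boolean found = poller.ready(fd);   // the "event" arrives
            vt.join();
            return found && !poller.isWaiting(fd);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("waiter unparked = " + demo()); // prints "waiter unparked = true"
    }
}
```

Removing the entry before unparking means a later event on the same descriptor cannot wake a thread that is no longer waiting on it.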

Scalability

In short, the design above is not very different from using NIO channels and Selectors, which can be found in many server-side frameworks and libraries such as Netty. However, NIO channels and Selectors expose a more complex model: user code must implement the event loop and maintain application state across I/O boundaries. Virtual threads provide a simpler and more intuitive programming model, in which the Java platform is responsible for scheduling tasks and maintaining the corresponding context across I/O boundaries.

As we saw earlier, the default carrier threads of virtual threads come from a ForkJoinPool, which is a very good fit for virtual threads: its work-stealing algorithm schedules and runs them very efficiently.

Conclusion

The synchronous Java networking APIs have been reimplemented; the related JEPs are JEP 353 and JEP 373. When run in a virtual thread, an I/O operation that cannot complete immediately causes the virtual thread to be parked. When the I/O is ready, the virtual thread is unparked. Compared with code written against today's asynchronous, non-blocking I/O, this approach is easier to use and hides many implementation details that application code does not care about.

Added by akiratoya13 on Thu, 30 Dec 2021 23:55:05 +0200