After writing "Application of the High-Performance Queue Disruptor in Testing" and the video version of "Design Draft of a Ten-Million Log Playback Engine", I have been preparing performance tests of several high-performance message queues in Java and Go, covering both benchmark scenarios and application scenarios.
The test scenarios are designed along two dimensions:

- The size of the message body (HTTP GET request objects of different sizes)
- The number of producer and consumer threads (called goroutines in Go)

PS: in subsequent Go-language articles, "thread" always refers to a goroutine.
Conclusion
Overall, the performance of `java.util.concurrent.LinkedBlockingQueue` stays at the level of 500,000 QPS, which meets my current pressure-test requirements. The only thing to guard against is its unstable performance when the queue grows long. To sum up, there are three general guidelines:
- The message body should be as small as possible
- Adding threads yields only limited gains
- Try to avoid message backlog
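One practical way to enforce the third guideline is to construct the queue with a capacity bound. The sketch below is in plain Java rather than the article's Groovy, and the tiny capacity of 2 is only for illustration: `put()` blocks when the bound is reached, while `offer()` with a timeout fails fast instead of letting a backlog build up.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueSketch {
    public static void main(String[] args) throws InterruptedException {
        // A capacity bound keeps the backlog small; put() blocks when full.
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.put("a");
        queue.put("b");
        // offer() with a timeout returns false instead of blocking forever.
        boolean accepted = queue.offer("c", 10, TimeUnit.MILLISECONDS);
        System.out.println(accepted);     // false: the queue is full
        System.out.println(queue.take()); // "a" comes out first (FIFO)
    }
}
```

With a bound in place, a slow consumer applies back-pressure to producers instead of letting memory fill with queued request objects.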
Introduction
First, let me introduce the first test object, `java.util.concurrent.LinkedBlockingQueue`. As the name suggests, it is a blocking queue implemented with a linked list. The official definition is:
An optionally-bounded blocking queue based on linked nodes. This queue orders elements FIFO (first-in, first-out). The head of the queue is the element that has been on the queue the longest time; the tail is the element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and queue retrieval operations obtain elements at the head. Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
Among the queue implementation classes I found in the JDK, `java.util.concurrent.LinkedBlockingQueue` has the highest performance. There is also a candidate class, `java.util.concurrent.ArrayBlockingQueue`; according to some sources, the performance of `LinkedBlockingQueue` is roughly 2 to 3 times that of `ArrayBlockingQueue`. That gap is suspiciously large, so I will test it myself when I get the chance.
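For a rough sense of how such a comparison could be run, here is a minimal single-threaded sketch in plain Java. This is my own illustration, not the author's benchmark, so its numbers will not match the multithreaded tables below; it only shows the shape of a throughput comparison between the two classes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueThroughputSketch {
    // Push and then drain `total` integers, returning operations per millisecond.
    static long rate(BlockingQueue<Integer> queue, int total) throws InterruptedException {
        long start = System.nanoTime();
        for (int i = 0; i < total; i++) queue.put(i);
        for (int i = 0; i < total; i++) queue.take();
        long elapsedMs = Math.max(1, (System.nanoTime() - start) / 1_000_000);
        return (2L * total) / elapsedMs; // put + take both count as operations
    }

    public static void main(String[] args) throws InterruptedException {
        int total = 1_000_000;
        // ArrayBlockingQueue needs a capacity; size it to hold everything here.
        System.out.println("linked: " + rate(new LinkedBlockingQueue<>(), total));
        System.out.println("array:  " + rate(new ArrayBlockingQueue<>(total), total));
    }
}
```

A real comparison would also need warm-up iterations and concurrent producers and consumers, since the lock layout of the two classes (two locks in `LinkedBlockingQueue`, one in `ArrayBlockingQueue`) only matters under contention.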
Test results
Only the number of messages (objects) processed per millisecond is recorded, as the sole criterion for evaluating performance.
Data description
Here I use three sizes of `org.apache.http.client.methods.HttpGet` object, all created with the native API. To distinguish the cases, I vary the number of headers and the URL length.
Small objects:
```groovy
def get = new HttpGet()
```
Medium objects:
```groovy
def get = new HttpGet(url)
get.addHeader("token", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
```
Large objects:
```groovy
def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
```
Producer
Object size | Queue length (million) | Number of threads | Rate (/ ms) |
---|---|---|---|
Small | 1 | 1 | 838 |
Small | 1 | 5 | 837 |
Small | 1 | 10 | 823 |
Small | 5 | 1 | 483 |
Small | 10 | 1 | 450 |
Medium | 1 | 1 | 301 |
Medium | 1 | 5 | 322 |
Medium | 1 | 10 | 320 |
Medium | 1 | 20 | 271 |
Medium | 5 | 1 | fail |
Medium | 10 | 1 | fail |
Medium | 0.5 | 1 | 351 |
Medium | 0.5 | 5 | 375 |
Large | 1 | 1 | 214 |
Large | 1 | 5 | 240 |
Large | 1 | 10 | 241 |
Large | 0.5 | 1 | 209 |
Large | 0.5 | 5 | 250 |
Large | 0.5 | 10 | 246 |
Large | 0.2 | 1 | 217 |
Large | 0.2 | 5 | 309 |
Large | 0.2 | 10 | 321 |
Large | 0.2 | 20 | 243 |
The two failed runs in the medium group took too long: at around 3 million messages the producer began to stall, so I gave up on them.
For the `org.apache.http.client.methods.HttpRequestBase` message body, the producer conclusions are as follows:

- Keep the queue length on the order of 100,000
- Use 5 to 10 producer threads
- The message body should be as small as possible
Consumer
Object size | Queue length (million) | Number of threads | Rate (/ ms) |
---|---|---|---|
Small | 1 | 1 | 1893 |
Small | 1 | 5 | 1706 |
Small | 1 | 10 | 1594 |
Small | 1 | 20 | 1672 |
Small | 2 | 1 | 2544 |
Small | 2 | 5 | 2024 |
Small | 5 | 1 | 3419 |
Medium | 1 | 1 | 1897 |
Medium | 1 | 5 | 1485 |
Medium | 1 | 10 | 1345 |
Medium | 1 | 20 | 1430 |
Medium | 2 | 1 | 2971 |
Medium | 2 | 5 | 1576 |
Large | 1 | 1 | 1980 |
Large | 1 | 5 | 1623 |
Large | 1 | 10 | 1689 |
Large | 0.5 | 1 | 1136 |
Large | 0.5 | 5 | 1096 |
Large | 0.5 | 10 | 1072 |
For the `org.apache.http.client.methods.HttpRequestBase` message body, the consumer conclusions are as follows:

- The longer the queue, the better
- The fewer consumer threads, the better
- The message body should be as small as possible
This differs somewhat from the producer guidelines. Basically, the less lock contention the better, and the larger the stock of messages the better (though the latter is not usable in practice).
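One standard way to reduce lock contention on the consumer side, not used in the article's test cases but worth knowing, is `drainTo`: it transfers a whole batch of elements under a single lock acquisition instead of one `poll()` call (and one lock round-trip) per element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 1000; i++) queue.put(i);

        List<Integer> batch = new ArrayList<>();
        // One lock acquisition moves up to 256 elements at once,
        // instead of 256 separate poll() calls competing with producers.
        int moved = queue.drainTo(batch, 256);
        System.out.println(moved);        // 256
        System.out.println(queue.size()); // 744
    }
}
```

The consumer then processes the local `batch` without touching the shared queue, which is exactly the "less lock contention" direction the measurements above point toward.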
Producer & Consumer
The number of threads here refers to the number of producers (or, equally, consumers); the total number of threads is twice this value.
Object size | Messages (million) | Number of threads | Initial queue length (million) | Rate (/ ms) |
---|---|---|---|---|
Small | 1 | 1 | 0.1 | 1326 |
Small | 1 | 1 | 0.2 | 1050 |
Small | 1 | 1 | 0.5 | 1054 |
Small | 1 | 5 | 0.1 | 1091 |
Small | 1 | 10 | 0.1 | 1128 |
Small | 2 | 1 | 0.1 | 1798 |
Small | 2 | 1 | 0.2 | 1122 |
Small | 2 | 5 | 0.2 | 946 |
Small | 5 | 5 | 0.1 | 1079 |
Small | 5 | 10 | 0.1 | 1179 |
Medium | 1 | 1 | 0.1 | 632 |
Medium | 1 | 1 | 0.2 | 664 |
Medium | 1 | 5 | 0.2 | 718 |
Medium | 1 | 10 | 0.2 | 683 |
Medium | 2 | 1 | 0.2 | 675 |
Medium | 2 | 5 | 0.2 | 735 |
Medium | 2 | 10 | 0.2 | 788 |
Medium | 2 | 15 | 0.2 | 828 |
Large | 1 | 1 | 0.1 | 505 |
Large | 1 | 1 | 0.2 | 558 |
Large | 1 | 5 | 0.2 | 609 |
Large | 1 | 10 | 0.2 | 496 |
Large | 2 | 1 | 0.2 | 523 |
Large | 2 | 5 | 0.2 | 759 |
Large | 2 | 10 | 0.2 | 668 |
For the `org.apache.http.client.methods.HttpRequestBase` message body, the producer-consumer conclusions are as follows:

- The fewer messages backlogged in the queue, the faster the rate
- The consumption rate increases slightly over time, though the effect is not obvious
- The message body should be as small as possible
Test cases
The test cases are written in Groovy. Since I customized the asynchronous keyword `fun` and brushed up on closure syntax, the code now flows easily, and I have become a little fascinated by the various multithreading idioms. The cases may look familiar to Java readers, though they take some effort to read carefully, so I will add comments where I can. You can focus on the test results, which serve as a basic reference for `java.util.concurrent.LinkedBlockingQueue`.
The test cases are fine-tuned for each of the scenarios above (number of threads, message-body size, and so on). I will focus on three use-case scenarios; real-world scenarios are certainly much more complex than these three, and if you are interested you can test them yourself.
Producer scenario
```groovy
package com.funtest.groovytest

import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.CountUtil
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

class QueueT extends SourceCode {

    static AtomicInteger index = new AtomicInteger(0)

    static int total = 100_0000

    static int size = 10

    static int threadNum = 1

    static int piece = total / size

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {
        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()
        def start = Time.getTimeStamp()
        def latch = new CountDownLatch(threadNum)
        def ts = []
        def barrier = new CyclicBarrier(threadNum + 1)
        def funtester = {// closure that starts an asynchronous producer
            fun {
                barrier.await()
                while (true) {
                    if (index.getAndIncrement() % piece == 0) {// log every 1/10 of the total
                        def l = Time.getTimeStamp() - start
                        ts << l
                        output("${formatLong(index.get())} added, total cost ${formatLong(l)}")
                        start = Time.getTimeStamp()
                    }
                    if (index.get() > total) break
                    def get = new HttpGet(url)
                    get.addHeader("token", token)
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    get.addHeader(HttpClientConstant.CONNECTION)
                    linkedQ.put(get)
                }
                latch.countDown()
            }
        }
        threadNum.times {funtester()}
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("Rate per millisecond: ${total / (et - st)}")
        outRGB(CountUtil.index(ts).toString())
    }
}
```
Consumer scenario
```groovy
package com.funtest.groovytest

import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.CountUtil
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class QueueTconsume extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 100_0000

    static int size = 10

    static int threadNum = 5

    static int piece = total / size

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {
        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()
        def pwait = new CountDownLatch(10)
        def produces = {// asynchronously fill the queue with test data
            fun {
                while (true) {
                    if (linkedQ.size() > total) break
                    def get = new HttpGet(url)
                    get.addHeader("token", token)
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    get.addHeader(HttpClientConstant.CONNECTION)
                    linkedQ.add(get)
                }
                pwait.countDown()
            }
        }
        10.times {produces()}
        pwait.await()
        outRGB("Data construction completed! ${linkedQ.size()}")
        def start = Time.getTimeStamp()
        def barrier = new CyclicBarrier(threadNum + 1)
        def latch = new CountDownLatch(threadNum)
        def ts = []
        def funtester = {// closure that starts an asynchronous consumer
            fun {
                barrier.await()
                while (true) {
                    if (index.getAndIncrement() % piece == 0) {// log every 1/10 of the total
                        def l = Time.getTimeStamp() - start
                        ts << l
                        output("${formatLong(index.get())} consumed, total cost ${formatLong(l)}")
                        start = Time.getTimeStamp()
                    }
                    def poll = linkedQ.poll(100, TimeUnit.MILLISECONDS)
                    if (poll == null) break
                }
                latch.countDown()
            }
        }
        threadNum.times {funtester()}
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("Rate per millisecond: ${total / (et - st)}")
        outRGB(CountUtil.index(ts).toString())
    }
}
```
Producer consumer scenario
Here I introduce another variable: the initial queue length. Before the use case runs, a single thread fills the queue to this length.
```groovy
package com.funtest.groovytest

import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class QueueBoth extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 500_0000

    static int length = 50_0000

    static int threadNum = 5

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {
        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()
        def latch = new CountDownLatch(threadNum * 2)
        def barrier = new CyclicBarrier(threadNum * 2 + 1)
        def ts = []
        def funtester = {f ->// wrap a task closure in an asynchronous worker
            {
                fun {
                    barrier.await()
                    while (true) {
                        if (index.getAndIncrement() > total) break
                        f()
                    }
                    latch.countDown()
                }
            }
        }
        def produces = {// build one request and put it on the queue
            def get = new HttpGet(url)
            get.addHeader("token", token)
            get.addHeader(HttpClientConstant.USER_AGENT)
            get.addHeader(HttpClientConstant.CONNECTION)
            linkedQ.put(get)
        }
        length.times {produces()}// pre-fill the queue to the initial length
        threadNum.times {
            funtester produces
            funtester {linkedQ.poll(100, TimeUnit.MILLISECONDS)}
        }
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("Rate per millisecond: ${total / (et - st) / 2}")
    }
}
```
Supplement
Highly unstable performance
There are two points that need to be added. First, the performance of `java.util.concurrent.LinkedBlockingQueue` was very unstable during testing. Each run prints a timestamped log line at every 1/10 of the total. Here are some logs from producer mode with a queue length of 1 million:
```
INFO-> 23.731 F-2  107,942   added, total cost 523
INFO-> 23.897 F-10 200,061   added, total cost 165
INFO-> 24.137 F-9  300,024   added, total cost 239
INFO-> 24.320 F-2  400,037   added, total cost 182
INFO-> 25.200 F-5  500,065   added, total cost 879
INFO-> 25.411 F-2  600,094   added, total cost 211
INFO-> 25.604 F-8  700,090   added, total cost 193
INFO-> 26.868 F-1  800,047   added, total cost 1,264
INFO-> 26.927 F-4  900,053   added, total cost 57
INFO-> 28.454 F-3  1,000,009 added, total cost 1,527
INFO-> 28.457 main Rate per millisecond 190.0779319521
INFO-> 28.476 main average: 524.0, maximum: 1527.0, minimum: 57.0, median: 239.0, p99: 1527.0, p95: 1527.0
INFO-> 43.930 F-10 112,384   added, total cost 385
INFO-> 44.072 F-9  200,159   added, total cost 140
INFO-> 44.296 F-1  300,058   added, total cost 223
INFO-> 44.445 F-7  400,075   added, total cost 149
INFO-> 45.311 F-10 500,086   added, total cost 866
INFO-> 45.498 F-8  600,080   added, total cost 187
INFO-> 45.700 F-1  700,088   added, total cost 202
INFO-> 45.760 F-9  800,057   added, total cost 59
INFO-> 47.245 F-6  900,095   added, total cost 1,485
INFO-> 47.303 F-6  1,000,009 added, total cost 58
INFO-> 47.305 main Rate per millisecond 262.7430373095
INFO-> 47.320 main average: 375.4, maximum: 1485.0, minimum: 58.0, median: 202.0, p99: 1485.0, p95: 1485.0
INFO-> 00.916 F-1  100,000   added, total cost 568
INFO-> 01.269 F-1  200,000   added, total cost 353
INFO-> 01.461 F-1  300,000   added, total cost 192
INFO-> 01.635 F-1  400,000   added, total cost 174
INFO-> 02.536 F-1  500,000   added, total cost 899
INFO-> 02.777 F-1  600,000   added, total cost 240
INFO-> 03.015 F-1  700,000   added, total cost 237
INFO-> 03.107 F-1  800,000   added, total cost 91
INFO-> 04.519 F-1  900,000   added, total cost 1,412
INFO-> 05.940 F-1  1,000,000 added, total cost 96
INFO-> 05.943 main Rate per millisecond 184.5358922310
INFO-> 05.959 main average: 426.2, maximum: 1412.0, minimum: 91.0, median: 240.0, p99: 1412.0, p95: 1412.0
```
As you can see, the maximum and minimum can differ by a factor of ten or even more than twenty. The effect grows with the total length of the message queue, and mostly occurs in the 800,000 to 1,000,000 range. If the length is reduced to 500,000, the situation improves significantly. Hence an additional guideline: keep the message queue as short as possible.
Benchmark test
Below are the results of benchmarking the construction code for the three message objects with the FunTester performance-test framework.
Test object | Number of threads | Messages (million) | Rate (/ ms) |
---|---|---|---|
Small | 1 | 1 | 5681 |
Small | 5 | 1 | 8010 |
Small | 5 | 5 | 15105 |
Medium | 1 | 1 | 1287 |
Medium | 5 | 1 | 2329 |
Medium | 5 | 5 | 4176 |
Large | 1 | 1 | 807 |
Large | 5 | 1 | 2084 |
Large | 5 | 5 | 3185 |
The test cases are as follows:
```groovy
package com.funtest.groovytest

import com.funtester.base.constaint.FixedThread
import com.funtester.config.HttpClientConstant
import com.funtester.frame.execute.Concurrent
import com.funtester.httpclient.FunLibrary
import org.apache.http.client.methods.HttpGet

class TTT extends FunLibrary {

    static int total = 100_0000

    static int thread = 1

    static int times = total / thread

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {
        RUNUP_TIME = 0
        def tasks = []
        thread.times {tasks << new FunTester(times)}
        new Concurrent(tasks, "Test producer code performance").start()
    }

    private static class FunTester extends FixedThread {

        FunTester(int limit) {
            super(null, limit, true)
        }

        @Override
        protected void doing() throws Exception {
            // Small object:
            // def get = new HttpGet()

            // Medium object:
            // def get = new HttpGet(url)
            // get.addHeader("token", token)
            // get.addHeader(HttpClientConstant.USER_AGENT)
            // get.addHeader(HttpClientConstant.CONNECTION)

            // Large object:
            def get = new HttpGet(url + token)
            get.addHeader("token", token)
            get.addHeader("token1", token)
            get.addHeader("token5", token)
            get.addHeader("token4", token)
            get.addHeader("token3", token)
            get.addHeader("token2", token)
            get.addHeader(HttpClientConstant.USER_AGENT)
            get.addHeader(HttpClientConstant.CONNECTION)
        }

        @Override
        FixedThread clone() {
            return new FunTester(limit)
        }
    }
}
```