LinkedBlockingQueue performance test: high-performance queues in Java & Go

After writing "Application of the high-performance queue Disruptor in testing" and the video version of "Design draft of a ten-million-entry log replay engine", I have been preparing performance tests of several high-performance message queues in Java and Go, covering several benchmark scenarios as well as application scenarios.

The test scenario design considers two dimensions:

  • The size of the message body (three sizes of HTTP GET request object)
  • The number of producer and consumer threads (goroutines in Go)

PS: in subsequent articles, any mention of threads in a Go article refers to goroutines.

Conclusion

Overall, the performance of java.util.concurrent.LinkedBlockingQueue is still at the level of 500,000 QPS, which meets my current load-testing requirements. The only thing to watch out for is unstable performance when the queue gets long. To sum up, there are three general guidelines:

  • Keep the message body as small as possible
  • The gain from adding threads is limited
  • Avoid message backlog

Brief introduction

First, let me introduce the first test subject, java.util.concurrent.LinkedBlockingQueue. As the name suggests, it is a blocking queue implemented with a linked list. The official definition is:

An optionally-bounded blocking queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is the element that has been in the queue the longest time; the tail is the element that has been in the queue the shortest time. New elements are inserted at the tail of the queue, and queue retrieval operations obtain elements at the head. Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.

Among the queue implementation classes I found in the JDK, java.util.concurrent.LinkedBlockingQueue has the highest performance. There is also a candidate class, java.util.concurrent.ArrayBlockingQueue; some sources say the performance of LinkedBlockingQueue is roughly 2–3 times that of ArrayBlockingQueue. The gap is obvious enough that I will test it separately when I get the chance.
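As a quick sanity check of the FIFO semantics described above, here is a minimal, self-contained sketch (my own example, not part of the original tests):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class FifoDemo {
    public static void main(String[] args) throws InterruptedException {
        // No capacity argument means an effectively unbounded queue
        // (Integer.MAX_VALUE); a capacity can be passed to bound it
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("first");  // inserted at the tail
        queue.put("second");
        // take() removes from the head, so elements come out in insertion order
        System.out.println(queue.take()); // prints first
        System.out.println(queue.take()); // prints second
    }
}
```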

Test results

Here, only the number of messages (objects) processed per millisecond is recorded; it is the sole criterion for evaluating performance.
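That rate is simply the total message count divided by the elapsed wall-clock milliseconds; a trivial sketch (the numbers are illustrative, chosen to match the first producer row below):

```java
public class RateMetric {
    public static void main(String[] args) {
        long total = 1_000_000;      // messages processed in the run
        long elapsedMillis = 1_193;  // wall-clock duration of the run
        // Rate per millisecond, the only metric recorded in the tables below
        double ratePerMs = (double) total / elapsedMillis;
        System.out.println(Math.round(ratePerMs)); // prints 838
    }
}
```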

Data description

Here I use three sizes of org.apache.http.client.methods.HttpGet object, all created with the native API. To distinguish the cases, I vary the number of headers and the URL length.

Small objects:

def get = new HttpGet()

Medium objects:

def get = new HttpGet(url)
get.addHeader("token", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)

Large objects:

def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)

Producer

| Object size | Queue length (million) | Number of threads | Rate (/ms) |
| ----------- | ---------------------- | ----------------- | ---------- |
| Small       | 1                      | 1                 | 838        |
| Small       | 1                      | 5                 | 837        |
| Small       | 1                      | 10                | 823        |
| Small       | 5                      | 1                 | 483        |
| Small       | 10                     | 1                 | 450        |
| Medium      | 1                      | 1                 | 301        |
| Medium      | 1                      | 5                 | 322        |
| Medium      | 1                      | 10                | 320        |
| Medium      | 1                      | 20                | 271        |
| Medium      | 5                      | 1                 | fail       |
| Medium      | 10                     | 1                 | fail       |
| Medium      | 0.5                    | 1                 | 351        |
| Medium      | 0.5                    | 5                 | 375        |
| Large       | 1                      | 1                 | 214        |
| Large       | 1                      | 5                 | 240        |
| Large       | 1                      | 10                | 241        |
| Large       | 0.5                    | 1                 | 209        |
| Large       | 0.5                    | 5                 | 250        |
| Large       | 0.5                    | 10                | 246        |
| Large       | 0.2                    | 1                 | 217        |
| Large       | 0.2                    | 5                 | 309        |
| Large       | 0.2                    | 10                | 321        |
| Large       | 0.2                    | 20                | 243        |

The two "fail" tests in the middle were abandoned because the wait grew too long: at around 3 million messages the run began to stall, so I gave up.

For the org.apache.http.client.methods.HttpRequestBase message body, the producer conclusions are:

  1. Keep the queue length on the order of 100,000
  2. Use 5–10 producer threads
  3. Keep the message body as small as possible

Consumer

| Object size | Queue length (million) | Number of threads | Rate (/ms) |
| ----------- | ---------------------- | ----------------- | ---------- |
| Small       | 1                      | 1                 | 1893       |
| Small       | 1                      | 5                 | 1706       |
| Small       | 1                      | 10                | 1594       |
| Small       | 1                      | 20                | 1672       |
| Small       | 2                      | 1                 | 2544       |
| Small       | 2                      | 5                 | 2024       |
| Small       | 5                      | 1                 | 3419       |
| Medium      | 1                      | 1                 | 1897       |
| Medium      | 1                      | 5                 | 1485       |
| Medium      | 1                      | 10                | 1345       |
| Medium      | 1                      | 20                | 1430       |
| Medium      | 2                      | 1                 | 2971       |
| Medium      | 2                      | 5                 | 1576       |
| Large       | 1                      | 1                 | 1980       |
| Large       | 1                      | 5                 | 1623       |
| Large       | 1                      | 10                | 1689       |
| Large       | 0.5                    | 1                 | 1136       |
| Large       | 0.5                    | 5                 | 1096       |
| Large       | 0.5                    | 10                | 1072       |

For the org.apache.http.client.methods.HttpRequestBase message body, the consumer conclusions are:

  1. The longer the queue, the better
  2. The fewer consumer threads, the better
  3. Keep the message body as small as possible

This differs a little from the producer conclusions. Basically, the less lock contention the better, and the more buffered messages the better (which is not achievable in my line of work for now).
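As an aside on lock contention: one option the tests above do not use (so treat this as a sketch of an alternative, not the article's method) is batch draining with drainTo, which moves many elements out of the queue per lock acquisition instead of locking once per element:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchConsumer {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 1000; i++) queue.put(i);

        List<Integer> batch = new ArrayList<>();
        int consumed = 0;
        while (!queue.isEmpty()) {
            batch.clear();
            // Transfers up to 256 elements in one call, acquiring the take
            // lock once, rather than once per take()/poll()
            consumed += queue.drainTo(batch, 256);
            // ... process the batch here ...
        }
        System.out.println(consumed); // prints 1000
    }
}
```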

Producer & Consumer

The number of threads here refers to the number of producers (and, equally, consumers); the total number of threads is twice this value.

| Object size | Total messages (million) | Number of threads | Initial queue length (million) | Rate (/ms) |
| ----------- | ------------------------ | ----------------- | ------------------------------ | ---------- |
| Small       | 1                        | 1                 | 0.1                            | 1326       |
| Small       | 1                        | 1                 | 0.2                            | 1050       |
| Small       | 1                        | 1                 | 0.5                            | 1054       |
| Small       | 1                        | 5                 | 0.1                            | 1091       |
| Small       | 1                        | 10                | 0.1                            | 1128       |
| Small       | 2                        | 1                 | 0.1                            | 1798       |
| Small       | 2                        | 1                 | 0.2                            | 1122       |
| Small       | 2                        | 5                 | 0.2                            | 946        |
| Small       | 5                        | 5                 | 0.1                            | 1079       |
| Small       | 5                        | 10                | 0.1                            | 1179       |
| Medium      | 1                        | 1                 | 0.1                            | 632        |
| Medium      | 1                        | 1                 | 0.2                            | 664        |
| Medium      | 1                        | 5                 | 0.2                            | 718        |
| Medium      | 1                        | 10                | 0.2                            | 683        |
| Medium      | 2                        | 1                 | 0.2                            | 675        |
| Medium      | 2                        | 5                 | 0.2                            | 735        |
| Medium      | 2                        | 10                | 0.2                            | 788        |
| Medium      | 2                        | 15                | 0.2                            | 828        |
| Large       | 1                        | 1                 | 0.1                            | 505        |
| Large       | 1                        | 1                 | 0.2                            | 558        |
| Large       | 1                        | 5                 | 0.2                            | 609        |
| Large       | 1                        | 10                | 0.2                            | 496        |
| Large       | 2                        | 1                 | 0.2                            | 523        |
| Large       | 2                        | 5                 | 0.2                            | 759        |
| Large       | 2                        | 10                | 0.2                            | 668        |

For the org.apache.http.client.methods.HttpRequestBase message body, the producer-consumer conclusions are:

  1. The fewer messages accumulated in the queue, the faster the rate
  2. The consumption rate speeds up over time, though not markedly
  3. Keep the message body as small as possible

Test cases

The test cases are written in Groovy. Having implemented a custom asynchronous keyword fun and brushed up on closure syntax, I found they came together smoothly, and I am a little fascinated by the various multithreading idioms. The code should look familiar to Java developers, though it takes some care to read; I have tried to add comments. You can focus on the test results, which serve as a basic reference for java.util.concurrent.LinkedBlockingQueue.

The test code is fine-tuned for each scenario above, e.g. the number of threads and the size of the message body objects. I focus on three use-case scenarios; real usage at work is certainly much more complex than these three, so if you are interested, test it yourself. I won't cover that here.

Producer scenario

package com.funtest.groovytest

import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.CountUtil
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

class QueueT extends SourceCode {

    static AtomicInteger index = new AtomicInteger(0)

    static int total = 100_0000

    static int size = 10

    static int threadNum = 1

    static int piece = total / size

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {

        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()

        def start = Time.getTimeStamp()
        def latch = new CountDownLatch(threadNum)
        def ts = []
        def barrier = new CyclicBarrier(threadNum + 1)
        def funtester = { // closure that creates an asynchronous producer task
            fun {
                barrier.await()
                while (true) {
                    if (index.getAndIncrement() % piece == 0) {
                        def l = Time.getTimeStamp() - start
                        ts << l
                        output("${formatLong(index.get())}Add total consumption ${formatLong(l)}")
                        start = Time.getTimeStamp()
                    }
                    if (index.get() > total) break

                    def get = new HttpGet(url)
                    get.addHeader("token",token)
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    get.addHeader(HttpClientConstant.CONNECTION)
                    linkedQ.put(get)
                }
                latch.countDown()
            }
        }
        threadNum.times {funtester()}
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("Rate per millisecond ${total / (et - st)}")
        outRGB(CountUtil.index(ts).toString())
    }


}

Consumer scenario

package com.funtest.groovytest

import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.CountUtil
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class QueueTconsume extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 100_0000

    static int size = 10

    static int threadNum = 5

    static int piece = total / size

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {

        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()
        def pwait = new CountDownLatch(10)
        def produces = {
            fun {
                while (true) {
                    if (linkedQ.size() > total) break
                    def get = new HttpGet(url)
                    get.addHeader("token", token)
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    get.addHeader(HttpClientConstant.CONNECTION)
                    linkedQ.add(get)
                }
                pwait.countDown()
            }
        }
        10.times {produces()}
        pwait.await()
        outRGB("Data construction completed!${linkedQ.size()}")


        def start = Time.getTimeStamp()
        def barrier = new CyclicBarrier(threadNum + 1 )
        def latch = new CountDownLatch(threadNum)
        def ts = []
        def funtester = {
            fun {
                barrier.await()
                while (true) {
                    if (index.getAndIncrement() % piece == 0) {
                        def l = Time.getTimeStamp() - start
                        ts << l
                        output("${formatLong(index.get())}Total consumption ${formatLong(l)}")
                        start = Time.getTimeStamp()
                    }
                    def poll = linkedQ.poll(100, TimeUnit.MILLISECONDS)
                    if (poll == null) break
                }
                latch.countDown()
            }
        }
        threadNum.times {funtester()}
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("Rate per millisecond ${total / (et - st)}")
        outRGB(CountUtil.index(ts).toString())
    }


}


Producer consumer scenario

Here I introduce another variable: the length of the initial queue. Before the use case runs, the queue will be filled in a single thread according to this length.

package com.funtest.groovytest

import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class QueueBoth extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 500_0000

    static int length = 50_0000

    static int threadNum = 5

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {
        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()

        def latch = new CountDownLatch(threadNum * 2)
        def barrier = new CyclicBarrier(threadNum * 2 + 1)
        def ts = []
        def funtester = { f -> // wraps a producer or consumer action in an async task
            fun {
                barrier.await()
                while (true) {
                    if (index.getAndIncrement() > total) break
                    f()
                }
                latch.countDown()
            }
        }
        def produces =  {
            def get = new HttpGet(url)
            get.addHeader("token", token)
            get.addHeader(HttpClientConstant.USER_AGENT)
            get.addHeader(HttpClientConstant.CONNECTION)
            linkedQ.put(get)
        }
        length.times {produces()}

        threadNum.times {
            funtester produces
            funtester {linkedQ.poll(100, TimeUnit.MILLISECONDS)}
        }
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("Rate per millisecond ${total / (et - st) / 2}")
    }


}

Supplement

Very unstable performance

There are two supplementary points. The first is that the performance of java.util.concurrent.LinkedBlockingQueue was very unstable during testing. I print a timestamped log line at every tenth of the total; here are some logs from the producer scenario with a queue length of 1 million:

INFO-> 23.731 F-2  107,942 Add total consumption 523
INFO-> 23.897 F-10 200,061 Add total consumption 165
INFO-> 24.137 F-9  300,024 Add total consumption 239
INFO-> 24.320 F-2  400,037 Add total consumption 182
INFO-> 25.200 F-5  500,065 Add total consumption 879
INFO-> 25.411 F-2  600,094 Add total consumption 211
INFO-> 25.604 F-8  700,090 Add total consumption 193
INFO-> 26.868 F-1  800,047 Add total consumption 1,264
INFO-> 26.927 F-4  900,053 Add total consumption 57
INFO-> 28.454 F-3  1,000,009 Add total consumption 1,527
INFO-> 28.457 main Rate per millisecond 190.0779319521
INFO-> 28.476 main average: 524.0, max: 1527.0, min: 57.0, median: 239.0, p99: 1527.0, p95: 1527.0


INFO-> 43.930 F-10 112,384 Add total consumption 385
INFO-> 44.072 F-9  200,159 Add total consumption 140
INFO-> 44.296 F-1  300,058 Add total consumption 223
INFO-> 44.445 F-7  400,075 Add total consumption 149
INFO-> 45.311 F-10 500,086 Add total consumption 866
INFO-> 45.498 F-8  600,080 Add total consumption 187
INFO-> 45.700 F-1  700,088 Add total consumption 202
INFO-> 45.760 F-9  800,057 Add total consumption 59
INFO-> 47.245 F-6  900,095 Add total consumption 1,485
INFO-> 47.303 F-6  1,000,009 Add total consumption 58
INFO-> 47.305 main Rate per millisecond 262.7430373095
INFO-> 47.320 main average: 375.4, max: 1485.0, min: 58.0, median: 202.0, p99: 1485.0, p95: 1485.0


INFO-> 00.916 F-1  100,000 Add total consumption 568
INFO-> 01.269 F-1  200,000 Add total consumption 353
INFO-> 01.461 F-1  300,000 Add total consumption 192
INFO-> 01.635 F-1  400,000 Add total consumption 174
INFO-> 02.536 F-1  500,000 Add total consumption 899
INFO-> 02.777 F-1  600,000 Add total consumption 240
INFO-> 03.015 F-1  700,000 Add total consumption 237
INFO-> 03.107 F-1  800,000 Add total consumption 91
INFO-> 04.519 F-1  900,000 Add total consumption 1,412
INFO-> 05.940 F-1  1,000,000 Add total consumption 96
INFO-> 05.943 main Rate per millisecond 184.5358922310
INFO-> 05.959 main average: 426.2, max: 1412.0, min: 91.0, median: 240.0, p99: 1412.0, p95: 1412.0

As you can see, the maximum and minimum can differ by a factor of ten or even twenty. The effect grows with the total length of the message queue, and mostly occurs in the 800,000–1,000,000 range. Reducing the length to 500,000 improves the situation noticeably. Hence one more guideline: keep the message queue as short as possible.
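One way to enforce that guideline, an assumption on my part rather than part of the original tests, is to give the queue an explicit capacity and use a timed offer, so producers back off when the backlog hits the cap instead of letting the queue grow unboundedly:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedProducer {
    public static void main(String[] args) throws InterruptedException {
        // Capacity of 3 caps the backlog; put() would block when full,
        // while a timed offer() lets the producer give up and back off
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
        int accepted = 0;
        for (int i = 0; i < 5; i++) {
            // With no consumer draining, the last two offers time out
            if (queue.offer("msg-" + i, 10, TimeUnit.MILLISECONDS)) {
                accepted++;
            }
        }
        System.out.println(accepted); // prints 3
    }
}
```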

Benchmark test

The second point: below are the results of benchmarking just the construction code of the three message objects, using the FunTester performance test framework.

| Test object | Number of threads | Number of objects (million) | Rate (/ms) |
| ----------- | ----------------- | --------------------------- | ---------- |
| Small       | 1                 | 1                           | 5681       |
| Small       | 5                 | 1                           | 8010       |
| Small       | 5                 | 5                           | 15105      |
| Medium      | 1                 | 1                           | 1287       |
| Medium      | 5                 | 1                           | 2329       |
| Medium      | 5                 | 5                           | 4176       |
| Large       | 1                 | 1                           | 807        |
| Large       | 5                 | 1                           | 2084       |
| Large       | 5                 | 5                           | 3185       |

The test cases are as follows:

package com.funtest.groovytest

import com.funtester.base.constaint.FixedThread
import com.funtester.config.HttpClientConstant
import com.funtester.frame.execute.Concurrent
import com.funtester.httpclient.FunLibrary
import org.apache.http.client.methods.HttpGet

class TTT extends FunLibrary {

    static int total = 100_0000

    static int thread = 1

    static int times = total / thread

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {
        RUNUP_TIME = 0
        def tasks = []
        thread.times {tasks << new FunTester(times)}
        new Concurrent(tasks,"Test producer code performance").start()

    }

    private static class FunTester extends FixedThread {

        FunTester(int limit) {
            super(null, limit, true)
        }

        @Override
        protected void doing() throws Exception {
//            def get = new HttpGet()

//            def get = new HttpGet(url)
//            get.addHeader("token", token)
//            get.addHeader(HttpClientConstant.USER_AGENT)
//            get.addHeader(HttpClientConstant.CONNECTION)

            def get = new HttpGet(url + token)
            get.addHeader("token", token)
            get.addHeader("token1", token)
            get.addHeader("token5", token)
            get.addHeader("token4", token)
            get.addHeader("token3", token)
            get.addHeader("token2", token)
            get.addHeader(HttpClientConstant.USER_AGENT)
            get.addHeader(HttpClientConstant.CONNECTION)

        }

        @Override
        FixedThread clone() {
            return new FunTester(limit)
        }
    }

}

Added by erikw46 on Tue, 08 Feb 2022 22:12:54 +0200