Kotlin thread synchronization methods

In interviews, you are often asked about multi-threaded synchronization, for example:

"There are several parallel tasks, such as task 1 and task 2. How do you wait for all of them to complete before starting task 3?"

Kotlin offers many ways to implement this. This article goes through them:

  1. Thread.join
  2. Synchronized
  3. ReentrantLock
  4. BlockingQueue
  5. CountDownLatch
  6. CyclicBarrier
  7. CAS
  8. Future
  9. CompletableFuture
  10. RxJava
  11. Coroutine
  12. Flow

We first define three tasks to simulate the scenario above. Task 3 concatenates the strings returned by task 1 and task 2. Each task simulates time-consuming work with sleep:

val task1: () -> String = {
    sleep(2000)
    "Hello".also { println("task1 finished: $it") }
}

val task2: () -> String = {
    sleep(2000)
    "World".also { println("task2 finished: $it") }
}

val task3: (String, String) -> String = { p1, p2 ->
    sleep(2000)
    "$p1 $p2".also { println("task3 finished: $it") }
}

1. Thread.join()

Kotlin is compatible with Java, and all threading tools in Java can be used by default. The simplest way to synchronize threads is to use Thread's join():

@Test
fun test_join() {
    lateinit var s1: String
    lateinit var s2: String
    val t1 = Thread { s1 = task1() }
    val t2 = Thread { s2 = task2() }
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    task3(s1, s2)
}

2. Synchronized

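Synchronize using a synchronized lock:

@Test
fun test_synchronized() {
    lateinit var s1: String
    lateinit var s2: String
    Thread {
        synchronized(Unit) {
            s1 = task1()
        }
    }.start()
    s2 = task2()

    // Blocks until the thread above releases the lock. This relies on that
    // thread having acquired the lock first, which the 2s task2() makes likely.
    synchronized(Unit) {
        task3(s1, s2)
    }
}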

However, synchronized becomes awkward as the number of parallel tasks grows: to synchronize the results of n parallel tasks, you need to declare n locks and nest n synchronized blocks.

3. ReentrantLock

ReentrantLock is a lock provided by JUC (java.util.concurrent) that can be used in place of synchronized:

@Test
fun test_ReentrantLock() {
    lateinit var s1: String
    lateinit var s2: String

    val lock = ReentrantLock()
    Thread {
        lock.lock()
        try {
            s1 = task1()
        } finally {
            lock.unlock() // always release, even if task1() throws
        }
    }.start()
    s2 = task2()

    lock.lock()
    try {
        task3(s1, s2)
    } finally {
        lock.unlock()
    }
}

The advantage of ReentrantLock is that with multiple parallel tasks there is no nesting problem as with synchronized, but you still need to create multiple locks to manage the different tasks.

4. BlockingQueue

Blocking queues block and wake threads internally (several implementations, such as ArrayBlockingQueue, are built on ReentrantLock), so they can also be used to synchronize threads:

 @Test
    fun test_blockingQueue() {

        lateinit var s1: String
        lateinit var s2: String

        val queue = SynchronousQueue<Unit>()

        Thread {
            s1 = task1()
            queue.put(Unit)
        }.start()

        s2 = task2()

        queue.take()
        task3(s1, s2)
    }

Of course, blocking queues are more commonly used for synchronization in producer/consumer scenarios.
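
As a minimal sketch of that producer/consumer use, the snippet below passes a few integers from one thread to another through a bounded ArrayBlockingQueue. The function name produceAndConsume and the capacity of 2 are assumptions chosen for illustration:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Hypothetical helper: a producer thread hands `count` items to a consumer
// thread through a bounded queue; the consumer collects them in order.
fun produceAndConsume(count: Int): List<Int> {
    val queue = ArrayBlockingQueue<Int>(2)   // capacity 2: put() blocks when full
    val received = mutableListOf<Int>()

    val producer = thread {
        for (i in 1..count) queue.put(i)     // blocks while the queue is full
    }
    val consumer = thread {
        repeat(count) { received += queue.take() }   // blocks while the queue is empty
    }

    producer.join()
    consumer.join()   // after join(), the consumer's writes are visible here
    return received
}
```

Because there is a single producer and a single consumer, the items arrive in the order they were put, so produceAndConsume(5) returns the numbers 1 to 5 in order.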

5. CountDownLatch

Most locks in JUC are implemented based on AQS and can be divided into exclusive locks and shared locks. ReentrantLock is an exclusive lock. In contrast, shared locks are more suitable for this scenario. For example, CountDownLatch can keep one thread blocked until the execution of other threads is completed:

 @Test
    fun test_countdownlatch() {
        lateinit var s1: String
        lateinit var s2: String
        val cd = CountDownLatch(2)
        Thread() {
            s1 = task1()
            cd.countDown()
        }.start()

        Thread() {
            s2 = task2()
            cd.countDown()
        }.start()

        cd.await()
        task3(s1, s2)
    }

The advantage of a shared lock is that you do not need to create a separate lock for each task, so the code stays simple even with more parallel tasks.
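
To illustrate how this scales, here is a rough sketch that runs any number of tasks with a single CountDownLatch. The helper name runAllWithLatch is hypothetical and error handling is deliberately minimal:

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

// Hypothetical helper: runs every task on its own thread and returns the
// results in task order once all of them have finished.
@Suppress("UNCHECKED_CAST")
fun <T> runAllWithLatch(tasks: List<() -> T>): List<T> {
    val results = arrayOfNulls<Any>(tasks.size)
    val latch = CountDownLatch(tasks.size)   // one latch, however many tasks
    tasks.forEachIndexed { i, task ->
        thread {
            try {
                results[i] = task()
            } finally {
                latch.countDown()   // count down even if the task throws
            }
        }
    }
    latch.await()   // returns once every task has counted down
    return results.map { it as T }
}
```

With the tasks from this article, task3 could then be called on the two elements of runAllWithLatch(listOf(task1, task2)).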

6. CyclicBarrier

CyclicBarrier is another shared lock mechanism provided by JUC. It allows a group of threads to continue running together after reaching a synchronization point. If any thread fails to reach the synchronization point, other threads that have reached the synchronization point will be blocked.

The difference between CyclicBarrier and CountDownLatch is that a CountDownLatch is single-use, while a CyclicBarrier can be reused after it is reset. That is where the name "Cyclic" comes from.

 @Test
    fun test_CyclicBarrier() {

        lateinit var s1: String
        lateinit var s2: String
        val cb = CyclicBarrier(3)

        Thread {
            s1 = task1()
            cb.await()
        }.start()

        Thread() {
            s2 = task2()
            cb.await()
        }.start()

        cb.await()
        task3(s1, s2)

    }
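
The reuse mentioned above can be sketched as follows: two workers meet at the same barrier once per round, and the barrier action counts the rounds. The name roundsCompleted and the two-worker setup are assumptions made for this illustration:

```kotlin
import java.util.concurrent.CyclicBarrier
import kotlin.concurrent.thread

// Hypothetical demo: two workers meet at the same barrier in every round.
// The barrier action runs once per round, after both workers have arrived.
fun roundsCompleted(rounds: Int): Int {
    var completed = 0
    val barrier = CyclicBarrier(2) { completed++ }   // action runs once each time the barrier trips
    val workers = List(2) {
        thread {
            repeat(rounds) { barrier.await() }   // the same barrier is reused every round
        }
    }
    workers.forEach { it.join() }
    return completed
}
```

A CountDownLatch could not be used this way, because its count cannot be restored once it reaches zero.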

7. CAS

AQS uses CAS (CompareAndSwap) internally to update its state. The essence of a spin lock is to retry a CAS in a loop, which avoids the overhead of blocking the thread. We can therefore use a CAS-based atomic counter to synchronize without a lock:

  @Test
    fun test_cas() {

        lateinit var s1: String
        lateinit var s2: String

        val cas = AtomicInteger(2)

        Thread {
            s1 = task1()
            cas.getAndDecrement()
        }.start()

        Thread {
            s2 = task2()
            cas.getAndDecrement()
        }.start()

        while (cas.get() != 0) {}

        task3(s1, s2)

    }

Idling in a while loop looks like a waste of resources, but that is the nature of a spin lock, so this approach is only suitable for synchronizing short, CPU-intensive tasks.

volatile

Seeing the lock free implementation of CAS, many people may think of volatile. Can it also achieve lock free thread safety?

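    // Assumed declaration (not shown in the original): a property of the test class
    @Volatile
    var cnt = 2

    @Test
    fun test_Volatile() {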
        lateinit var s1: String
        lateinit var s2: String

        Thread {
            s1 = task1()
            cnt--
        }.start()

        Thread {
            s2 = task2()
            cnt--
        }.start()

        while (cnt != 0) {
        }

        task3(s1, s2)

    }

Note that this approach is wrong. volatile guarantees visibility, but it does not guarantee atomicity. cnt-- is a read-modify-write operation, so it is not thread safe and requires a lock.
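
For contrast, here is a small sketch of the atomic alternative: many threads decrement a shared AtomicInteger and no update is lost. The function name and the thread and iteration counts are hypothetical:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical demo: `threads` threads each decrement a shared counter
// `perThread` times. decrementAndGet() is one atomic update, so no
// decrement is lost and the counter always ends at zero.
fun remainingAfterAtomicDecrements(threads: Int, perThread: Int): Int {
    val counter = AtomicInteger(threads * perThread)
    val workers = List(threads) {
        thread { repeat(perThread) { counter.decrementAndGet() } }
    }
    workers.forEach { it.join() }
    return counter.get()
}
```

With a plain @Volatile Int and cnt-- in place of the AtomicInteger, two threads can read the same value and write back the same result, so the final count may end up above zero.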

8. Future

Whether locked or lock-free, all the approaches above need two variables, s1 and s2, to record the results, which is inconvenient. Since Java 1.5, Callable and Future have been available; they let a task return its result when it finishes.

@Test
fun test_future() {
    val future1 = FutureTask(Callable(task1))
    val future2 = FutureTask(Callable(task2))

    // One pool is enough for both tasks; shut it down when done
    val executor = Executors.newCachedThreadPool()
    executor.execute(future1)
    executor.execute(future2)

    task3(future1.get(), future2.get())
    executor.shutdown()
}

future.get() waits synchronously for the result to be returned, which is very convenient to write.
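
The same idea extends to any number of tasks. The sketch below uses ExecutorService.invokeAll to submit a list of tasks and collect their results; the helper name runAllWithFutures and the fixed-size pool are assumptions, not part of the original example:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper: submits all tasks to one pool and collects the results.
fun <T> runAllWithFutures(tasks: List<() -> T>): List<T> {
    val pool = Executors.newFixedThreadPool(tasks.size.coerceAtLeast(1))
    try {
        // invokeAll blocks until every task is done and keeps the input order
        val futures = pool.invokeAll(tasks.map { task -> Callable<T> { task() } })
        return futures.map { it.get() }
    } finally {
        pool.shutdown()   // release the worker threads
    }
}
```

If a task throws, the corresponding get() rethrows the failure wrapped in an ExecutionException, so errors are not silently lost.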

9. CompletableFuture

future.get() is convenient, but it blocks the calling thread. CompletableFuture was introduced in Java 8. It implements both the Future interface and the CompletionStage interface. CompletableFuture can logically combine multiple CompletionStages to build complex asynchronous flows. These combining methods avoid blocking threads by working through callbacks:

@Test
fun test_CompletableFuture() {
    CompletableFuture.supplyAsync(task1)
        .thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
             task3(p1, p2)
        }.join()
}
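
To make the callback style more concrete, here is a minimal sketch that chains a further step onto the combined result and returns the future without blocking. The function name combineAsync and the length computation are hypothetical and only serve as an illustration:

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical helper: combines two asynchronous results and post-processes
// them in callbacks. Nothing blocks until the caller decides to join().
fun combineAsync(a: () -> String, b: () -> String): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { a() }
        .thenCombine(CompletableFuture.supplyAsync { b() }) { x, y -> "$x $y" }
        .thenApply { it.length }   // callback applied once both inputs are ready
```

The caller can keep chaining further stages onto the returned future, or call join() at the very end, as the test above does.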

10. RxJava

RxJava's operators and thread-switching capabilities can also meet our requirement: the zip operator combines the results of two Observables, and subscribeOn starts the tasks asynchronously:

@Test
fun test_Rxjava() {

    Observable.zip(
        Observable.fromCallable(Callable(task1))
            .subscribeOn(Schedulers.newThread()),
        Observable.fromCallable(Callable(task2))
            .subscribeOn(Schedulers.newThread()),
        BiFunction(task3)
    ).test().awaitTerminalEvent()

}

11. Coroutine

The tools covered so far are all really Java tools. Coroutines are, at last, a Kotlin-specific tool:

@Test
fun test_coroutine() {

    runBlocking {
        val c1 = async(Dispatchers.IO) {
            task1()
        }

        val c2 = async(Dispatchers.IO) {
            task2()
        }

        task3(c1.await(), c2.await())
    }
}

It is very comfortable to write. It can be said to combine the advantages of all the tools above.

12. Flow

Flow is the coroutine version of RxJava and has many of RxJava's operators, such as zip:

@Test
fun test_flow() {

    val flow1 = flow<String> { emit(task1()) }
    val flow2 = flow<String> { emit(task2()) }

    runBlocking {
        flow1.zip(flow2) { t1, t2 ->
            task3(t1, t2)
        }.flowOn(Dispatchers.IO)
            .collect()
    }

}

flowOn makes the tasks compute and emit their results asynchronously.

Summary

That is a lot of approaches, rather like the four ways of writing the character for "fennel" in "fennel beans". You do not need to master them all. In conclusion, the best thread synchronization solution in Kotlin is coroutines!

Original link: Interview prerequisites: N methods of Kotlin thread synchronization - Nuggets (juejin.cn)

Added by simwiz on Thu, 16 Dec 2021 09:57:44 +0200