Simplify API design with coroutine and Flow

If you are a library author, you may want users to call your Java or callback based API more easily when using Kotlin coroutine and Flow. In addition, if you are an API user, you may be willing to adapt the third-party API interface to the protocol to make them more friendly to Kotlin.

This article will show you how to simplify the API using coroutine and Flow, and how to create your own adapter using suspendcancelablecoroutine and callbackflow APIs. For those curious readers, this article will also analyze these APIs to help you understand their underlying working principle.

If you prefer watching videos, you can click here .

Check the existing co process adapter

Before you write your own encapsulation for an existing API, check to see if there is an adapter or for your use case Extension method . Here are some libraries that contain common types of coroutine adapters.

Future type

For the future type, Java 8 integrates CompletableFuture And Guava integrates ListenableFuture . Not all of this is mentioned here. You can search online to determine whether there are adapters for your future type.

// Wait for the execution of CompletionStage to complete without blocking the thread
suspend fun <T> CompletionStage<T>.await(): T 
 
// Wait for the execution of ListenableFuture to complete without blocking the thread
suspend fun <T> ListenableFuture<T>.await(): T

Using these functions, you can get rid of the callback and suspend the coroutine until the result of future is returned.

Reactive Stream

For the library of responsive flow, there are RxJava,Java 9 API And Responsive flow Library Integration of:

// Convert the given responsive Publisher to Flow
fun <T : Any> Publisher<T>.asFlow(): Flow<T>

These functions convert a reactive Flow to a Flow.

Android specific API

For Jetpack library or Android platform API, you can refer to Jetpack KTX Library List. There are more than 20 libraries with KTX versions, which constitute the Java API you are familiar with. These include SharedPreferences, ViewModels, SQLite, and Play Core.

Callback

Callback is a very common practice when implementing asynchronous communication. In fact, we are Background thread task running guide Use callbacks as the default solution for the Java programming language. However, callbacks also have many disadvantages: This design leads to confusing callback nesting. At the same time, because there is no simple way of transmission, error handling is more complex. In Kotlin, you can simply call callbacks using a coroutine, but only if you have to create your own adapter.

Create your own adapter

If you don't find an adapter suitable for your use case, it's more direct to write your own adapter. For one-time asynchronous calls, you can use suspendCancellableCoroutine API; For streaming data, you can use callbackFlow API.

As an exercise, the following example will use data from Google Play Services Fused Location Provider API to get location data. The API interface is simple, but it uses callbacks to perform asynchronous operations. When the logic becomes complex, these callbacks tend to make the code unreadable, and we can use coroutines to get rid of them.

If you want to explore other solutions, you can be inspired by the source code linked by the above function.

One time asynchronous call

Fused Location Provider API provides getLastLocation Method to obtain Last known location . For a coroutine, the ideal API is a hang function that returns the exact result directly.

Note: the return value of this API is Task , and there is already a corresponding Adapter . For learning purposes, we use it as an example.

We can get a better API by creating an extension function for FusedLocationProviderClient:

suspend fun FusedLocationProviderClient.awaitLastLocation(): Location

Since this is a one-time asynchronous operation, we use the suspendcancelablecoroutine function: an underlying building block for creating a pending function from the coroutine library.

Suspendcancelablecoroutine executes the code block passed in as a parameter, and then suspends the execution of the coroutine while waiting for the continue signal. When synergetic process Continuation When the resume or resumeWithException method in the object is called, the coroutine will resume execution. For more information about Continuation, see: Kotlin Vocabulary 𞓜 reveal the suspend modifier in the process.

We can add the getlast method to the callback at the appropriate time. See the following implementation:

// The extension function of FusedLocationProviderClient returns the last known location
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location =

  // Create a new cancellable collaboration
  suspendCancellableCoroutine<Location> { continuation ->

    // Add listener for recovery process execution
    lastLocation.addOnSuccessListener { location ->
      // Resume collaboration and return to location
      continuation.resume(location)
    }.addOnFailureListener { e ->
      // Recover the process by throwing an exception
      continuation.resumeWithException(e)
    }

    // The end of the suspendcancelablecoroutine block. The process will be suspended here
    //Until a callback calls the continuation parameter
  }

Note: Although the collaboration library also contains a non cancellable version of the collaboration Builder (i.e suspendCoroutine ), but it's best to always choose to use it suspendCancellableCoroutine Handle the cancellation of the collaboration scope and propagate the cancellation event from the underlying API.

Suspendcancelablecoroutine principle

Inside, suspendCancellableCoroutine use suspendCoroutineUninterceptedOrReturn Obtain Continuation in the coroutine of the suspend function. This Continuation object will be CancellableContinuation Object interception, which will control the life cycle of the collaboration from this time realization It has the function of Job, but there are some limitations).

Next, the lambda expression passed to suspendcancelablecoroutine is executed. If the lambda returns the result, the collaboration will be resumed immediately; Otherwise, the collaboration will remain suspended until cancelablecontinuation is manually restored by lambda.

You can use my code snippet below( Original implementation )To see what happened:

public suspend inline fun <T> suspendCancellableCoroutine(
  crossinline block: (CancellableContinuation<T>) -> Unit
): T =
  // Gets the Continuation object of the coroutine running this pending function 
  suspendCoroutineUninterceptedOrReturn { uCont ->

    // Take over the process. Continuation has been intercepted,
    // Next, the lifecycle of cancelablecontinuationimpl will be followed
    val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...)
    /* ... */
 
    // Use the code block that cancels the Continuation call
    block(cancellable)
        
    // Suspend the collaboration and wait for the Continuation to be resumed in the "block", or return the result when the "block" ends execution 
    cancellable.getResult()
  }

To learn more about how the suspend function works, see this article: Kotlin Vocabulary 𞓜 reveal the suspend modifier in the process.

Stream data

If we instead want the user's device to receive location updates periodically when moving in a real environment (using requestLocationUpdates Function), we need to use Flow To create a data flow. The ideal API should look like this:

fun FusedLocationProviderClient.locationFlow(): Flow<Location>

To convert a callback based API to Flow, you can use callbackFlow Flow builder to create a new flow. The inside of the lambda expression of callbackFlow is in the context of a coroutine, which means that it can call the suspend function. differ flow The flow builder, channelFlow, can be used outside of different CoroutineContext or coroutines offer Method to send data.

In general, building a flow adapter using callbackFlow follows the following three steps:

  1. Create a callback that uses offer to add elements to flow;
  2. Registration callback;
  3. Wait for the consumer to cancel the process and cancel the callback.

Applying the above steps to the current use case, we get the following implementation:

// Send location updates to consumers
fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
  // A new Flow is created. This code will be executed in the collaboration.
  // 1. Create a callback and add elements to the flow
  val callback = object : LocationCallback() {
    override fun onLocationResult(result: LocationResult?) {
      result ?: return  // Ignore empty results
      for (location in result.locations) {
        try {
          offer(location)  // Send location to flow
        } catch (t: Throwable) {
          // Location cannot be sent to flow
        }
      }
    }
  }
  
  // 2. Register the callback and get the location update by calling requestLocationUpdates.
  requestLocationUpdates(
    createLocationRequest(),
    callback,
    Looper.getMainLooper()
  ).addOnFailureListener { e ->
    close(e)  // Close flow on error
  }
  
  // 3. Wait for the consumer to cancel the process and cancel the callback. This process will suspend the collaboration process until Flow is closed.
  awaitClose {
    // Clean up the code here
    removeLocationUpdates(callback)
  }
}

Internal principle of callback flow

Internally, callbackFlow uses a channel . channel is conceptually close to blocking queue ——It needs to specify the capacity when configuring: that is, the number of elements that can be buffered. The default capacity of the channel created in the callback flow is 64 elements. If a new element is added to a full channel, because the offer will not add the element to the channel and will immediately return false, send will pause the producer until there is free space for the new element in the channel.

Internal principle of awaitClose

Interestingly, awaitClose uses suspend cancelable coroutine internally. You can use my comments in the following code snippet (see Original implementation )Take a look:

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
  ...
  try {
    // Use to cancel the continuation pending process
    suspendCancellableCoroutine<Unit> { cont ->
      // Only when the Flow or Channel is closed, the collaboration process is successfully restored, otherwise it remains suspended
      invokeOnClose { cont.resume(Unit) }
    }
  } finally {
    // The caller's cleanup code is always executed
    block()
  }
}

Reuse Flow

Unless additional intermediate operators are used (e.g.: conflate )Otherwise, the flow is cold and inert. This means that the building block is executed every time the terminal operator of flow is called. For our use case, since adding a new location listener costs little, this feature won't be a big problem. However, some may not be realized.

You can use shareIn The intermediate operator reuses the same flow among multiple collectors and turns the cold flow into heat flow.

val FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
  ...
}.shareIn(
  // Let flow follow applicationScope
  applicationScope,
  // Send the last sent element to the new collector
  replay = 1,
  // Keep the producer active when there are active subscribers
  started = SharingStarted.WhileSubscribed()
)

You can use the article< Cancellations and exceptions in the coordination process | detailed explanation of resident tasks >To learn more about best practices for using applicationScope in applications.

You should consider making your API or existing API concise, readable, and in line with Kotlin's usage habits by creating a orchestration adapter. First, check whether there is an available adapter. If not, you can use suspend cancelablecoroutine for one-time call; Or use callbackFlow to create your own adapter for streaming data.

You can codelab: create Kotlin extension library , start the topic introduced in this article.

Keywords: Android github kotlin

Added by zxsalmanxz on Tue, 08 Mar 2022 01:44:01 +0200