Concept and principle of collaborative process, etc
Concept of collaborative process
Coroutine (English: coroutine) is a kind of component of computer program, which promotes * * * collaborative * * * multitasking subroutine, and allows execution to be suspended and restored.
English co routine, CO collaboration, routine (another name for computer subroutine)
Cooperative Multitasking is a way of multitasking. Multitasking is a technology that enables computers to process multiple programs at the same time. Compared with Preemptive multitasking, Cooperative Multitasking requires each running program to give up its right to run at a fixed time and inform the operating system that the next program can run.
The performance of collaboration in the collaboration process: let the thread resources hang the collaboration process through the yield() method
The difference between a collaborative process and an ordinary subroutine: a collaborative process can be returned multiple times, and the * * * state can be maintained during multiple calls. A collaborative process can have multiple entry points. The beginning of the collaborative process is the first entry point, and each yield return exit point is the entry point when it is called and executed again. The beginning of the subroutine is the only entry point. Whenever the subroutine is called, the execution starts from the beginning of the called subroutine
Relationship between collaboration and thread: collaboration is a language level structure, which can be regarded as a form of control flow, while thread is a system level structure, and collaboration is based on thread.
Coprocess in kotlin
Abstract class corresponding to the concept of coroutine
AbstractCoroutine
AbstractCoroutine inherits from the JobSupport class and implements three interfaces: job, continuation and coroutinescope.
/** * Abstract base class for implementation of coroutines in coroutine builders. * * This class implements completion [Continuation], [Job], and [CoroutineScope] interfaces. * It stores the result of continuation in the state of the job. * This coroutine waits for children coroutines to finish before completing and * fails through an intermediate _failing_ state. *Implement the abstract base class of the collaborative process in the collaborative process generator. *This class implements the completion [Continuation], [Job] and [CoroutineScope] interfaces. *Store the continuation results in the job status. *This process waits for the sub process to complete before completing and *Through the middle_ failing_ Status failed. * * The following methods are available for override: * * * [onStart] is invoked when the coroutine was created in non-active state and is being [started][Job.start]. * * [onCancelling] is invoked as soon as the coroutine starts being cancelled for any reason (or completes). * * [onCompleted] is invoked when the coroutine completes with a value. * * [onCancelled] in invoked when the coroutine completes with an exception (cancelled). * * @param parentContext the context of the parent coroutine. * @param active when `true` (by default), the coroutine is created in the _active_ state, otherwise it is created in the _new_ state. * See [Job] for details. * * @suppress **This an internal API and should not be used from general code.** */ @InternalCoroutinesApi public abstract class AbstractCoroutine<in T>( /** * The context of the parent coroutine. */ @JvmField protected val parentContext: CoroutineContext, active: Boolean = true ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { /** * The context of this coroutine that includes this coroutine as a [Job]. */ @Suppress("LeakingThis") public final override val context: CoroutineContext = parentContext + this /** * The context of this scope which is the same as the [context] of this coroutine. */ public override val coroutineContext: CoroutineContext get() = context override val isActive: Boolean get() = super.isActive /** * Initializes the parent job from the `parentContext` of this coroutine that was passed to it during construction. * It shall be invoked at most once after construction after all other initialization. * * Invocation of this function may cause this coroutine to become cancelled if the parent is already cancelled, * in which case it synchronously invokes all the corresponding handlers. * @suppress **This is unstable API and it is subject to change.** */ internal fun initParentJob() { initParentJobInternal(parentContext[Job]) } /** * This function is invoked once when a non-active coroutine (constructed with `active` set to `false) * is [started][start]. */ protected open fun onStart() {} internal final override fun onStartInternal() { onStart() } /** * This function is invoked once when the job was completed normally with the specified [value], * right before all the waiters for the coroutine's completion are notified. */ protected open fun onCompleted(value: T) {} /** * This function is invoked once when the job was cancelled with the specified [cause], * right before all the waiters for coroutine's completion are notified. * * **Note:** the state of the coroutine might not be final yet in this function and should not be queried. * You can use [completionCause] and [completionCauseHandled] to recover parameters that we passed * to this `onCancelled` invocation only when [isCompleted] returns `true`. * * @param cause The cancellation (failure) cause * @param handled `true` if the exception was handled by parent (always `true` when it is a [CancellationException]) */ protected open fun onCancelled(cause: Throwable, handled: Boolean) {} override fun cancellationExceptionMessage(): String = "$classSimpleName was cancelled" @Suppress("UNCHECKED_CAST") protected final override fun onCompletionInternal(state: Any?) { if (state is CompletedExceptionally) onCancelled(state.cause, state.handled) else onCompleted(state as T) } /** * Completes execution of this with coroutine with the specified result. */ public final override fun resumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) } protected open fun afterResume(state: Any?): Unit = afterCompletion(state) internal final override fun handleOnCompletionException(exception: Throwable) { handleCoroutineException(context, exception) } internal override fun nameString(): String { val coroutineName = context.coroutineName ?: return super.nameString() return "\"$coroutineName\":${super.nameString()}" } /** * Starts this coroutine with the given code [block] and [start] strategy. * This function shall be invoked at most once on this coroutine. * * First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it * during construction. Second, it starts the coroutine based on [start] parameter: * * * [DEFAULT] uses [startCoroutineCancellable]. * * [ATOMIC] uses [startCoroutine]. * * [UNDISPATCHED] uses [startCoroutineUndispatched]. * * [LAZY] does nothing. */ public fun start(start: CoroutineStart, block: suspend () -> T) { initParentJob() start(block, this) } /** * Starts this coroutine with the given code [block] and [start] strategy. * This function shall be invoked at most once on this coroutine. * * First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it * during construction. Second, it starts the coroutine based on [start] parameter: * * * [DEFAULT] uses [startCoroutineCancellable]. * * [ATOMIC] uses [startCoroutine]. * * [UNDISPATCHED] uses [startCoroutineUndispatched]. * * [LAZY] does nothing. */ public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { initParentJob() start(block, receiver, this) } }
Job
Job means only a fairly Abstract job interface, which can judge the status, start, join, cancel, and the sequence of childrenJob. It can be seen that the start, join and cancel methods are not directly implemented in AbstractCoroutine. These methods are implemented in JobSupport.
// --------------- core job interfaces --------------- /** * A background job. Conceptually, a job is a cancellable thing with a life-cycle that * culminates in its completion. * * Jobs can be arranged into parent-child hierarchies where cancellation * of a parent leads to immediate cancellation of all its [children] recursively. * Failure of a child with an exception other than [CancellationException] immediately cancels its parent and, * consequently, all its other children. This behavior can be customized using [SupervisorJob]. * * The most basic instances of `Job` interface are created like this: * * * **Coroutine job** is created with [launch][CoroutineScope.launch] coroutine builder. * It runs a specified block of code and completes on completion of this block. * * **[CompletableJob]** is created with a `Job()` factory function. * It is completed by calling [CompletableJob.complete]. * * Conceptually, an execution of a job does not produce a result value. Jobs are launched solely for their * side-effects. See [Deferred] interface for a job that produces a result. * *Background work. Conceptually speaking, a job is something that can be cancelled. Its life cycle is *End with its completion. * *Jobs can be arranged into a parent-child hierarchy in which jobs are cancelled The * of the parent node causes all child nodes to be immediately cancelled recursively. A child exception other than [CancellationException] will immediately cancel its parent exception, Therefore, all other child elements. You can customize this behavior using [SupervisorJob]. * *"Job"The most basic instance of the interface is created as follows: * ** **[launch][CoroutineScope.] Created. (launch) collaborative program generator. *It runs a specified block of code and completes when the block is complete. ** **[CompletableJob]**It is created through the 'Job()' factory function. *Complete by calling [CompletableJob.complete]. * *Conceptually, the execution of a job does not produce a result value. The job is just for their *side effect. See the results of the [Deferred] job. * ### Job states * * A job has the following states: * * | **State** | [isActive] | [isCompleted] | [isCancelled] | * | -------------------------------- | ---------- | ------------- | ------------- | * | _New_ (optional initial state) | `false` | `false` | `false` | * | _Active_ (default initial state) | `true` | `false` | `false` | * | _Completing_ (transient state) | `true` | `false` | `false` | * | _Cancelling_ (transient state) | `false` | `false` | `true` | * | _Cancelled_ (final state) | `false` | `true` | `true` | * | _Completed_ (final state) | `false` | `true` | `false` | * * Usually, a job is created in the _active_ state (it is created and started). However, coroutine builders * that provide an optional `start` parameter create a coroutine in the _new_ state when this parameter is set to * [CoroutineStart.LAZY]. Such a job can be made _active_ by invoking [start] or [join]. * * A job is _active_ while the coroutine is working or until [CompletableJob] is completed, * or until it fails or cancelled. * * Failure of an _active_ job with an exception makes it _cancelling_. * A job can be cancelled at any time with [cancel] function that forces it to transition to * the _cancelling_ state immediately. The job becomes _cancelled_ when it finishes executing its work and * all its children complete. * * Completion of an _active_ coroutine's body or a call to [CompletableJob.complete] transitions the job to * the _completing_ state. It waits in the _completing_ state for all its children to complete before * transitioning to the _completed_ state. * Note that _completing_ state is purely internal to the job. For an outside observer a _completing_ job is still * active, while internally it is waiting for its children. * * ``` * wait children * +-----+ start +--------+ complete +-------------+ finish +-----------+ * | New | -----> | Active | ---------> | Completing | -------> | Completed | * +-----+ +--------+ +-------------+ +-----------+ * | cancel / fail | * | +----------------+ * | | * V V * +------------+ finish +-----------+ * | Cancelling | --------------------------------> | Cancelled | * +------------+ +-----------+ * ``` * * A `Job` instance in the * [coroutineContext](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.coroutines/coroutine-context.html) * represents the coroutine itself. * * ### Cancellation cause * * A coroutine job is said to _complete exceptionally_ when its body throws an exception; * a [CompletableJob] is completed exceptionally by calling [CompletableJob.completeExceptionally]. * An exceptionally completed job is cancelled and the corresponding exception becomes the _cancellation cause_ of the job. * * Normal cancellation of a job is distinguished from its failure by the type of this exception that caused its cancellation. * A coroutine that threw [CancellationException] is considered to be _cancelled normally_. * If a cancellation cause is a different exception type, then the job is considered to have _failed_. * When a job has _failed_, then its parent gets cancelled with the exception of the same type, * thus ensuring transparency in delegating parts of the job to its children. * * Note, that [cancel] function on a job only accepts [CancellationException] as a cancellation cause, thus * calling [cancel] always results in a normal cancellation of a job, which does not lead to cancellation * of its parent. This way, a parent can [cancel] its own children (cancelling all their children recursively, too) * without cancelling itself. * * ### Concurrency and synchronization * * All functions on this interface and on all interfaces derived from it are **thread-safe** and can * be safely invoked from concurrent coroutines without external synchronization. * * ### Not stable for inheritance * * **`Job` interface and all its derived interfaces are not stable for inheritance in 3rd party libraries**, * as new methods might be added to this interface in the future, but is stable for use. */ public interface Job : CoroutineContext.Element { /** * Key for [Job] instance in the coroutine context. */ public companion object Key : CoroutineContext.Key<Job> { init { /* * Here we make sure that CoroutineExceptionHandler is always initialized in advance, so * that if a coroutine fails due to StackOverflowError we don't fail to report this error * trying to initialize CoroutineExceptionHandler */ CoroutineExceptionHandler } } // ------------ state query ------------ /** * Returns `true` when this job is active -- it was already started and has not completed nor was cancelled yet. * The job that is waiting for its [children] to complete is still considered to be active if it * was not cancelled nor failed. * * See [Job] documentation for more details on job states. */ public val isActive: Boolean /** * Returns `true` when this job has completed for any reason. A job that was cancelled or failed * and has finished its execution is also considered complete. Job becomes complete only after * all its [children] complete. * * See [Job] documentation for more details on job states. */ public val isCompleted: Boolean /** * Returns `true` if this job was cancelled for any reason, either by explicit invocation of [cancel] or * because it had failed or its child or parent was cancelled. * In the general case, it does not imply that the * job has already [completed][isCompleted], because it may still be finishing whatever it was doing and * waiting for its [children] to complete. * * See [Job] documentation for more details on cancellation and failures. */ public val isCancelled: Boolean /** * Returns [CancellationException] that signals the completion of this job. This function is * used by [cancellable][suspendCancellableCoroutine] suspending functions. They throw exception * returned by this function when they suspend in the context of this job and this job becomes _complete_. * * This function returns the original [cancel] cause of this job if that `cause` was an instance of * [CancellationException]. Otherwise (if this job was cancelled with a cause of a different type, or * was cancelled without a cause, or had completed normally), an instance of [CancellationException] is * returned. The [CancellationException.cause] of the resulting [CancellationException] references * the original cancellation cause that was passed to [cancel] function. * * This function throws [IllegalStateException] when invoked on a job that is still active. * * @suppress **This an internal API and should not be used from general code.** */ @InternalCoroutinesApi public fun getCancellationException(): CancellationException // ------------ state update ------------ /** * Starts coroutine related to this job (if any) if it was not started yet. * The result `true` if this invocation actually started coroutine or `false` * if it was already started or completed. */ public fun start(): Boolean /** * Cancels this job with an optional cancellation [cause]. * A cause can be used to specify an error message or to provide other details on * the cancellation reason for debugging purposes. * See [Job] documentation for full explanation of cancellation machinery. */ public fun cancel(cause: CancellationException? = null) /** * @suppress This method implements old version of JVM ABI. Use [cancel]. */ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x") public fun cancel(): Unit = cancel(null) /** * @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel]. */ @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x") public fun cancel(cause: Throwable? = null): Boolean // ------------ parent-child ------------ /** * Returns a sequence of this job's children. * * A job becomes a child of this job when it is constructed with this job in its * [CoroutineContext] or using an explicit `parent` parameter. * * A parent-child relation has the following effect: * * * Cancellation of parent with [cancel] or its exceptional completion (failure) * immediately cancels all its children. * * Parent cannot complete until all its children are complete. Parent waits for all its children to * complete in _completing_ or _cancelling_ state. * * Uncaught exception in a child, by default, cancels parent. In particular, this applies to * children created with [launch][CoroutineScope.launch] coroutine builder. Note that * [async][CoroutineScope.async] and other future-like * coroutine builders do not have uncaught exceptions by definition, since all their exceptions are * caught and are encapsulated in their result. */ public val children: Sequence<Job> /** * Attaches child job so that this job becomes its parent and * returns a handle that should be used to detach it. * * A parent-child relation has the following effect: * * Cancellation of parent with [cancel] or its exceptional completion (failure) * immediately cancels all its children. * * Parent cannot complete until all its children are complete. Parent waits for all its children to * complete in _completing_ or _cancelling_ states. * * **A child must store the resulting [ChildHandle] and [dispose][DisposableHandle.dispose] the attachment * to its parent on its own completion.** * * Coroutine builders and job factory functions that accept `parent` [CoroutineContext] parameter * lookup a [Job] instance in the parent context and use this function to attach themselves as a child. * They also store a reference to the resulting [ChildHandle] and dispose a handle when they complete. * * @suppress This is an internal API. This method is too error prone for public API. */ // ChildJob and ChildHandle are made internal on purpose to further deter 3rd-party impl of Job @InternalCoroutinesApi public fun attachChild(child: ChildJob): ChildHandle // ------------ state waiting ------------ /** * Suspends the coroutine until this job is complete. This invocation resumes normally (without exception) * when the job is complete for any reason and the [Job] of the invoking coroutine is still [active][isActive]. * This function also [starts][Job.start] the corresponding coroutine if the [Job] was still in _new_ state. * * Note that the job becomes complete only when all its children are complete. * * This suspending function is cancellable and **always** checks for a cancellation of the invoking coroutine's Job. * If the [Job] of the invoking coroutine is cancelled or completed when this * suspending function is invoked or while it is suspended, this function * throws [CancellationException]. * * In particular, it means that a parent coroutine invoking `join` on a child coroutine that was started using * `launch(coroutineContext) { ... }` builder throws [CancellationException] if the child * had crashed, unless a non-standard [CoroutineExceptionHandler] is installed in the context. * * This function can be used in [select] invocation with [onJoin] clause. * Use [isCompleted] to check for a completion of this job without waiting. * * There is [cancelAndJoin] function that combines an invocation of [cancel] and `join`. */ public suspend fun join() /** * Clause for [select] expression of [join] suspending function that selects when the job is complete. * This clause never fails, even if the job completes exceptionally. */ public val onJoin: SelectClause0 // ------------ low-level state-notification ------------ /** * Registers handler that is **synchronously** invoked once on completion of this job. * When the job is already complete, then the handler is immediately invoked * with the job's exception or cancellation cause or `null`. Otherwise, the handler will be invoked once when this * job is complete. * * The meaning of `cause` that is passed to the handler: * * Cause is `null` when the job has completed normally. * * Cause is an instance of [CancellationException] when the job was cancelled _normally_. * **It should not be treated as an error**. In particular, it should not be reported to error logs. * * Otherwise, the job had _failed_. * * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the * registration of this handler and release its memory if its invocation is no longer needed. * There is no need to dispose the handler after completion of this job. The references to * all the handlers are released when this job completes. * * Installed [handler] should not throw any exceptions. If it does, they will get caught, * wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code. * * **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe. * This handler can be invoked concurrently with the surrounding code. * There is no guarantee on the execution context in which the [handler] is invoked. */ public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle /** * Registers handler that is **synchronously** invoked once on cancellation or completion of this job. * when the job was already cancelled and is completed its execution, then the handler is immediately invoked * with the job's cancellation cause or `null` unless [invokeImmediately] is set to false. * Otherwise, handler will be invoked once when this job is cancelled or is complete. * * The meaning of `cause` that is passed to the handler: * * Cause is `null` when the job has completed normally. * * Cause is an instance of [CancellationException] when the job was cancelled _normally_. * **It should not be treated as an error**. In particular, it should not be reported to error logs. * * Otherwise, the job had _failed_. * * Invocation of this handler on a transition to a _cancelling_ state * is controlled by [onCancelling] boolean parameter. * The handler is invoked when the job becomes _cancelling_ if [onCancelling] parameter is set to `true`. * * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the * registration of this handler and release its memory if its invocation is no longer needed. * There is no need to dispose the handler after completion of this job. The references to * all the handlers are released when this job completes. * * Installed [handler] should not throw any exceptions. If it does, they will get caught, * wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code. * * **Note**: This function is a part of internal machinery that supports parent-child hierarchies * and allows for implementation of suspending functions that wait on the Job's state. * This function should not be used in general application code. * Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe. * This handler can be invoked concurrently with the surrounding code. * There is no guarantee on the execution context in which the [handler] is invoked. * * @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _cancelling_ state; * when `false` then the [handler] is invoked only when it transitions to _completed_ state. * @param invokeImmediately when `true` and this job is already in the desired state (depending on [onCancelling]), * then the [handler] is immediately and synchronously invoked and no-op [DisposableHandle] is returned; * when `false` then no-op [DisposableHandle] is returned, but the [handler] is not invoked. * @param handler the handler. * * @suppress **This an internal API and should not be used from general code.** */ @InternalCoroutinesApi public fun invokeOnCompletion( onCancelling: Boolean = false, invokeImmediately: Boolean = true, handler: CompletionHandler): DisposableHandle // ------------ unstable internal API ------------ /** * @suppress **Error**: Operator '+' on two Job objects is meaningless. * Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts. * The job to the right of `+` just replaces the job the left of `+`. */ @Suppress("DeprecatedCallableAddReplaceWith") @Deprecated(message = "Operator '+' on two Job objects is meaningless. " + "Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts. " + "The job to the right of `+` just replaces the job the left of `+`.", level = DeprecationLevel.ERROR) public operator fun plus(other: Job): Job = other }
JobSupport
JobSupport is a non abstract class that implements four interfaces: Job, ChildJob, ParentJob and SelectClause0. SelectClause0 is a Select expression that can be ignored first. What is a Job? As mentioned above, ChildJob and ParentJob inherit the two interfaces of Job respectively. One is to remind parentCancelled and the other is to getChildJobCancellationCause(). There is a state inside to store the ChildHandleNode linked list. coroutine.start will call parent's JobSupport AttachChild () then calls invokeOnCompletion to join the state of parent. Ordinary invokeoncompletion is to join your own state.
/** * A concrete implementation of [Job]. It is optionally a child to a parent job. * * This is an open class designed for extension by more specific classes that might augment the * state and mare store addition state information for completed jobs, like their result values. * *[Concrete realization of work. It can be a child job of the parent job. * *This is an open class designed to extend more specific classes and may enhance * state And mare store additional status information of completed jobs, such as their result values. * @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details. * @suppress **This is unstable API and it is subject to change.** */ @Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases") public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { final override val key: CoroutineContext.Key<*> get() = Job /* === Internal states === name state class public state description ------ ------------ ------------ ----------- EMPTY_N EmptyNew : New no listeners EMPTY_A EmptyActive : Active no listeners SINGLE JobNode : Active a single listener SINGLE+ JobNode : Active a single listener + NodeList added as its next LIST_N InactiveNodeList : New a list of listeners (promoted once, does not got back to EmptyNew) LIST_A NodeList : Active a list of listeners (promoted once, does not got back to JobNode/EmptyActive) COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*) CANCELLING Finishing : Cancelling -- " -- FINAL_C Cancelled : Cancelled Cancelled (final state) FINAL_R <any> : Completed produced some result === Transitions === New states Active states Inactive states +---------+ +---------+ } | EMPTY_N | ----> | EMPTY_A | ----+ } Empty states +---------+ +---------+ | } | | | ^ | +----------+ | | | | +--> | FINAL_* | | | V | | +----------+ | | +---------+ | } | | | SINGLE | ----+ } JobNode states | | +---------+ | } | | | | } | | V | } | | +---------+ | } | +-------> | SINGLE+ | ----+ } | +---------+ | } | | | V V | +---------+ +---------+ | } | LIST_N | ----> | LIST_A | ----+ } [Inactive]NodeList states +---------+ +---------+ | } | | | | | | | +--------+ | | | | | V | | | | +------------+ | +------------+ } | +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states | | +------------+ +------------+ } | | | ^ | | | | +--------+---------+--------------------+ This state machine and its transition matrix are optimized for the common case when a job is created in active state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R state without going to COMPLETING state) Note that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor` ---------- TIMELINE of state changes and notification in Job lifecycle ---------- | The longest possible chain of events in shown, shorter versions cut-through intermediate states, | while still performing all the notifications in this order. + Job object is created ## NEW: state == EMPTY_NEW | is InactiveNodeList + initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle) ~ waits for start >> start / join / await invoked ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList + onStartInternal / onStart (lazy coroutine is started) ~ active coroutine is working (or scheduled to execution) >> childCancelled / cancelImpl invoked ## CANCELLING: state is Finishing, state.rootCause != null ------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancelling=true) returns NonDisposableHandle ------ new children get immediately cancelled, but are still admitted to the list + onCancelling + notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception) + cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too) ~ waits for completion of coroutine body >> makeCompleting / makeCompletingOnce invoked ## COMPLETING: state is Finishing, state.isCompleting == true ------ new children are not admitted anymore, attachChild returns NonDisposableHandle ~ waits for children >> last child completes - computes the final exception ## SEALED: state is Finishing, state.isSealed == true ------ cancel/childCancelled returns false (cannot handle exceptions anymore) + cancelParent (final exception is communicated to the parent, parent incorporates it) + handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler) ## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled) ------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle + parentHandle.dispose + notifyCompletion (invoke all completion listeners) + onCompletionInternal / onCompleted / onCancelled --------------------------------------------------------------------------------- */ // Note: use shared objects while we have no listeners private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW) private val _parentHandle = atomic<ChildHandle?>(null) internal var parentHandle: ChildHandle? get() = _parentHandle.value set(value) { _parentHandle.value = value } // ------------ initialization ------------ /** * Initializes parent job. * It shall be invoked at most once after construction after all other initialization. */ internal fun initParentJobInternal(parent: Job?) { assert { parentHandle == null } if (parent == null) { parentHandle = NonDisposableHandle return } parent.start() // make sure the parent is started @Suppress("DEPRECATION") val handle = parent.attachChild(this) parentHandle = handle // now check our state _after_ registering (see tryFinalizeSimpleState order of actions) if (isCompleted) { handle.dispose() parentHandle = NonDisposableHandle // release it just in case, to aid GC } } // ------------ state query ------------ /** * Returns current state of this job. * If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox] * and should be [unboxed][unboxState] before returning to user code. */ internal val state: Any? get() { _state.loop { state -> // helper loop on state (complete in-progress atomic operations) if (state !is OpDescriptor) return state state.perform(this) } } /** * @suppress **This is unstable API and it is subject to change.** */ private inline fun loopOnState(block: (Any?) -> Unit): Nothing { while (true) { block(state) } } public override val isActive: Boolean get() { val state = this.state return state is Incomplete && state.isActive } public final override val isCompleted: Boolean get() = state !is Incomplete public final override val isCancelled: Boolean get() { val state = this.state return state is CompletedExceptionally || (state is Finishing && state.isCancelling) } // ------------ state update ------------ // Finalizes Finishing -> Completed (terminal state) transition. // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. // Returns final state that was created and updated to private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? { /* * Note: proposed state can be Incomplete, e.g. * async { * something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood * } */ assert { this.state === state } // consistency check -- it cannot change assert { !state.isSealed } // consistency check -- cannot be sealed yet assert { state.isCompleting } // consistency check -- must be marked as completing val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause // Create the final exception and seal the state so that no more exceptions can be added var wasCancelling = false // KLUDGE: we cannot have contract for our own expect fun synchronized val finalException = synchronized(state) { wasCancelling = state.isCancelling val exceptions = state.sealLocked(proposedException) val finalCause = getFinalRootCause(state, exceptions) if (finalCause != null) addSuppressedExceptions(finalCause, exceptions) finalCause } // Create the final state object val finalState = when { // was not cancelled (no exception) -> use proposed update value finalException == null -> proposedUpdate // small optimization when we can used proposeUpdate object as is on cancellation finalException === proposedException -> proposedUpdate // cancelled job final state else -> CompletedExceptionally(finalException) } // Now handle the final exception if (finalException != null) { val handled = cancelParent(finalException) || handleJobException(finalException) if (handled) (finalState as CompletedExceptionally).makeHandled() } // Process state updates for the final state before the state of the Job is actually set to the final state // to avoid races where outside observer may see the job in the final state, yet exception is not handled yet. if (!wasCancelling) onCancelling(finalException) onCompletionInternal(finalState) // Then CAS to completed state -> it must succeed val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete()) assert { casSuccess } // And process all post-completion actions completeStateFinalization(state, finalState) return finalState } private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? { // A case of no exceptions if (exceptions.isEmpty()) { // materialize cancellation exception if it was not materialized yet if (state.isCancelling) return defaultCancellationException() return null } /* * 1) If we have non-CE, use it as root cause * 2) If our original cause was TCE, use *non-original* TCE because of the special nature of TCE * * It is a CE, so it's not reported by children * * The first instance (cancellation cause) is created by timeout coroutine and has no meaningful stacktrace * * The potential second instance is thrown by withTimeout lexical block itself, then it has recovered stacktrace * 3) Just return the very first CE */ val firstNonCancellation = exceptions.firstOrNull { it !is CancellationException } if (firstNonCancellation != null) return firstNonCancellation val first = exceptions[0] if (first is TimeoutCancellationException) { val detailedTimeoutException = exceptions.firstOrNull { it !== first && it is TimeoutCancellationException } if (detailedTimeoutException != null) return detailedTimeoutException } return first } private fun addSuppressedExceptions(rootCause: Throwable, exceptions: List<Throwable>) { if (exceptions.size <= 1) return // nothing more to do here val seenExceptions = identitySet<Throwable>(exceptions.size) /* * Note that root cause may be a recovered exception as well. * To avoid cycles we unwrap the root cause and check for self-suppression against unwrapped cause, * but add suppressed exceptions to the recovered root cause (as it is our final exception) */ val unwrappedCause = unwrap(rootCause) for (exception in exceptions) { val unwrapped = unwrap(exception) if (unwrapped !== rootCause && unwrapped !== unwrappedCause && unwrapped !is CancellationException && seenExceptions.add(unwrapped)) { rootCause.addSuppressedThrowable(unwrapped) } } } // fast-path method to finalize normally completed coroutines without children // returns true if complete, and afterCompletion(update) shall be called private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean { assert { state is Empty || state is JobNode<*> } // only simple state without lists where children can concurrently add assert { update !is CompletedExceptionally } // only for normal completion if (!_state.compareAndSet(state, update.boxIncomplete())) return false onCancelling(null) // simple state is not a failure onCompletionInternal(update) completeStateFinalization(state, update) return true } // suppressed == true when any exceptions were suppressed while building the final completion cause private fun completeStateFinalization(state: Incomplete, update: Any?) { /* * Now the job in THE FINAL state. We need to properly handle the resulting state. * Order of various invocations here is important. * * 1) Unregister from parent job. */ parentHandle?.let { it.dispose() // volatile read parentHandle _after_ state was updated parentHandle = NonDisposableHandle // release it just in case, to aid GC } val cause = (update as? CompletedExceptionally)?.cause /* * 2) Invoke completion handlers: .join(), callbacks etc. * It's important to invoke them only AFTER exception handling and everything else, see #208 */ if (state is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case) try { state.invoke(cause) } catch (ex: Throwable) { handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex)) } } else { state.list?.notifyCompletion(cause) } } private fun notifyCancelling(list: NodeList, cause: Throwable) { // first cancel our own children onCancelling(cause) notifyHandlers<JobCancellingNode<*>>(list, cause) // then cancel parent cancelParent(cause) // tentative cancellation -- does not matter if there is no parent } /** * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent. * Returns `true` if the parent is responsible for handling the exception, `false` otherwise. * * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception * may leak to the [CoroutineExceptionHandler]. */ private fun cancelParent(cause: Throwable): Boolean { // Is scoped coroutine -- don't propagate, will be rethrown if (isScopedCoroutine) return true /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it. * This allow parent to cancel its children (normally) without being cancelled itself, unless * child crashes and produce some other exception during its completion. */ val isCancellation = cause is CancellationException val parent = parentHandle // No parent -- ignore CE, report other exceptions. if (parent === null || parent === NonDisposableHandle) { return isCancellation } // Notify parent but don't forget to check cancellation return parent.childCancelled(cause) || isCancellation } private fun NodeList.notifyCompletion(cause: Throwable?) = notifyHandlers<JobNode<*>>(this, cause) private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) { var exception: Throwable? = null list.forEach<T> { node -> try { node.invoke(cause) } catch (ex: Throwable) { exception?.apply { addSuppressedThrowable(ex) } ?: run { exception = CompletionHandlerException("Exception in completion handler $node for $this", ex) } } } exception?.let { handleOnCompletionException(it) } } public final override fun start(): Boolean { loopOnState { state -> when (startInternal(state)) { FALSE -> return false TRUE -> return true } } } // returns: RETRY/FALSE/TRUE: // FALSE when not new, // TRUE when started // RETRY when need to retry private fun startInternal(state: Any?): Int { when (state) { is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) return FALSE // already active if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY onStartInternal() return TRUE } is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers if (!_state.compareAndSet(state, state.list)) return RETRY onStartInternal() return TRUE } else -> return FALSE // not a new state } } /** * Override to provide the actual [start] action. * This function is invoked exactly once when non-active coroutine is [started][start]. */ internal open fun onStartInternal() {} public final override fun getCancellationException(): CancellationException = when (val state = this.state) { is Finishing -> state.rootCause?.toCancellationException("$classSimpleName is cancelling") ?: error("Job is still new or active: $this") is Incomplete -> error("Job is still new or active: $this") is CompletedExceptionally -> state.cause.toCancellationException() else -> JobCancellationException("$classSimpleName has completed normally", null, this) } protected fun Throwable.toCancellationException(message: String? = null): CancellationException = this as? CancellationException ?: defaultCancellationException(message, this) /** * Returns the cause that signals the completion of this job -- it returns the original * [cancel] cause, [CancellationException] or **`null` if this job had completed normally**. * This function throws [IllegalStateException] when invoked for an job that has not [completed][isCompleted] nor * is being cancelled yet. */ protected val completionCause: Throwable? get() = when (val state = state) { is Finishing -> state.rootCause ?: error("Job is still new or active: $this") is Incomplete -> error("Job is still new or active: $this") is CompletedExceptionally -> state.cause else -> null } /** * Returns `true` when [completionCause] exception was handled by parent coroutine. */ protected val completionCauseHandled: Boolean get() = state.let { it is CompletedExceptionally && it.handled } @Suppress("OverridingDeprecatedMember") public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle = invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler) public final override fun invokeOnCompletion( onCancelling: Boolean, invokeImmediately: Boolean, handler: CompletionHandler ): DisposableHandle { var nodeCache: JobNode<*>? = null loopOnState { state -> when (state) { //The first time is empty is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) { // try move to SINGLE state val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (_state.compareAndSet(state, node)) return node } else promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine } is Incomplete -> { //Already val list = state.list //If it's not a linked list, turn it into a linked list if (list == null) { // SINGLE/SINGLE+ promoteSingleToNodeList(state as JobNode<*>) } else { //If it is a linked list, insert it directly var rootCause: Throwable? = null var handle: DisposableHandle = NonDisposableHandle if (onCancelling && state is Finishing) { synchronized(state) { // check if we are installing cancellation handler on job that is being cancelled rootCause = state.rootCause // != null if cancelling job // We add node to the list in two cases --- either the job is not being cancelled // or we are adding a child to a coroutine that is not completing yet if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) { // Note: add node the list while holding lock on state (make sure it cannot change) val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (!addLastAtomic(state, list, node)) return@loopOnState // retry // just return node if we don't have to invoke handler (not cancelling yet) if (rootCause == null) return node // otherwise handler is invoked immediately out of the synchronized section & handle returned handle = node } } } if (rootCause != null) { // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job if (invokeImmediately) handler.invokeIt(rootCause) return handle } else { val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (addLastAtomic(state, list, node)) return node } } } else -> { // is complete // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, // because we play type tricks on Kotlin/JS and handler is not necessarily a function there if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause) return NonDisposableHandle } } } } private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> { return if (onCancelling) (handler as? JobCancellingNode<*>)?.also { assert { it.job === this } } ?: InvokeOnCancelling(this, handler) else (handler as? JobNode<*>)?.also { assert { it.job === this && it !is JobCancellingNode } } ?: InvokeOnCompletion(this, handler) } private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>) = list.addLastIf(node) { this.state === expect } private fun promoteEmptyToNodeList(state: Empty) { // try to promote it to LIST state with the corresponding state val list = NodeList() val update = if (state.isActive) list else InactiveNodeList(list) _state.compareAndSet(state, update) } private fun promoteSingleToNodeList(state: JobNode<*>) { // try to promote it to list (SINGLE+ state) state.addOneIfEmpty(NodeList()) // it must be in SINGLE+ state or state has changed (node could have need removed from state) val list = state.nextNode // either our NodeList or somebody else won the race, updated state // just attempt converting it to list if state is still the same, then we'll continue lock-free loop _state.compareAndSet(state, list) } public final override suspend fun join() { if (!joinInternal()) { // fast-path no wait coroutineContext.checkCompletion() return // do not suspend } return joinSuspend() // slow-path wait } private fun joinInternal(): Boolean { loopOnState { state -> if (state !is Incomplete) return false // not active anymore (complete) -- no need to wait if (startInternal(state) >= 0) return true // wait unless need to retry } } private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont -> // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler)) } public final override val onJoin: SelectClause0 get() = this // registerSelectJoin public final override fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R) { // fast-path -- check state and select/return if needed loopOnState { state -> if (select.isSelected) return if (state !is Incomplete) { // already complete -- select result if (select.trySelect()) { block.startCoroutineUnintercepted(select.completion) } return } if (startInternal(state) == 0) { // slow-path -- register waiter for completion select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(this, select, block).asHandler)) return } } } /** * @suppress **This is unstable API and it is subject to change.** */ internal fun removeNode(node: JobNode<*>) { // remove logic depends on the state of the job loopOnState { state -> when (state) { is JobNode<*> -> { // SINGE/SINGLE+ state -- one completion handler if (state !== node) return // a different job node --> we were already removed // try remove and revert back to empty state if (_state.compareAndSet(state, EMPTY_ACTIVE)) return } is Incomplete -> { // may have a list of completion handlers // remove node from the list if there is a list if (state.list != null) node.remove() return } else -> return // it is complete and does not have any completion handlers } } } /** * Returns `true` for job that do not have "body block" to complete and should immediately go into * completing state and start waiting for children. * * @suppress **This is unstable API and it is subject to change.** */ internal open val onCancelComplete: Boolean get() = false // external cancel with cause, never invoked implicitly from internal machinery public override fun cancel(cause: CancellationException?) { cancelInternal(cause ?: defaultCancellationException()) } protected open fun cancellationExceptionMessage(): String = "Job was cancelled" // HIDDEN in Job interface. Invoked only by legacy compiled code. // external cancel with (optional) cause, never invoked implicitly from internal machinery @Deprecated(level = DeprecationLevel.HIDDEN, message = "Added since 1.2.0 for binary compatibility with versions <= 1.1.x") public override fun cancel(cause: Throwable?): Boolean { cancelInternal(cause?.toCancellationException() ?: defaultCancellationException()) return true } // It is overridden in channel-linked implementation public open fun cancelInternal(cause: Throwable) { cancelImpl(cause) } // Parent is cancelling child public final override fun parentCancelled(parentJob: ParentJob) { cancelImpl(parentJob) } /** * Child was cancelled with a cause. * In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child. * It is overridden in supervisor implementations to completely ignore any child cancellation. * Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception) * * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception * may leak to the [CoroutineExceptionHandler]. */ public open fun childCancelled(cause: Throwable): Boolean { if (cause is CancellationException) return true return cancelImpl(cause) && handlesException } /** * Makes this [Job] cancelled with a specified [cause]. * It is used in [AbstractCoroutine]-derived classes when there is an internal failure. */ public fun cancelCoroutine(cause: Throwable?): Boolean = cancelImpl(cause) // cause is Throwable or ParentJob when cancelChild was invoked // returns true is exception was handled, false otherwise internal fun cancelImpl(cause: Any?): Boolean { var finalState: Any? = COMPLETING_ALREADY if (onCancelComplete) { // make sure it is completing, if cancelMakeCompleting returns state it means it had make it // completing and had recorded exception finalState = cancelMakeCompleting(cause) if (finalState === COMPLETING_WAITING_CHILDREN) return true } if (finalState === COMPLETING_ALREADY) { finalState = makeCancelling(cause) } return when { finalState === COMPLETING_ALREADY -> true finalState === COMPLETING_WAITING_CHILDREN -> true finalState === TOO_LATE_TO_CANCEL -> false else -> { afterCompletion(finalState) true } } } // cause is Throwable or ParentJob when cancelChild was invoked // It contains a loop and never returns COMPLETING_RETRY, can return // COMPLETING_ALREADY -- if already completed/completing // COMPLETING_WAITING_CHILDREN -- if started waiting for children // final state -- when completed, for call to afterCompletion private fun cancelMakeCompleting(cause: Any?): Any? { loopOnState { state -> if (state !is Incomplete || state is Finishing && state.isCompleting) { // already completed/completing, do not even create exception to propose update return COMPLETING_ALREADY } val proposedUpdate = CompletedExceptionally(createCauseException(cause)) val finalState = tryMakeCompleting(state, proposedUpdate) if (finalState !== COMPLETING_RETRY) return finalState } } @Suppress("NOTHING_TO_INLINE") // Save a stack frame internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null) = JobCancellationException(message ?: cancellationExceptionMessage(), cause, this) override fun getChildJobCancellationCause(): CancellationException { // determine root cancellation cause of this job (why is it cancelling its children?) val state = this.state val rootCause = when (state) { is Finishing -> state.rootCause is CompletedExceptionally -> state.cause is Incomplete -> error("Cannot be cancelling child in this state: $state") else -> null // create exception with the below code on normal completion } return (rootCause as? CancellationException) ?: JobCancellationException("Parent job is ${stateString(state)}", rootCause, this) } // cause is Throwable or ParentJob when cancelChild was invoked private fun createCauseException(cause: Any?): Throwable = when (cause) { is Throwable? -> cause ?: defaultCancellationException() else -> (cause as ParentJob).getChildJobCancellationCause() } // transitions to Cancelling state // cause is Throwable or ParentJob when cancelChild was invoked // It contains a loop and never returns COMPLETING_RETRY, can return // COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception // COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception // TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception // final state -- when completed, for call to afterCompletion private fun makeCancelling(cause: Any?): Any? { var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause) loopOnState { state -> when (state) { is Finishing -> { // already finishing -- collect exceptions val notifyRootCause = synchronized(state) { if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled // add exception, do nothing is parent is cancelling child that is already being cancelled val wasCancelling = state.isCancelling // will notify if was not cancelling // Materialize missing exception if it is the first exception (otherwise -- don't) if (cause != null || !wasCancelling) { val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it } state.addExceptionLocked(causeException) } // take cause for notification if was not in cancelling state before state.rootCause.takeIf { !wasCancelling } } notifyRootCause?.let { notifyCancelling(state.list, it) } return COMPLETING_ALREADY } is Incomplete -> { // Not yet finishing -- try to make it cancelling val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it } if (state.isActive) { // active state becomes cancelling if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY } else { // non active state starts completing val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException)) when { finalState === COMPLETING_ALREADY -> error("Cannot happen in $state") finalState === COMPLETING_RETRY -> return@loopOnState else -> return finalState } } } else -> return TOO_LATE_TO_CANCEL // already complete } } } // Performs promotion of incomplete coroutine state to NodeList for the purpose of // converting coroutine state to Cancelling, returns null when need to retry private fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list ?: when (state) { is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Cancelling state is JobNode<*> -> { // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot // correctly capture a reference to it promoteSingleToNodeList(state) null // retry } else -> error("State should have list: $state") } // try make new Cancelling state on the condition that we're still in the expected state private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean { assert { state !is Finishing } // only for non-finishing states assert { state.isActive } // only for active states // get state's list or else promote to list to correctly operate on child lists val list = getOrPromoteCancellingList(state) ?: return false // Create cancelling state (with rootCause!) val cancelling = Finishing(list, false, rootCause) if (!_state.compareAndSet(state, cancelling)) return false // Notify listeners notifyCancelling(list, rootCause) return true } /** * Completes this job. Used by [CompletableDeferred.complete] (and exceptionally) * and by [JobImpl.cancel]. It returns `false` on repeated invocation * (when this job is already completing). */ internal fun makeCompleting(proposedUpdate: Any?): Boolean { loopOnState { state -> val finalState = tryMakeCompleting(state, proposedUpdate) when { finalState === COMPLETING_ALREADY -> return false finalState === COMPLETING_WAITING_CHILDREN -> return true finalState === COMPLETING_RETRY -> return@loopOnState else -> { afterCompletion(finalState) return true } } } } /** * Completes this job. Used by [AbstractCoroutine.resume]. * It throws [IllegalStateException] on repeated invocation (when this job is already completing). * Returns: * * [COMPLETING_WAITING_CHILDREN] if started waiting for children. * * Final state otherwise (caller should do [afterCompletion]) */ internal fun makeCompletingOnce(proposedUpdate: Any?): Any? { loopOnState { state -> val finalState = tryMakeCompleting(state, proposedUpdate) when { finalState === COMPLETING_ALREADY -> throw IllegalStateException( "Job $this is already complete or completing, " + "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull ) finalState === COMPLETING_RETRY -> return@loopOnState else -> return finalState // COMPLETING_WAITING_CHILDREN or final state } } } // Returns one of COMPLETING symbols or final state: // COMPLETING_ALREADY -- when already complete or completing // COMPLETING_RETRY -- when need to retry due to interference // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children // final state -- when completed, for call to afterCompletion private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? { if (state !is Incomplete) return COMPLETING_ALREADY /* * FAST PATH -- no children to wait for && simple state (no list) && not cancelling => can complete immediately * Cancellation (failures) always have to go through Finishing state to serialize exception handling. * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join) * which may miss unhandled exception. */ if ((state is Empty || state is JobNode<*>) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) { if (tryFinalizeSimpleState(state, proposedUpdate)) { // Completed successfully on fast path -- return updated state return proposedUpdate } return COMPLETING_RETRY } // The separate slow-path function to simplify profiling return tryMakeCompletingSlowPath(state, proposedUpdate) } // Returns one of COMPLETING symbols or final state: // COMPLETING_ALREADY -- when already complete or completing // COMPLETING_RETRY -- when need to retry due to interference // COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children // final state -- when completed, for call to afterCompletion private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? { // get state's list or else promote to list to correctly operate on child lists val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY // promote to Finishing state if we are not in it yet // This promotion has to be atomic w.r.t to state change, so that a coroutine that is not active yet // atomically transition to finishing & completing state val finishing = state as? Finishing ?: Finishing(list, false, null) // must synchronize updates to finishing state var notifyRootCause: Throwable? = null synchronized(finishing) { // check if this state is already completing if (finishing.isCompleting) return COMPLETING_ALREADY // mark as completing finishing.isCompleting = true // if we need to promote to finishing then atomically do it here. // We do it as early is possible while still holding the lock. This ensures that we cancelImpl asap // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap. if (finishing !== state) { if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY } // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point assert { !finishing.isSealed } // cannot be sealed // add new proposed exception to the finishing state val wasCancelling = finishing.isCancelling (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) } // If it just becomes cancelling --> must process cancelling notifications notifyRootCause = finishing.rootCause.takeIf { !wasCancelling } } // process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!) notifyRootCause?.let { notifyCancelling(list, it) } // now wait for children val child = firstChild(state) if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN // otherwise -- we have not children left (all were already cancelled?) return finalizeFinishingState(finishing, proposedUpdate) } private val Any?.exceptionOrNull: Throwable? get() = (this as? CompletedExceptionally)?.cause private fun firstChild(state: Incomplete) = state as? ChildHandleNode ?: state.list?.nextChild() // return false when there is no more incomplete children to wait // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean { val handle = child.childJob.invokeOnCompletion( invokeImmediately = false, handler = ChildCompletion(this, state, child, proposedUpdate).asHandler ) if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it val nextChild = child.nextChild() ?: return false return tryWaitForChild(state, nextChild, proposedUpdate) } // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) { assert { this.state === state } // consistency check -- it cannot change while we are waiting for children // figure out if we need to wait for next child val waitChild = lastChild.nextChild() // try wait for next child if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child // no more children to wait -- try update state val finalState = finalizeFinishingState(state, proposedUpdate) afterCompletion(finalState) } private fun LockFreeLinkedListNode.nextChild(): ChildHandleNode? { var cur = this while (cur.isRemoved) cur = cur.prevNode // rollback to prev non-removed (or list head) while (true) { cur = cur.nextNode if (cur.isRemoved) continue if (cur is ChildHandleNode) return cur if (cur is NodeList) return null // checked all -- no more children } } public final override val children: Sequence<Job> get() = sequence { when (val state = this@JobSupport.state) { is ChildHandleNode -> yield(state.childJob) is Incomplete -> state.list?.let { list -> list.forEach<ChildHandleNode> { yield(it.childJob) } } } } @Suppress("OverridingDeprecatedMember") public final override fun attachChild(child: ChildJob): ChildHandle { /* * Note: This function attaches a special ChildHandleNode node object. This node object * is handled in a special way on completion on the coroutine (we wait for all of them) and * is handled specially by invokeOnCompletion itself -- it adds this node to the list even * if the job is already cancelling. For cancelling state child is attached under state lock. * It's required to properly wait all children before completion and provide linearizable hierarchy view: * If child is attached when the job is already being cancelled, such child will receive immediate notification on * cancellation, but parent *will* wait for that child before completion and will handle its exception. */ return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle } /** * Override to process any exceptions that were encountered while invoking completion handlers * installed via [invokeOnCompletion]. * * @suppress **This is unstable API and it is subject to change.** */ internal open fun handleOnCompletionException(exception: Throwable) { throw exception } /** * This function is invoked once as soon as this job is being cancelled for any reason or completes, * similarly to [invokeOnCompletion] with `onCancelling` set to `true`. * * The meaning of [cause] parameter: * * Cause is `null` when the job has completed normally. * * Cause is an instance of [CancellationException] when the job was cancelled _normally_. * **It should not be treated as an error**. In particular, it should not be reported to error logs. * * Otherwise, the job had been cancelled or failed with exception. * * The specified [cause] is not the final cancellation cause of this job. * A job may produce other exceptions while it is failing and the final cause might be different. * * @suppress **This is unstable API and it is subject to change.* */ protected open fun onCancelling(cause: Throwable?) {} /** * Returns `true` for scoped coroutines. * Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency. * Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope. * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`. */ protected open val isScopedCoroutine: Boolean get() = false /** * Returns `true` for jobs that handle their exceptions or integrate them into the job's result via [onCompletionInternal]. * A valid implementation of this getter should recursively check parent as well before returning `false`. * * The only instance of the [Job] that does not handle its exceptions is [JobImpl] and its subclass [SupervisorJobImpl]. * @suppress **This is unstable API and it is subject to change.* */ internal open val handlesException: Boolean get() = true /** * Handles the final job [exception] that was not handled by the parent coroutine. * Returns `true` if it handles exception (so handling at later stages is not needed). * It is designed to be overridden by launch-like coroutines * (`StandaloneCoroutine` and `ActorCoroutine`) that don't have a result type * that can represent exceptions. * * This method is invoked **exactly once** when the final exception of the job is determined * and before it becomes complete. At the moment of invocation the job and all its children are complete. */ protected open fun handleJobException(exception: Throwable): Boolean = false /** * Override for completion actions that need to update some external object depending on job's state, * right before all the waiters for coroutine's completion are notified. * * @param state the final state. * * @suppress **This is unstable API and it is subject to change.** */ protected open fun onCompletionInternal(state: Any?) {} /** * Override for the very last action on job's completion to resume the rest of the code in * scoped coroutines. It is called when this job is externally completed in an unknown * context and thus should resume with a default mode. * * @suppress **This is unstable API and it is subject to change.** */ protected open fun afterCompletion(state: Any?) {} // for nicer debugging public override fun toString(): String = "${toDebugString()}@$hexAddress" @InternalCoroutinesApi public fun toDebugString(): String = "${nameString()}{${stateString(state)}}" /** * @suppress **This is unstable API and it is subject to change.** */ internal open fun nameString(): String = classSimpleName private fun stateString(state: Any?): String = when (state) { is Finishing -> when { state.isCancelling -> "Cancelling" state.isCompleting -> "Completing" else -> "Active" } is Incomplete -> if (state.isActive) "Active" else "New" is CompletedExceptionally -> "Cancelled" else -> "Completed" } // Completing & Cancelling states, // All updates are guarded by synchronized(this), reads are volatile @Suppress("UNCHECKED_CAST") private class Finishing( override val list: NodeList, isCompleting: Boolean, rootCause: Throwable? ) : SynchronizedObject(), Incomplete { private val _isCompleting = atomic(isCompleting) var isCompleting: Boolean get() = _isCompleting.value set(value) { _isCompleting.value = value } private val _rootCause = atomic(rootCause) var rootCause: Throwable? // NOTE: rootCause is kept even when SEALED get() = _rootCause.value set(value) { _rootCause.value = value } private val _exceptionsHolder = atomic<Any?>(null) private var exceptionsHolder: Any? // Contains null | Throwable | ArrayList | SEALED get() = _exceptionsHolder.value set(value) { _exceptionsHolder.value = value } // Note: cannot be modified when sealed val isSealed: Boolean get() = exceptionsHolder === SEALED val isCancelling: Boolean get() = rootCause != null override val isActive: Boolean get() = rootCause == null // !isCancelling // Seals current state and returns list of exceptions // guarded by `synchronized(this)` fun sealLocked(proposedException: Throwable?): List<Throwable> { val list = when(val eh = exceptionsHolder) { // volatile read null -> allocateList() is Throwable -> allocateList().also { it.add(eh) } is ArrayList<*> -> eh as ArrayList<Throwable> else -> error("State is $eh") // already sealed -- cannot happen } val rootCause = this.rootCause // volatile read rootCause?.let { list.add(0, it) } // note -- rootCause goes to the beginning if (proposedException != null && proposedException != rootCause) list.add(proposedException) exceptionsHolder = SEALED return list } // guarded by `synchronized(this)` fun addExceptionLocked(exception: Throwable) { val rootCause = this.rootCause // volatile read if (rootCause == null) { this.rootCause = exception return } if (exception === rootCause) return // nothing to do when (val eh = exceptionsHolder) { // volatile read null -> exceptionsHolder = exception is Throwable -> { if (exception === eh) return // nothing to do exceptionsHolder = allocateList().apply { add(eh) add(exception) } } is ArrayList<*> -> (eh as ArrayList<Throwable>).add(exception) else -> error("State is $eh") // already sealed -- cannot happen } } private fun allocateList() = ArrayList<Throwable>(4) override fun toString(): String = "Finishing[cancelling=$isCancelling, completing=$isCompleting, rootCause=$rootCause, exceptions=$exceptionsHolder, list=$list]" } private val Incomplete.isCancelling: Boolean get() = this is Finishing && isCancelling // Used by parent that is waiting for child completion private class ChildCompletion( private val parent: JobSupport, private val state: Finishing, private val child: ChildHandleNode, private val proposedUpdate: Any? ) : JobNode<Job>(child.childJob) { override fun invoke(cause: Throwable?) { parent.continueCompleting(state, child, proposedUpdate) } } private class AwaitContinuation<T>( delegate: Continuation<T>, private val job: JobSupport ) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) { override fun getContinuationCancellationCause(parent: Job): Throwable { val state = job.state /* * When the job we are waiting for had already completely completed exceptionally or * is failing, we shall use its root/completion cause for await's result. */ if (state is Finishing) state.rootCause?.let { return it } if (state is CompletedExceptionally) return state.cause return parent.getCancellationException() } protected override fun nameString(): String = "AwaitContinuation" } /* * ================================================================================================= * This is ready-to-use implementation for Deferred interface. * However, it is not type-safe. Conceptually it just exposes the value of the underlying * completed state as `Any?` * ================================================================================================= */ public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally public fun getCompletionExceptionOrNull(): Throwable? { val state = this.state check(state !is Incomplete) { "This job has not completed yet" } return state.exceptionOrNull } /** * @suppress **This is unstable API and it is subject to change.** */ internal fun getCompletedInternal(): Any? { val state = this.state check(state !is Incomplete) { "This job has not completed yet" } if (state is CompletedExceptionally) throw state.cause return state.unboxState() } /** * @suppress **This is unstable API and it is subject to change.** */ internal suspend fun awaitInternal(): Any? { // fast-path -- check state (avoid extra object creation) while (true) { // lock-free loop on state val state = this.state if (state !is Incomplete) { // already complete -- just return result if (state is CompletedExceptionally) { // Slow path to recover stacktrace recoverAndThrow(state.cause) } return state.unboxState() } if (startInternal(state) >= 0) break // break unless needs to retry } return awaitSuspend() // slow-path } private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont -> /* * Custom code here, so that parent coroutine that is using await * on its child deferred (async) coroutine would throw the exception that this child had * thrown and not a JobCancellationException. */ val cont = AwaitContinuation(uCont.intercepted(), this) cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)) cont.getResult() } /** * @suppress **This is unstable API and it is subject to change.** */ // registerSelectAwaitInternal @Suppress("UNCHECKED_CAST") internal fun <T, R> registerSelectClause1Internal(select: SelectInstance<R>, block: suspend (T) -> R) { // fast-path -- check state and select/return if needed loopOnState { state -> if (select.isSelected) return if (state !is Incomplete) { // already complete -- select result if (select.trySelect()) { if (state is CompletedExceptionally) { select.resumeSelectWithException(state.cause) } else { block.startCoroutineUnintercepted(state.unboxState() as T, select.completion) } } return } if (startInternal(state) == 0) { // slow-path -- register waiter for completion select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(this, select, block).asHandler)) return } } } /** * @suppress **This is unstable API and it is subject to change.** */ @Suppress("UNCHECKED_CAST") internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) { val state = this.state // Note: await is non-atomic (can be cancelled while dispatched) if (state is CompletedExceptionally) select.resumeSelectWithException(state.cause) else block.startCoroutineCancellable(state.unboxState() as T, select.completion) } }
Continuation
The continuation indicates the status of the pending process at the starting point
The part after each hanging point of the collaborative process can be regarded as a continuation
sequence { for (i in 1..10) yield(i * i) println("over") }
The above code will have 10 continuations
/** * Interface representing a continuation after a suspension point that returns a value of type `T`. * Interface, which represents the continuation after the hanging point, and returns a value of type'T '. */ @SinceKotlin("1.3") public interface Continuation<in T> { /** * The context of the coroutine that corresponds to this continuation. */ public val context: CoroutineContext /** * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the * return value of the last suspension point. */ public fun resumeWith(result: Result<T>) }
CPS
Return value not through return, return value through callback
object CPS { interface Continuation{ fun next(result:Any) } operator fun invoke() { println(normal(1,2)) cps(1,2,object :Continuation{ override fun next(result: Any) { println(result) } }) } fun normal(a: Int, b: Int): Int { return a + b } fun cps(a:Int,b:Int,continuation:Continuation){ continuation.next(a+b) } }
There are two kinds of cps in kotlin. The first is the ordinary suspend fun d. After compilation, a completion is added. If there is no pending call in the method, or the pending call is only at the end of the method, or even no state machine is generated
suspend fun test2(s:String):Int{ pt("test2 $s") return s.toInt() } @Nullable public final Object test1(int i, @NotNull Continuation $completion) { Object $continuation; label20: { if ($completion instanceof <undefinedtype>) { $continuation = (<undefinedtype>)$completion; if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) { ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE; break label20; } } $continuation = new ContinuationImpl($completion) { // $FF: synthetic field Object result; int label; Object L$0; int I$0; @Nullable public final Object invokeSuspend(@NotNull Object $result) { this.result = $result; this.label |= Integer.MIN_VALUE; return Test.this.test1(0, this); } }; } Object $result = ((<undefinedtype>)$continuation).result; Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(((<undefinedtype>)$continuation).label) { case 0: ResultKt.throwOnFailure($result); this.pt("test1 " + i); ((<undefinedtype>)$continuation).L$0 = this; ((<undefinedtype>)$continuation).I$0 = i; ((<undefinedtype>)$continuation).label = 1; if (YieldKt.yield((Continuation)$continuation) == var5) { return var5; } break; case 1: i = ((<undefinedtype>)$continuation).I$0; Test var6 = (Test)((<undefinedtype>)$continuation).L$0; ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } return String.valueOf(i); } suspend fun test2(s:String):Int{ pt("test2 $s") return s.toInt() } @Nullable public final Object test2(@NotNull String s, @NotNull Continuation $completion) { pt("test2 " + s); String str = s; boolean bool = false; return Boxing.boxInt(Integer.parseInt(str)); }
The other is the suspend code block, which is generated as suspend lambda
State machine
The performance of coprocessor implementation is very important, which requires creating as few classes and objects as possible. Many languages are implemented through state machines, and so does Kotlin. For Kotlin, using this method allows the compiler to create only one class regardless of how many hanging points are in the body of the suspended lambda expression.
Main idea: the suspend function is compiled into a state machine, and its state corresponds to the suspend starting point. Example: write a pending code block with two pending points:
val a = a() val y = foo(a).await() // Hanging point #1 b() val z = bar(a, y).await() // Hanging point #2 c(z)
This code block has three states:
Initialization (before all hang points)
After the first hanging point
After the second hanging point
Each state is the entry point of a continuation of the code block (the initial continuation starts from the first line).
The code will be compiled into an anonymous class (SuspendLambda). One of its methods implements the state machine, a field holds the current state of the state machine, and the local variable field of the co process is shared between the states (there may also be a field of the co process closure, but in this case it is empty. This is the Java pseudo code of the above code block calling the suspend function await through CPS:
class <anonymous_for_state_machine> extends SuspendLambda<...> { // Current state of state machine int label = 0 // Local variables of CO process A a = null Y y = null void resumeWith(Object result) { if (label == 0) goto L0 if (label == 1) goto L1 if (label == 2) goto L2 else throw IllegalStateException() L0: // For this call, the result should be empty a = a() label = 1 result = foo(a).await(this) // 'this' is passed as a continuation if (result == COROUTINE_SUSPENDED) return // Returns if await suspends execution L1: // External code passed in Recovery process as a result of await() y = (Y) result b() label = 2 result = bar(a, y).await(this) // 'this' is passed as a continuation if (result == COROUTINE_SUSPENDED) return // Returns if await suspends execution L2: // External code passed in Recovery process as a result of await() Z z = (Z) result c(z) label = -1 // There are no other steps return } }
Note that there are goto operators and tags, because the changes described in this example occur in bytecode rather than in the source code.
Now, when the coroutine starts, we call its resumeWith() - the label is 0, then we skip to L0, and then we do some work, set the label to the next state - 1, call await(), which returns if the execution of the coroutine is suspended. When we want to continue, we call resumeWith() again. Now it continues to L1, does some work, sets the state to 2, and calls await(), also returns when suspended. Next time it continues from L3 and sets the status to - 1, which means "it's over and there's no more work to do".
Take out an actual example after compilation. First, decompile class
@DebugMetadata(f = "Test1.kt", l = {20, 21}, i = {}, s = {}, n = {}, m = "invokeSuspend", c = "Test1$invoke$2") @Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 3, d1 = {"\000\n\n\000\n\002\020\002\n\002\b\002\020\000\032\0020\001H\006\004\b\002\020\003"}, d2 = {"<anonymous>", "", "invoke", "(Ljava/lang/Object;)Ljava/lang/Object;"}) static final class Test1$invoke$2 extends SuspendLambda implements Function1<Continuation<? super Unit>, Object> { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { String str; boolean bool; Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch (this.label) { case 0: ResultKt.throwOnFailure($result); this.label = 1; if (Test.INSTANCE.test1(1, (Continuation<? super String>)this) == object) return object; Test.INSTANCE.test1(1, (Continuation<? super String>)this); this.label = 2; if (Test.INSTANCE.test2("1", (Continuation<? super Integer>)this) == object) return object; Test.INSTANCE.test2("1", (Continuation<? super Integer>)this); Intrinsics.checkExpressionValueIsNotNull(Thread.currentThread(), "Thread.currentThread()"); str = Thread.currentThread().getName(); bool = false; System.out.println(str); return Unit.INSTANCE; case 1: ResultKt.throwOnFailure($result); this.label = 2; if (Test.INSTANCE.test2("1", (Continuation<? super Integer>)this) == object) return object; Test.INSTANCE.test2("1", (Continuation<? super Integer>)this); Intrinsics.checkExpressionValueIsNotNull(Thread.currentThread(), "Thread.currentThread()"); str = Thread.currentThread().getName(); bool = false; System.out.println(str); return Unit.INSTANCE; case 2: ResultKt.throwOnFailure($result); Intrinsics.checkExpressionValueIsNotNull(Thread.currentThread(), "Thread.currentThread()"); str = Thread.currentThread().getName(); bool = false; System.out.println(str); return Unit.INSTANCE; } throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } Test1$invoke$2(Continuation param1Continuation) { super(1, param1Continuation); } @NotNull public final Continuation<Unit> create(@NotNull Continuation completion) { Intrinsics.checkParameterIsNotNull(completion, "completion"); return (Continuation<Unit>)new Test1$invoke$2(completion); } public final Object invoke(Object param1Object) { return ((Test1$invoke$2)create((Continuation)param1Object)).invokeSuspend(Unit.INSTANCE); } }
Then comes the decompiled version of kotlin's bytecode
coroutine.start(CoroutineStart.DEFAULT, (Function1)(new Function1((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { label17: { Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); Test var10000; switch(this.label) { case 0: ResultKt.throwOnFailure($result); var10000 = Test.INSTANCE; this.label = 1; if (var10000.test1(1, this) == var4) { return var4; } break; case 1: ResultKt.throwOnFailure($result); break; case 2: ResultKt.throwOnFailure($result); break label17; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } var10000 = Test.INSTANCE; this.label = 2; if (var10000.test2("1", this) == var4) { return var4; } } Thread var5 = Thread.currentThread(); Intrinsics.checkExpressionValueIsNotNull(var5, "Thread.currentThread()"); String var2 = var5.getName(); boolean var3 = false; System.out.println(var2); return Unit.INSTANCE; } @NotNull public final Continuation create(@NotNull Continuation completion) { Intrinsics.checkParameterIsNotNull(completion, "completion"); Function1 var2 = new <anonymous constructor>(completion); return var2; } public final Object invoke(Object var1) { return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE); } }));
invokeSuspend
internal abstract class BaseContinuationImpl( // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet // it has a public getter (since even untrusted code is allowed to inspect its call stack). public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable { // This implementation is final. This fact is used to unroll resumeWith recursion. public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var current = this var param = result while (true) { // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure // can precisely track what part of suspended callstack was already resumed probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any?> = try { //Execute invokeSuspend val outcome = invokeSuspend(param) //Suspend direct return if (outcome === COROUTINE_SUSPENDED) return //success Result.success(outcome) } catch (exception: Throwable) { //fail Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // unrolling recursion via loop //The outward recursive loop continues to call current = completion param = outcome } else { // top-level completion reached -- invoke and return //At this time, the completion is AbstractCoroutine completion.resumeWith(outcome) return } } } } protected abstract fun invokeSuspend(result: Result<Any?>): Any? protected open fun releaseIntercepted() { // does nothing here, overridden in ContinuationImpl } public open fun create(completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Continuation) has not been overridden") } public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden") } public override fun toString(): String = "Continuation at ${getStackTraceElement() ?: this::class.java.name}" // --- CoroutineStackFrame implementation public override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame public override fun getStackTraceElement(): StackTraceElement? = getStackTraceElementImpl() }
CoroutineScope
Defines the scope of the collaboration, including a CoroutineContext
/** * Cancels this scope, including its job and all its children with an optional cancellation [cause]. * A cause can be used to specify an error message or to provide other details on * a cancellation reason for debugging purposes. * Throws [IllegalStateException] if the scope does not have a job in it. */ public fun CoroutineScope.cancel(cause: CancellationException? = null) { val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this") job.cancel(cause) }
You can see that the cancel method of the scope is associated with the job
/** * Defines a scope for new coroutines. Every **coroutine builder** (like [launch], [async], etc) * is an extension on [CoroutineScope] and inherits its [coroutineContext][CoroutineScope.coroutineContext] * to automatically propagate all its elements and cancellation. * * The best ways to obtain a standalone instance of the scope are [CoroutineScope()] and [MainScope()] factory functions. * Additional context elements can be appended to the scope using the [plus][CoroutineScope.plus] operator. * * ### Convention for structured concurrency * * Manual implementation of this interface is not recommended, implementation by delegation should be preferred instead. * By convention, the [context of a scope][CoroutineScope.coroutineContext] should contain an instance of a * [job][Job] to enforce the discipline of **structured concurrency** with propagation of cancellation. * * Every coroutine builder (like [launch], [async], etc) * and every scoping function (like [coroutineScope], [withContext], etc) provides _its own_ scope * with its own [Job] instance into the inner block of code it runs. * By convention, they all wait for all the coroutines inside their block to complete before completing themselves, * thus enforcing the structured concurrency. See [Job] documentation for more details. * *Define the scope for the new collaboration. Each * * co process generator * * (like [launch], [async], etc.) *It is an extension of [CoroutineScope], which inherits its [coroutinescontext] [CoroutineScope. Coroutinescontext] *Automatically propagate all its elements and cancel. * *The best way to get a separate scope instance is the [CoroutineScope()] and [MainScope()] factory functions. *Other context elements can use the [plus][CoroutineScope. +) operator. * Structured concurrency conventions * *Manual implementation of this interface is not recommended, but should be implemented by delegation. By convention, [context of scope] [CoroutineScope.]coroutineContext] should contain a * [job][job]Strengthen the rules of * * structured concurrency by propagating cancellation. * *Each co process generator (such as [launch], [async], etc.) *And each scope function (such as [coroutineScope], [withContext], etc.)_ its own_ Scope *Put your own [Job] instance into the internal code block it runs. *By convention, they all have to wait for all the collaborative processes in their block to complete before they can complete themselves, *Therefore, structural concurrency is enforced. Please refer to the [Job] documentation for more details. * ### Android usage * * Android has first-party support for coroutine scope in all entities with the lifecycle. * See [the corresponding documentation](https://developer.android.com/topic/libraries/architecture/coroutines#lifecyclescope). * * ### Custom usage * * [CoroutineScope] should be implemented or declared as a property on entities with a well-defined lifecycle that are * responsible for launching children coroutines, for example: * * ``` * class MyUIClass { * val scope = MainScope() // the scope of MyUIClass * * fun destroy() { // destroys an instance of MyUIClass * scope.cancel() // cancels all coroutines launched in this scope * // ... do the rest of cleanup here ... * } * * /* * * Note: if this instance is destroyed or any of the launched coroutines * * in this method throws an exception, then all nested coroutines are cancelled. * */ * fun showSomeData() = scope.launch { // launched in the main thread * // ... here we can use suspending functions or coroutine builders with other dispatchers * draw(data) // draw in the main thread * } * } * ``` */ public interface CoroutineScope { /** * The context of this scope. * Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope. * Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages. * * By convention, should contain an instance of a [job][Job] to enforce structured concurrency. */ public val coroutineContext: CoroutineContext }
CoroutineContext
A collaboration context is a set of persistent user-defined objects that can be attached to a collaboration. It can include the object responsible for the thread policy of the collaboration, the log, the object about the security and transaction of the execution of the collaboration, the identification and name of the collaboration, and so on. The following is a simple cognitive model of synergy and its context. Think of a coroutine as a lightweight thread. In this case, the coroutine context is like a set of thread local variables. The difference is that thread local variables are variable, while the context of the collaboration process is immutable, but this is not a serious limitation for the collaboration process, because they are so lightweight that it is easy to open a new collaboration process when the context needs to be changed.
The standard library does not contain any concrete implementation of the context, but there are interfaces and abstract classes to define all these aspects in the library in a composable way, so aspects from different libraries can coexist peacefully in the same context.
Conceptually, a coroutine context is a set of index elements, each of which has a unique key. It is a mixture of set and map. Its elements have keys like those in map, but its keys are directly associated with elements, more like set. The Job mentioned above inherits the Element. In addition, there are ContinuationInterceptor, CoroutineExceptionHandler, ThreadContextElement, CoroutineName and so on
/** * Persistent context for the coroutine. It is an indexed set of [Element] instances. * An indexed set is a mix between a set and a map. * Every element in this set has a unique [Key]. *Persistence context of the coroutine. It is the index set of an instance of [Element]. *An index set is a mixture of sets and mappings. *Each element in this collection has a unique [Key]. */ @SinceKotlin("1.3") public interface CoroutineContext { /** * Returns the element with the given [key] from this context or `null`. */ public operator fun <E : Element> get(key: Key<E>): E? /** * Accumulates entries of this context starting with [initial] value and applying [operation] * from left to right to current accumulator value and each element of this context. */ public fun <R> fold(initial: R, operation: (R, Element) -> R): R /** * Returns a context containing elements from this context and elements from other [context]. * The elements from this context with the same key as in the other one are dropped. */ public operator fun plus(context: CoroutineContext): CoroutineContext = if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation context.fold(this) { acc, element -> val removed = acc.minusKey(element.key) if (removed === EmptyCoroutineContext) element else { // make sure interceptor is always last in the context (and thus is fast to get when present) val interceptor = removed[ContinuationInterceptor] if (interceptor == null) CombinedContext(removed, element) else { val left = removed.minusKey(ContinuationInterceptor) if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else CombinedContext(CombinedContext(left, element), interceptor) } } } /** * Returns a context containing elements from this context, but without an element with * the specified [key]. */ public fun minusKey(key: Key<*>): CoroutineContext /** * Key for the elements of [CoroutineContext]. [E] is a type of element with this key. */ public interface Key<E : Element> /** * An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself. */ public interface Element : CoroutineContext { /** * A key of this coroutine context element. */ public val key: Key<*> public override operator fun <E : Element> get(key: Key<E>): E? = @Suppress("UNCHECKED_CAST") if (this.key == key) this as E else null public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = operation(initial, this) public override fun minusKey(key: Key<*>): CoroutineContext = if (this.key == key) EmptyCoroutineContext else this } }
AbstractCoroutine.start
This is an entry method that is started by a collaborative process (here refers to an AbstractCoroutine). You can take a look at the runBlocking process first
/** * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion. * This function should not be used from a coroutine. It is designed to bridge regular blocking code * to libraries that are written in suspending style, to be used in `main` functions and in tests. * * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations * in this blocked thread until the completion of this coroutine. *Run a new coroutine and * * block * * the current thread_ interruptibly_ Until it's done. *This function should not be used in a coroutine. It is designed to bridge conventional blocking code *To the library written in suspended mode, which is used in 'main' function and test. * *The default [CoroutineDispatcher] of this constructor is the internal implementation of the event loop that handles continuation *In the blocked thread until the coroutine is completed. * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`. * * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`, * then this invocation uses the outer event loop. * * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and * this `runBlocking` invocation throws [InterruptedException]. * * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available * for a newly created coroutine. * * @param context the context of the coroutine. The default value is an event loop on the current thread. * @param block the coroutine code. */ @Throws(InterruptedException::class) public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } val currentThread = Thread.currentThread() val contextInterceptor = context[ContinuationInterceptor] val eventLoop: EventLoop? val newContext: CoroutineContext if (contextInterceptor == null) { // create or use private event loop if no dispatcher is specified eventLoop = ThreadLocalEventLoop.eventLoop newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else { // See if context's interceptor is an event loop that we shall use (to support TestContext) // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } ?: ThreadLocalEventLoop.currentOrNull() newContext = GlobalScope.newCoroutineContext(context) } //Create BlockingCoroutine //private class BlockingCoroutine<T>( //parentContext: CoroutineContext, //private val blockedThread: Thread, //private val eventLoop: EventLoop? //) : AbstractCoroutine<T>(parentContext, true) val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop) //Start collaboration coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() }
In addition, launch and async are both additional methods of CoroutineScope, which cannot be separated from CoroutineScope. Finally, abstractcoroutine will be called Start method.
AbstractCoroutine. There are two start methods, one with receiver and one without receiver
/** * Starts this coroutine with the given code [block] and [start] strategy. * This function shall be invoked at most once on this coroutine. * * First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it * during construction. Second, it starts the coroutine based on [start] parameter: * * * [DEFAULT] uses [startCoroutineCancellable]. * * [ATOMIC] uses [startCoroutine]. * * [UNDISPATCHED] uses [startCoroutineUndispatched]. * * [LAZY] does nothing. */ public fun start(start: CoroutineStart, block: suspend () -> T) { initParentJob() start(block, this) } /** * Starts this coroutine with the given code [block] and [start] strategy. * This function shall be invoked at most once on this coroutine. * * First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it * during construction. Second, it starts the coroutine based on [start] parameter: * * * [DEFAULT] uses [startCoroutineCancellable]. * * [ATOMIC] uses [startCoroutine]. * * [UNDISPATCHED] uses [startCoroutineUndispatched]. * * [LAZY] does nothing. */ public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { initParentJob() start(block, receiver, this) }
The difference between with and without receiver is the difference in writing
coroutine.start(CoroutineStart.DEFAULT){ println(Thread.currentThread().name) //The following is not possible // launch{ // // } } coroutine.start(CoroutineStart.DEFAULT,coroutine,{ println(this) //But the following is OK launch { } })
Generally, kotlin's built-in builder uses the receiver method, so you can easily launch a subprocess in a launch
Then start will finally come to this method
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit
Or version with receiver
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(receiver, completion) ATOMIC -> block.startCoroutine(receiver, completion) UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) LAZY -> Unit // will start lazily }
For the time being, just look at the DEFAULT
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable( receiver: R, completion: Continuation<T>, onCancellation: ((cause: Throwable) -> Unit)? = null ) = runSafely(completion) { createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation) }
It can be seen that there are three steps. The first step is to wrap the completion (here should be AbstractCoroutine) into an entry Continuation. The second step is to try to intercept and the third step is to resume
Let's look at the first step
createCoroutineUnintercepted
/** * Creates unintercepted coroutine with receiver type [R] and result type [T]. * This function creates a new, fresh instance of suspendable computation every time it is invoked. * * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance. * The [completion] continuation is invoked when coroutine completes with result or exception. * * This function returns unintercepted continuation. * Invocation of `resume(Unit)` starts coroutine immediately in the invoker's call stack without going through the * [ContinuationInterceptor] that might be present in the completion's [CoroutineContext]. * It is the invoker's responsibility to ensure that a proper invocation context is established. * Note that [completion] of this function may get invoked in an arbitrary context. * * [Continuation.intercepted] can be used to acquire the intercepted continuation. * Invocation of `resume(Unit)` on intercepted continuation guarantees that execution of * both the coroutine and [completion] happens in the invocation context established by * [ContinuationInterceptor]. * * Repeated invocation of any resume function on the resulting continuation corrupts the * state machine of the coroutine and may result in arbitrary behaviour or exception. */ @SinceKotlin("1.3") public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted( receiver: R, completion: Continuation<T> ): Continuation<Unit> { val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) create(receiver, probeCompletion) else { createCoroutineFromSuspendFunction(probeCompletion) { (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it) } } }
If it is BaseContinuationImpl, call BaseContinuationImpl directly The Continuation generated by create() is a SuspendLambda, but according to decompilation and breakpoints, BaseContinuationImpl is actually a SuspendLambda, and it is the same class as the generated SuspendLambda. I don't know whether it is a special case or all
[the external chain picture transfer fails. The source station may have an anti-theft chain mechanism. It is recommended to save the picture and upload it directly (img-2SXXGF4C-1619404344871)(assets/16190739710252.jpg)]
[the external chain picture transfer fails. The source station may have an anti-theft chain mechanism. It is recommended to save the picture and upload it directly (img-SHvy0t46-1619404344874)(assets/16190739922160.jpg)]
Or if it is a suspend function, call createcoroutinefrom suspend function
/** * This function is used when [createCoroutineUnintercepted] encounters suspending lambda that does not extend BaseContinuationImpl. * * It happens in two cases: * 1. Callable reference to suspending function, * 2. Suspending function reference implemented by Java code. * * We must wrap it into an instance that extends [BaseContinuationImpl], because that is an expectation of all coroutines machinery. * As an optimization we use lighter-weight [RestrictedContinuationImpl] base class (it has less fields) if the context is * [EmptyCoroutineContext], and a full-blown [ContinuationImpl] class otherwise. * * The instance of [BaseContinuationImpl] is passed to the [block] so that it can be passed to the corresponding invocation. */ @SinceKotlin("1.3") private inline fun <T> createCoroutineFromSuspendFunction( completion: Continuation<T>, crossinline block: (Continuation<T>) -> Any? ): Continuation<Unit> { val context = completion.context // label == 0 when coroutine is not started yet (initially) or label == 1 when it was return if (context === EmptyCoroutineContext) object : RestrictedContinuationImpl(completion as Continuation<Any?>) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith block(this) // run the block, may return or suspend } 1 -> { label = 2 result.getOrThrow() // this is the result if the block had suspended } else -> error("This coroutine had already completed") } } else object : ContinuationImpl(completion as Continuation<Any?>, context) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith block(this) // run the block, may return or suspend } 1 -> { label = 2 result.getOrThrow() // this is the result if the block had suspended } else -> error("This coroutine had already completed") } } }
You can see that another layer is wrapped outside the suspend function
intercepted
/** * Intercepts this continuation with [ContinuationInterceptor]. * * This function shall be used on the immediate result of [createCoroutineUnintercepted] or [suspendCoroutineUninterceptedOrReturn], * in which case it checks for [ContinuationInterceptor] in the continuation's [context][Continuation.context], * invokes [ContinuationInterceptor.interceptContinuation], caches and returns the result. * * If this function is invoked on other [Continuation] instances it returns `this` continuation unchanged. */ @SinceKotlin("1.3") public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it }
The final call is to get the interceptContinuation method of the interceptor of the context.
in addition
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor
CoroutineDispatcher class implements the ContinuationInterceptor interface, so we use dispatchers IO,Dispatchers.Main and so on are essentially an interceptor. Take another look at the interceptContinuation method of CoroutineDispatcher
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation)
Take a look at DispatchedContinuation, which inherits DispatchedTask, and DispatchedTask finally implements runnable interface and Continuation, so it will definitely resume with in the end. If the thread is to be cut, it has been cut during run.
internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation { ... override fun resumeWith(result: Result<T>) { val context = continuation.context val state = result.toState() //dispatch required if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_ATOMIC //this is the runnable inherited from DispatchedTask dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_ATOMIC) { withCoroutineContext(this.context, countOrElement) { continuation.resumeWith(result) } } } } ... public final override fun run() { assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching val taskContext = this.taskContext var fatalException: Throwable? = null try { val delegate = delegate as DispatchedContinuation<T> val continuation = delegate.continuation n //This Continuation was generated by createcoroutineinntercepted before val context = continuation.context val state = takeState() // NOTE: Must take state in any case, even if cancelled //delegate.countOrElement finds ThreadContextElement, updates thread status, and restores after running withCoroutineContext(context, delegate.countOrElement) { val exception = getExceptionalResult(state) /* * Check whether continuation was originally resumed with an exception. * If so, it dominates cancellation, otherwise the original exception * will be silently lost. */ val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null //The last big piece of judgment, and then called continuation according to various states resumeWith() if (job != null && !job.isActive) { val cause = job.getCancellationException() cancelCompletedResult(state, cause) continuation.resumeWithStackTrace(cause) } else { if (exception != null) { continuation.resumeWithException(exception) } else { continuation.resume(getSuccessfulResult(state)) } } } } catch (e: Throwable) { // This instead of runCatching to have nicer stacktrace and debug experience fatalException = e } finally { val result = runCatching { taskContext.afterTask() } handleFatalException(fatalException, result.exceptionOrNull()) } } /** * Executes a block using a given coroutine context. */ internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T { val oldValue = updateThreadContext(context, countOrElement) try { return block() } finally { restoreThreadContext(context, oldValue) } }
Then find a dispatcher and see what the dispatcher has done
val IO: CoroutineDispatcher = LimitingDispatcher( this, systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), "Dispatchers.IO", TASK_PROBABLY_BLOCKING ) private class LimitingDispatcher( //The actual distribution is handed over to the dispatcher, that is, the DefaultScheduler. At the same time, dispatchers Default can also be a DefaultScheduler (determined by a systemproperty) private val dispatcher: ExperimentalCoroutineDispatcher, private val parallelism: Int, private val name: String?, override val taskMode: Int ) : ExecutorCoroutineDispatcher(), TaskContext, Executor { ... override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false) private fun dispatch(block: Runnable, tailDispatch: Boolean) { //It's a constant cycle of judgment and distribution var taskToSchedule = block while (true) { // Commit in-flight tasks slot val inFlight = inFlightTasks.incrementAndGet() // Fast path, if parallelism limit is not reached, dispatch task and return if (inFlight <= parallelism) { //This method is implemented in the parent class ExecutorCoroutineDispatcher dispatchWithContext(taskToSchedule, this, tailDispatch) return } // Parallelism limit is reached, add task to the queue queue.add(taskToSchedule) /* * We're not actually scheduled anything, so rollback committed in-flight task slot: * If the amount of in-flight tasks is still above the limit, do nothing * If the amount of in-flight tasks is lesser than parallelism, then * it's a race with a thread which finished the task from the current context, we should resubmit the first task from the queue * to avoid starvation. * * Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential: * * T1: submit task, start execution, R == 1 * T2: commit slot for next task, R == 2 * T1: finish T1, R == 1 * T2: submit next task to local queue, decrement R, R == 0 * Without retries, task from T2 will be stuck in the local queue */ if (inFlightTasks.decrementAndGet() >= parallelism) { return } taskToSchedule = queue.poll() ?: return } } override fun dispatchYield(context: CoroutineContext, block: Runnable) { dispatch(block, tailDispatch = true) } ... /** * Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any. * * Implementation note: blocking tasks are scheduled in a fair manner (to local queue tail) to avoid * non-blocking continuations starvation. * E.g. for * ``` * foo() * blocking() * bar() * ``` * it's more profitable to execute bar at the end of `blocking` rather than pending blocking task */ override fun afterTask() { var next = queue.poll() // If we have pending tasks in current blocking context, dispatch first if (next != null) { dispatcher.dispatchWithContext(next, this, true) return } inFlightTasks.decrementAndGet() /* * Re-poll again and try to submit task if it's required otherwise tasks may be stuck in the local queue. * Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential: * T1: submit task, start execution, R == 1 * T2: commit slot for next task, R == 2 * T1: finish T1, poll queue (it's still empty), R == 2 * T2: submit next task to the local queue, decrement R, R == 1 * T1: decrement R, finish. R == 0 * * The task from T2 is stuck is the local queue */ next = queue.poll() ?: return dispatch(next, true) } } public open class ExperimentalCoroutineDispatcher( private val corePoolSize: Int, private val maxPoolSize: Int, private val idleWorkerKeepAliveNs: Long, private val schedulerName: String = "CoroutineScheduler" ) : ExecutorCoroutineDispatcher() { ... private var coroutineScheduler = createScheduler() internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) { try { coroutineScheduler.dispatch(block, context, tailDispatch) } catch (e: RejectedExecutionException) { // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved // for testing purposes, so we don't have to worry about cancelling the affected Job here. // TaskContext shouldn't be lost here to properly invoke before/after task DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context)) } } //So every executor coroutine dispatcher will have a Scheduler private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) ... } internal class CoroutineScheduler( @JvmField val corePoolSize: Int, @JvmField val maxPoolSize: Int, @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable { ... fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { trackTask() // this is needed for virtual time support val task = createTask(block, taskContext) // try to submit the task to the local queue and act depending on the result val currentWorker = currentWorker() val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch) if (notAdded != null) { if (!addToGlobalQueue(notAdded)) { // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted throw RejectedExecutionException("$schedulerName was terminated") } } val skipUnpark = tailDispatch && currentWorker != null // Checking 'task' instead of 'notAdded' is completely okay if (task.mode == TASK_NON_BLOCKING) { if (skipUnpark) return signalCpuWork() } else { // Increment blocking tasks anyway signalBlockingWork(skipUnpark = skipUnpark) } } ...
The Scheduler's workflow is roughly like this. First, the Scheduler has two queues, one is to handle the globalBlockingQueue that may be blocked, the other is to handle the non blocking globalCpuQueue, and then there is an internal class Worker that inherits Thread, which has a val localQueue: WorkQueue = WorkQueue(). Then judge whether you need to create a new Worker, and directly start() after creating a new Worker.
override fun run() = runWorker() private fun runWorker() { var rescanned = false while (!isTerminated && state != WorkerState.TERMINATED) { val task = findTask(mayHaveLocalTasks) // Task found. Execute and repeat if (task != null) { rescanned = false minDelayUntilStealableTaskNs = 0L executeTask(task) continue } else { mayHaveLocalTasks = false } /* * No tasks were found: * 1) Either at least one of the workers has stealable task in its FIFO-buffer with a stealing deadline. * Then its deadline is stored in [minDelayUntilStealableTask] * * Then just park for that duration (ditto re-scanning). * While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations, * excess unparks and managing "one unpark per signalling" invariant become unfeasible, instead we are going to resolve * it with "spinning via scans" mechanism. * NB: this short potential parking does not interfere with `tryUnpark` */ if (minDelayUntilStealableTaskNs != 0L) { if (!rescanned) { rescanned = true } else { rescanned = false tryReleaseCpu(WorkerState.PARKING) interrupted() LockSupport.parkNanos(minDelayUntilStealableTaskNs) minDelayUntilStealableTaskNs = 0L } continue } /* * 2) Or no tasks available, time to park and, potentially, shut down the thread. * Add itself to the stack of parked workers, re-scans all the queues * to avoid missing wake-up (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting for new tasks. */ tryPark() } tryReleaseCpu(WorkerState.TERMINATED) } ... //It's about taking your own / global / stealing from other workers fun findTask(scanLocalQueue: Boolean): Task? { if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue) // If we can't acquire a CPU permit -- attempt to find blocking task val task = if (scanLocalQueue) { localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull() } else { globalBlockingQueue.removeFirstOrNull() } return task ?: trySteal(blockingOnly = true) } ...
Then runTask, and the final run() is the run() of DispatchedTask
fun runSafely(task: Task) { try { task.run() } catch (e: Throwable) { val thread = Thread.currentThread() thread.uncaughtExceptionHandler.uncaughtException(thread, e) } finally { unTrackTask() } }
Again, the place where CoroutineExceptionHandler takes effect is also in finally in the run of DispatchedTask, or abstractcoroutine Resumewith (result. Failure (exception)) until jobsupply Finalize finishing state() (mostly here)
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) { // Invoke an exception handler from the context if present try { context[CoroutineExceptionHandler]?.let { it.handleException(context, exception) return } } catch (t: Throwable) { handleCoroutineExceptionImpl(context, handlerException(exception, t)) return } // If a handler is not present in the context or an exception was thrown, fallback to the global handler handleCoroutineExceptionImpl(context, exception) }
// ------------ state update ------------ // Finalizes Finishing -> Completed (terminal state) transition. // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method. // Returns final state that was created and updated to private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? { /* * Note: proposed state can be Incomplete, e.g. * async { * something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood * } */ assert { this.state === state } // consistency check -- it cannot change assert { !state.isSealed } // consistency check -- cannot be sealed yet assert { state.isCompleting } // consistency check -- must be marked as completing val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause // Create the final exception and seal the state so that no more exceptions can be added var wasCancelling = false // KLUDGE: we cannot have contract for our own expect fun synchronized val finalException = synchronized(state) { wasCancelling = state.isCancelling val exceptions = state.sealLocked(proposedException) //Get cause val finalCause = getFinalRootCause(state, exceptions) if (finalCause != null) addSuppressedExceptions(finalCause, exceptions) finalCause } // Create the final state object val finalState = when { // was not cancelled (no exception) -> use proposed update value finalException == null -> proposedUpdate // small optimization when we can used proposeUpdate object as is on cancellation finalException === proposedException -> proposedUpdate // cancelled job final state else -> CompletedExceptionally(finalException) } // Now handle the final exception if (finalException != null) { //Do you want to cancel parent or handleJobException? Have you handled it val handled = cancelParent(finalException) || handleJobException(finalException) if (handled) (finalState as CompletedExceptionally).makeHandled() } // Process state updates for the final state before the state of the Job is actually set to the final state // to avoid races where outside observer may see the job in the final state, yet exception is not handled yet. // if (!wasCancelling) onCancelling(finalException) onCompletionInternal(finalState) // Then CAS to completed state -> it must succeed val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete()) assert { casSuccess } // And process all post-completion actions completeStateFinalization(state, finalState) return finalState } private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? { // A case of no exceptions if (exceptions.isEmpty()) { // materialize cancellation exception if it was not materialized yet if (state.isCancelling) return defaultCancellationException() return null } /* * 1) If we have non-CE, use it as root cause * 2) If our original cause was TCE, use *non-original* TCE because of the special nature of TCE * * It is a CE, so it's not reported by children * * The first instance (cancellation cause) is created by timeout coroutine and has no meaningful stacktrace * * The potential second instance is thrown by withTimeout lexical block itself, then it has recovered stacktrace * 3) Just return the very first CE */ val firstNonCancellation = exceptions.firstOrNull { it !is CancellationException } if (firstNonCancellation != null) return firstNonCancellation val first = exceptions[0] if (first is TimeoutCancellationException) { val detailedTimeoutException = exceptions.firstOrNull { it !== first && it is TimeoutCancellationException } if (detailedTimeoutException != null) return detailedTimeoutException } return first } private fun cancelParent(cause: Throwable): Boolean { // Is scoped coroutine -- don't propagate, will be rethrown if (isScopedCoroutine) return true /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it. * This allow parent to cancel its children (normally) without being cancelled itself, unless * child crashes and produce some other exception during its completion. */ val isCancellation = cause is CancellationException val parent = parentHandle // No parent -- ignore CE, report other exceptions. if (parent === null || parent === NonDisposableHandle) { return isCancellation } // Notify parent but don't forget to check cancellation return parent.childCancelled(cause) || isCancellation } //Childcanceled of JobSupport. Non CancellationException will cancel itself public open fun childCancelled(cause: Throwable): Boolean { if (cause is CancellationException) return true return cancelImpl(cause) && handlesException } //StandaloneCoroutine generated by launch. See handleJobException private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<Unit>(parentContext, active) { override fun handleJobException(exception: Throwable): Boolean { handleCoroutineException(context, exception) return true } } @InternalCoroutinesApi public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) { // Invoke an exception handler from the context if present try { context[CoroutineExceptionHandler]?.let { it.handleException(context, exception) return } } catch (t: Throwable) { handleCoroutineExceptionImpl(context, handlerException(exception, t)) return } // If a handler is not present in the context or an exception was thrown, fallback to the global handler handleCoroutineExceptionImpl(context, exception) }
Then how does yield work
//First, we arrive at the Dispatcher's yield (so the yield is useless in the process without Dispatcher) override fun dispatchYield(context: CoroutineContext, block: Runnable) { dispatch(block, tailDispatch = true) } //Then it's still dispatcher Dispatchwithcontext (tasktoschedule, this, tailDispatch) is only true //Then I went to the Worker /** * Returns `null` if task was successfully added or an instance of the * task that was not added or replaced (thus should be added to global queue). */ private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? { if (this == null) return task /* * This worker could have been already terminated from this thread by close/shutdown and it should not * accept any more tasks into its local queue. */ if (state === WorkerState.TERMINATED) return task // Do not add CPU tasks in local queue if we are not able to execute it if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) { return task } mayHaveLocalTasks = true return localQueue.add(task, fair = tailDispatch) } //Finally, I went to the WorkQueue /** * Invariant: Called only by the owner of the queue, returns * `null` if task was added, task that wasn't added otherwise. */ private fun addLast(task: Task): Task? { if (task.isBlocking) blockingTasksInBuffer.incrementAndGet() if (bufferSize == BUFFER_CAPACITY - 1) return task val nextIndex = producerIndex.value and MASK /* * If current element is not null then we're racing with a really slow consumer that committed the consumer index, * but hasn't yet nulled out the slot, effectively preventing us from using it. * Such situations are very rare in practise (although possible) and we decided to give up a progress guarantee * to have a stronger invariant "add to queue with bufferSize == 0 is always successful". * This algorithm can still be wait-free for add, but if and only if tasks are not reusable, otherwise * nulling out the buffer wouldn't be possible. */ while (buffer[nextIndex] != null) { //I finally got to the place of yield, but I didn't understand the logic Thread.yield() } buffer.lazySet(nextIndex, task) producerIndex.incrementAndGet() return null }
resumeCancellableWith
/** * It is not inline to save bytecode (it is pretty big and used in many places) * and we leave it public so that its name is not mangled in use stack traces if it shows there. * It may appear in stack traces when coroutines are started/resumed with unconfined dispatcher. * @suppress **This an internal API and should not be used from general code.** */ @InternalCoroutinesApi public fun <T> Continuation<T>.resumeCancellableWith( result: Result<T>, onCancellation: ((cause: Throwable) -> Unit)? = null ): Unit = when (this) { //DispatchedContinuation calls its resumeCancellableWith instead of resumeWith. Of course, the result is the same. Dispatcher will be called dispatch is DispatchedContinuation -> resumeCancellableWith(result, onCancellation) //If you don't need to dispatch, just resume and you're done else -> resumeWith(result) }
delay
/ * * Delay the process for a period of time without blocking the thread, and recover it after a specified time. * *The pause function can be cancelled. *Current process if[Job]Is cancelled or completed, and the pending function is waiting for this function *Immediate recovery and[CancellationException]. *have**Immediate cancellation of guarantee**. When this function is cancelled, the job is cancelled *Pause, it will not resume successfully. See[suspendCancellableCoroutine]Documents understand low-level details. * *If you want to delay forever(Until cancelled),Consider using[awaitcancel]Replace. * *Note that the delay can be[select]Used in call[onTimeout][SelectBuilder. onTimeout)Terms. * *Realize attention:How to accurately track time is a key issue in context[CoroutineDispatcher]Implementation details of. * @param timillis Time in milliseconds. * / public suspend fun delay(timeMillis: Long) { if (timeMillis <= 0) return // don't delay return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> -> // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule. if (timeMillis < Long.MAX_VALUE) { cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont) } } } public fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) public suspend inline fun <T> suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE) /* * For non-atomic cancellation we setup parent-child relationship immediately * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but * properly supports cancellation. */ cancellable.initCancellability() block(cancellable) cancellable.getResult() } / * * *stay suspend Function to get the current continuation example *The result of a pending or immediate execution of the collaboration. * *If[block]Return special[COROUTINE_SUSPENDED]Value, which means that the execution of the suspended function has been suspended and will be suspended No results are returned immediately. In this case, the [renewal] provided to [block] shall be *Recover by calling[continue. At some point in When the results are available, continue the calculation. * *Otherwise,[block]Must have a return value that can be assigned to[T]And represents the result of this pending function. *Indicates that the execution is not suspended, and the [continuation] provided to the [block] should not be called. *because[block]The result type of is declared as' Any?',The correct type check cannot be performed, Its correct return type depends on the author of the suspended function. * *[continue]Call. Recover the coroutine directly in the caller's thread without passing the * [ContinuationInterceptor]May appear at the beginning of the collaborative process[CoroutineContext]Yes. *It is the caller's responsibility to ensure that the correct invocation context is established. * [continue .intercept]It can be used to obtain the continuation of the intercepted. * *Note that calling is not recommended either [Continuation.]resume]and[Continuation. resumeWithException)Synchronization function *In the same stack frame, the hanging function runs. use[suspendCoroutine]As a safer way to get current *Continuation instance. * / //This is very important. It is used to suspend the current coroutine and wait for it to return. You can directly return a value or return a COROUTINE_SUSPENDED indicates that it has been suspended, and it will be used for continuation after suspension Resume returns the result public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic") }
withContext
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } return suspendCoroutineUninterceptedOrReturn sc@ { uCont -> // compute new context val oldContext = uCont.context val newContext = oldContext + context // always check for cancellation of new context newContext.checkCompletion() // FAST PATH #1 -- new context is the same as the old one if (newContext === oldContext) { val coroutine = ScopeCoroutine(newContext, uCont) return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed) // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher) if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { val coroutine = UndispatchedCoroutine(newContext, uCont) // There are changes in the context, so this thread needs to be updated withCoroutineContext(newContext, null) { return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } } // SLOW PATH -- use new dispatcher val coroutine = DispatchedCoroutine(newContext, uCont) coroutine.initParentJob() block.startCoroutineCancellable(coroutine, coroutine) coroutine.getResult() } }
withContext is actually very simple. Hang up the current collaboration process and see if it is the same interceptor, and then cut it to execute
The key is
private class DispatchedCoroutine<in T>( context: CoroutineContext, uCont: Continuation<T> ) : ScopeCoroutine<T>(context, uCont) { ... override fun afterResume(state: Any?) { if (tryResume()) return // completed before getResult invocation -- bail out // Resume in a cancellable way because we have to switch back to the original dispatcher //There will be a problem here. If the original collaboration does not have an interceptor/dispatcher, in short, the Continuation returned by the intercepted () method is not a DispatchedContinuation, then the original thread will not be switched back, and ucont will continue to be executed in the current thread intercepted(). resumeCancellableWith(recoverResult(state, uCont)) } ...