Reinventing the wheel from scratch: gather in Python 3 asyncio

Preface

Continuing from the previous articles, this one builds the third wheel: gather, another very commonly used function in the asyncio package.

1, Background knowledge

● Compared with the first two functions, gather is used even more often, because it supports "simultaneous" execution of multiple coroutine tasks
● Understand how __await__ and __iter__ are used
● Understand the async/await keywords. async/await is syntax introduced in Python 3.5 and works much like yield/yield from
● Today's article is a bit long, so please read it patiently


2, Environment

Component    Version
python       3.7.7

3, Implementation of gather

Let's take a look at the official usage of gather:

|># more main.py
import asyncio

async def hello():
    print('enter hello ...')
    return 'return hello ...'

async def world():
    print('enter world ...')
    return 'return world ...'

async def helloworld():
    print('enter helloworld')
    ret = await asyncio.gather(hello(), world())
    print('exit helloworld')
    return ret

if __name__ == "__main__":
    ret = asyncio.run(helloworld())
    print(ret)
    
|># python3 main.py
enter helloworld
enter hello ...
enter world ...
exit helloworld
['return hello ...', 'return world ...']
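One detail the output above does not show: with the real asyncio (3.7+), gather returns the results in the order the awaitables were passed in, not the order in which they finish. The small example below, whose delay values are arbitrary, makes the second coroutine finish first to demonstrate this.

```python
import asyncio


async def delayed(value, delay):
    # Sleep first so that the completion order differs from the argument order.
    await asyncio.sleep(delay)
    return value


async def main():
    # 'slow' is passed first but finishes last.
    return await asyncio.gather(delayed('slow', 0.05), delayed('fast', 0.01))


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['slow', 'fast']
```

The wheel reproduces this behavior too: as shown later, its _done_callback walks the children list in its original order when collecting results.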

Now let's see how our own wheel is used:

▶ more main.py
import wilsonasyncio

async def hello():
    print('enter hello ...')
    return 'return hello ...'

async def world():
    print('enter world ...')
    return 'return world ...'

async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret


if __name__ == "__main__":
    ret = wilsonasyncio.run(helloworld())
    print(ret)

    
▶ python3 main.py
enter helloworld
enter hello ...
enter world ...
exit helloworld
['return hello ...', 'return world ...']

The home-made wheel works just as well. Let's look at its code.

4, Code walkthrough

Wheel code

1) Code layout

|># tree
.
├── eventloops.py
├── futures.py
├── main.py
├── tasks.py
└── wilsonasyncio.py

File              | Purpose
eventloops.py     | Event loop
futures.py        | Future object
tasks.py          | Task object
wilsonasyncio.py  | Collection of callable (public) functions
main.py           | Entry point

2) Code overview:

eventloops.py

Class / function | Method / attribute | Purpose | Note
Eventloop | | Event loop; a thread can run only one |
 | __init__ | Initializes two important attributes: self._ready and self._stopping |
 | self._ready | Very important: every pending task is taken from this queue |
 | self._stopping | Flag that marks the end of the event loop |
 | call_soon | Immediately adds the task to the pending queue |
 | run_once | Called by run_forever; takes tasks out of the self._ready queue and runs them |
 | run_forever | Infinite loop; exits once self._stopping is set |
 | run_until_complete | Very important: the start and end point of a task (described in detail later) |
 | create_task | Wraps the passed-in coroutine into a Task object, which adds its __step to the _ready queue |
Handle | | Every task is wrapped into a Handle object before entering the pending queue (Eventloop.call_soon) |
 | __init__ | Initializes two important attributes: self._callback and self._args |
 | self._callback | The function to be executed |
 | self._args | Arguments of the function to be executed |
 | _run | Runs the function |
get_event_loop | | Gets the event loop of the current thread |
_complete_eventloop | | Sets the event loop's _stopping flag to True |
run | | Entry function |
gather | | Entry function that runs multiple tasks at the same time | New
_GatheringFuture | | Wraps the list of child tasks into a new class | New

tasks.py

Class / function | Method / attribute | Purpose | Note
Task | | Inherits from Future; mainly drives the coroutine through its whole life cycle |
 | __init__ | Initializes self._coro and uses call_soon to put self.__step into the self._ready queue |
 | self._coro | The user-defined coroutine |
 | __step | The core method of the Task class |
 | __wakeup | Wakes the task up | New
ensure_future | | Returns the object unchanged if it is a Future; otherwise calls create_task, which wraps it in a Task and adds it to the _ready queue |

futures.py

Class / function | Method / attribute | Purpose | Note
Future | | Mainly responsible for interacting with user functions |
 | __init__ | Initializes two important attributes: self._loop and self._callbacks |
 | self._loop | The event loop |
 | self._callbacks | Callback queue: a temporary holding area for tasks, which move into the _ready queue once the time is right (the status is no longer PENDING) |
 | add_done_callback | Adds a callback: if the status is _PENDING it goes into the _callbacks queue, otherwise into the _ready queue |
 | set_result | Stores the task's result in _result, marks the status as finished and calls __schedule_callbacks |
 | __schedule_callbacks | Moves the callbacks into _ready, where they wait to be executed |
 | result | Returns the result |
 | __await__ | Entered when await is used | New
 | __iter__ | Entered when yield from is used | New
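The Future rows above can be sketched roughly as follows. This is a minimal, self-contained approximation, not the wheel's code: a plain deque named ready stands in for the loop's _ready queue, callbacks receive the future as their only argument (the convention real asyncio uses), and legacy is a hypothetical helper showing that aliasing __iter__ to __await__ lets yield from work as well as await.

```python
import collections

_PENDING = 'PENDING'
_FINISHED = 'FINISHED'


class Future:
    _asyncio_future_blocking = False

    def __init__(self, ready):
        # `ready` stands in for the event loop's _ready queue (an assumption
        # made to keep the sketch self-contained).
        self._ready = ready
        self._callbacks = []
        self._state = _PENDING
        self._result = None

    def add_done_callback(self, fn):
        if self._state == _PENDING:
            self._callbacks.append(fn)       # park it until the result arrives
        else:
            self._ready.append((fn, self))   # already finished: schedule now

    def set_result(self, result):
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

    def __schedule_callbacks(self):
        # Move every parked callback into the ready queue.
        for fn in self._callbacks:
            self._ready.append((fn, self))
        self._callbacks.clear()

    def result(self):
        return self._result

    def __await__(self):
        if self._state == _PENDING:
            self._asyncio_future_blocking = True
            yield self
        return self.result()

    __iter__ = __await__  # `yield from fut` goes through the same generator


def legacy(fut):
    # Generator-based style: `yield from` uses __iter__.
    value = yield from fut
    return value
```

Real asyncio's Future also aliases __iter__ to __await__, which is why both syntaxes end up in the same code path.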

3) Execution process

3.1) Entry function

main.py

    
if __name__ == "__main__":
    ret = wilsonasyncio.run(helloworld())
    print(ret)
  • ret = wilsonasyncio.run(helloworld()) calls run with the user coroutine helloworld() as its argument. For what happens inside run, see the previous article
  • run --> run_until_complete

3.2) The event loop starts, same as before

3.3) First iteration: run_forever --> run_once

  • The content of the _ready queue (i.e. task.__step) is taken out and executed. The coro here is helloworld()
    def __step(self, exc=None):
        coro = self._coro
        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking:
                result._asyncio_future_blocking = False
                result.add_done_callback(self.__wakeup, result)
        finally:
            self = None
  • __step has changed compared with the previous version
  • result = coro.send(None) enters the user-defined coroutine
async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret
  • ret = await wilsonasyncio.gather(hello(), world()): nothing special here, we simply enter the gather function
def gather(*coros_or_futures, loop=None):
    if loop is None:
        loop = get_event_loop()

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        if nfinished == nfuts:
            results = []
            for fut in children:
                res = fut.result()
                results.append(res)

            outer.set_result(results)

    children = []
    nfuts = 0
    nfinished = 0

    for arg in coros_or_futures:
        fut = tasks.ensure_future(arg, loop=loop)
        nfuts += 1
        fut.add_done_callback(_done_callback)

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
  • loop = get_event_loop() gets the event loop
  • def _done_callback(fut) is a callback function that will be analyzed in detail later. For now you only need to know that it runs after each task (hello() and world()) finishes
  • The for arg in coros_or_futures loop makes sure every task is a Future object, uses add_done_callback to register _done_callback as its callback, and adds the tasks to the _ready queue to wait for the next round of scheduling
  • Three important variables:
         children stores every asynchronous task; in this example, hello() and world()
         nfuts stores the number of asynchronous tasks; 2 in this example
         nfinished stores the number of completed tasks; 0 for now, 2 once all are done
  • Moving on, we reach _GatheringFuture. Here is its source:
class _GatheringFuture(Future):

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
  • The main job of _GatheringFuture is to store the asynchronous tasks in self._children so that they can all be managed through this single object. Note that it inherits from Future
  • At this point gather has finished initializing and returns outer, which is actually a _GatheringFuture
  • To sum up, gather initializes three important variables that later hold the state, adds a callback to each asynchronous task, and groups the asynchronous subtasks under one Future object that manages them
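The counting logic of gather can be isolated from the event loop. The sketch below is an assumption-laden simplification, not the wheel's code: its Future runs callbacks synchronously inside set_result instead of scheduling them on the _ready queue, and gather_futures is a hypothetical helper that takes already-created futures. The nonlocal counter and the "complete outer when nfinished == nfuts" check mirror gather above.

```python
class Future:
    """Bare-bones future. Callbacks run synchronously in set_result, a
    simplification: in the wheel they are scheduled on the _ready queue."""

    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._done = True
        for fn in self._callbacks:
            fn(self)
        self._callbacks.clear()

    def done(self):
        return self._done

    def result(self):
        return self._result


def gather_futures(children):
    """Hypothetical helper reproducing gather's counting callback."""
    outer = Future()
    nfuts = len(children)
    nfinished = 0

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1
        if nfinished == nfuts:
            # Results are collected in the order of `children`,
            # not in the order the children finished.
            outer.set_result([child.result() for child in children])

    for child in children:
        child.add_done_callback(_done_callback)
    return outer


if __name__ == "__main__":
    hello_fut, world_fut = Future(), Future()
    outer = gather_futures([hello_fut, world_fut])
    world_fut.set_result('return world ...')   # finishes first
    print(outer.done())                         # False: only 1 of 2 children done
    hello_fut.set_result('return hello ...')
    print(outer.result())  # ['return hello ...', 'return world ...']
```

One edge case worth knowing: with an empty children list the callback never fires, so outer never completes. The real asyncio.gather special-cases no arguments and returns a future already resolved to [].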

3.3.1) gather returns to helloworld()

async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret
  • ret = await wilsonasyncio.gather(hello(), world()): gather returns a _GatheringFuture, and await then enters Future.__await__
    def __await__(self):
        if self._state == _PENDING:
            self._asyncio_future_blocking = True
            yield self
        return self.result()
  • Because the status of the _GatheringFuture is _PENDING, we enter the if branch and hit yield self, which hands self (that is, the _GatheringFuture) back to the caller (pay attention to how yield is used for flow control here)
  • Where does the yielded value go? It goes back to whoever called send, so we return to Task.__step
    def __step(self, exc=None):
        coro = self._coro
        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking:
                result._asyncio_future_blocking = False
                result.add_done_callback(self.__wakeup, result)
        finally:
            self = None
  • This is the first core point of this article: you need to be very clear about the flow control and jumps here. If they are unclear, please first read the article on yield/yield from
  • Moving on: since the user function helloworld() has not finished, no StopIteration is raised, so we reach the else branch
  • blocking = getattr(result, '_asyncio_future_blocking', None) checks an important state, _asyncio_future_blocking. It only becomes True once __await__ has been called. Its purpose: when an asynchronous function awaits other asynchronous work, that function has clearly not finished yet (explained in detail later), so a "wake-up" callback must be added
  • result._asyncio_future_blocking = False resets the flag to False, and self.__wakeup is added as a callback, waiting to wake the task up
  • The __step function completes
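The send/yield round trip above can be reproduced without any event loop. In this minimal sketch (the Future class and the demo driver are simplified stand-ins, not the wheel's code), the first send(None) returns the pending future with _asyncio_future_blocking set, just as __step sees it; after the result is set, a second send(None), which is what __wakeup eventually triggers, resumes __await__ right after its yield, and the coroutine's return value arrives inside StopIteration.

```python
_PENDING, _FINISHED = 'PENDING', 'FINISHED'


class Future:
    _asyncio_future_blocking = False

    def __init__(self):
        self._state = _PENDING
        self._result = None

    def set_result(self, result):
        self._result = result
        self._state = _FINISHED

    def result(self):
        return self._result

    def __await__(self):
        if self._state == _PENDING:
            self._asyncio_future_blocking = True  # tell the driver we are blocked on self
            yield self                            # suspend; send() receives `self`
        return self.result()                      # value of the `await` expression


async def parent(fut):
    print('enter parent')
    ret = await fut
    print('exit parent')
    return ret


def demo():
    fut = Future()
    coro = parent(fut)

    # First "__step": run until the coroutine yields.
    result = coro.send(None)
    assert result is fut
    assert getattr(result, '_asyncio_future_blocking', None)
    result._asyncio_future_blocking = False  # same reset as in __step

    # Later, the child work completes...
    fut.set_result(['return hello ...', 'return world ...'])

    # Second "__step": resume right after the yield inside __await__.
    try:
        coro.send(None)
    except StopIteration as exc:
        return exc.value  # the coroutine's return value


if __name__ == "__main__":
    print(demo())
```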

The role of _asyncio_future_blocking needs a more detailed explanation here

  • When an asynchronous function uses await on a Future (here, the _GatheringFuture returned by gather), execution reaches Future.__await__, which sets _asyncio_future_blocking to True
async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret
    
class Future:
    def __await__(self):
        if self._state == _PENDING:
            self._asyncio_future_blocking = True
            yield self
        return self.result()
  • As a result, Task.__step registers __wakeup as the callback of that future
  • Why __wakeup? Because helloworld() has not finished, __wakeup is needed later to wake helloworld() up and let it continue

This reveals how the Eventloop works: whenever a task uses await to wait on other asynchronous tasks, the parent task is suspended and the child tasks run instead. Once the child tasks finish, control returns to the parent task, which continues executing.
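The suspend/resume mechanism can be sketched end to end with a stripped-down loop, Future and Task. This is a simplified model under several assumptions, not the wheel's code: Loop and its run_until_idle method are hypothetical, callbacks receive the future as their only argument (real asyncio's convention), and the parent awaits a single child Task instead of a _GatheringFuture. The __step/__wakeup logic follows the code shown above.

```python
import collections

_PENDING, _FINISHED = 'PENDING', 'FINISHED'


class Loop:
    """Stripped-down loop: just a ready queue of (callback, args) pairs."""

    def __init__(self):
        self._ready = collections.deque()

    def call_soon(self, fn, *args):
        self._ready.append((fn, args))

    def run_until_idle(self):
        # Hypothetical helper: run iterations until nothing is queued and
        # return how many iterations that took.
        iterations = 0
        while self._ready:
            iterations += 1
            for _ in range(len(self._ready)):
                fn, args = self._ready.popleft()
                fn(*args)
        return iterations


class Future:
    _asyncio_future_blocking = False

    def __init__(self, loop):
        self._loop = loop
        self._state = _PENDING
        self._result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        if self._state == _PENDING:
            self._callbacks.append(fn)
        else:
            self._loop.call_soon(fn, self)

    def set_result(self, result):
        self._result = result
        self._state = _FINISHED
        for fn in self._callbacks:
            self._loop.call_soon(fn, self)  # callbacks run on the next iteration
        self._callbacks.clear()

    def result(self):
        return self._result

    def __await__(self):
        if self._state == _PENDING:
            self._asyncio_future_blocking = True
            yield self  # suspends the awaiting coroutine
        return self.result()


class Task(Future):
    def __init__(self, coro, loop):
        super().__init__(loop)
        self._coro = coro
        loop.call_soon(self.__step)

    def __step(self):
        try:
            result = self._coro.send(None)
        except StopIteration as exc:
            self.set_result(exc.value)  # finished: schedule whoever awaits us
        else:
            if getattr(result, '_asyncio_future_blocking', None):
                result._asyncio_future_blocking = False
                # The parent stays suspended until `result` is done.
                result.add_done_callback(self.__wakeup)

    def __wakeup(self, future):
        self.__step()  # resume the coroutine right after its yield


def demo():
    loop = Loop()
    trace = []

    async def child():
        trace.append('child runs')
        return 'child result'

    async def parent():
        trace.append('parent suspends')
        ret = await Task(child(), loop)
        trace.append('parent resumes')
        return ret

    top = Task(parent(), loop)
    iterations = loop.run_until_idle()
    return top.result(), trace, iterations


if __name__ == "__main__":
    print(demo())  # ('child result', ['parent suspends', 'child runs', 'parent resumes'], 3)
```

The three iterations match the pattern in the walkthrough: the parent runs and suspends, the child runs and finishes (scheduling __wakeup), and finally the parent is woken and completes.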

Take a sip of water and have a rest; the next part is more involved...

3.4) Second iteration: run_forever --> run_once

eventloops.py

    def run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            handle = self._ready.popleft()
            handle._run()
  • Items are taken from the queue. At this point the _ready queue holds two tasks, hello() and world(), which were added during gather's for loop
async def hello():
    print('enter hello ...')
    return 'return hello ...'

async def world():
    print('enter world ...')
    return 'return world ...'
  • Since hello() and world() do not await any other asynchronous tasks, their execution is simple: each one finishes after a single Task.__step and reaches set_result()
  • set_result() puts the callback (_done_callback) into the _ready queue, where it waits for the next loop iteration
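Why a single __step is enough for hello() and world() can be checked directly: a coroutine that never awaits runs to completion on its first send(None), and its return value travels inside StopIteration. run_one_step below is a hypothetical helper that mimics just that part of Task.__step.

```python
async def hello():
    print('enter hello ...')
    return 'return hello ...'


def run_one_step(coro):
    """Hypothetical helper mimicking one Task.__step on a coroutine."""
    try:
        coro.send(None)
    except StopIteration as exc:
        # The return value travels inside StopIteration; in the wheel,
        # __step passes it to set_result(), which schedules the callbacks.
        return exc.value
    coro.close()
    raise RuntimeError('the coroutine suspended and needs another step')


if __name__ == "__main__":
    print(run_one_step(hello()))  # prints 'enter hello ...', then 'return hello ...'
```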

3.5) Third iteration: run_forever --> run_once

  • Let's look at the callback function
    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        if nfinished == nfuts:
            results = []
            for fut in children:
                res = fut.result()
                results.append(res)

            outer.set_result(results)
  • This is the second core point of this article, so let's analyze it carefully
  • The main logic of this code is that the parent's callback is triggered only after all subtasks have finished. In this example, the condition if nfinished == nfuts: holds only once both hello() and world() are done, and only then is outer.set_result(results) called on the _GatheringFuture
  • results.append(res) collects each subtask's result into the parent task's result list
  • With all subtasks completed, outer.set_result(results) schedules the parent's callback, and it is finally time to wake up the parent task via __wakeup
    def __wakeup(self, future):
        try:
            future.result()
        except Exception as exc:
            # Throw the failure into the coroutine instead of crashing the loop
            self.__step(exc)
        else:
            self.__step()
        self = None

3.6) Fourth iteration: run_forever --> run_once

  • future.result() fetches the results from the _GatheringFuture, and then we enter Task.__step
    def __step(self, exc=None):
        coro = self._coro
        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking:
                result._asyncio_future_blocking = False
                result.add_done_callback(self.__wakeup, result)
        finally:
            self = None
  • result = coro.send(None) resumes helloworld(): send jumps back to the original yield, that is, into Future.__await__
    def __await__(self):
        if self._state == _PENDING:
            self._asyncio_future_blocking = True
            yield self
        return self.result()
  • return self.result() finally hands the results back to helloworld() as the value of the await expression
async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret

  • helloworld() finally finishes and returns ret; the resulting StopIteration lands in the except branch of __step, which calls set_result(exc.value)

3.7) Fifth iteration: run_forever --> run_once

  • The loop ends
  • Control returns to run

3.8) Back in the main program, obtain the return value

if __name__ == "__main__":
    ret = wilsonasyncio.run(helloworld())
    print(ret)

3.9) Execution results

▶ python3 main.py
enter helloworld
enter hello ...
enter world ...
exit helloworld
['return hello ...', 'return world ...']

5, Process summary

6, Summary

● It's finally over. This was a very long section, yet many details were still left out. If you have any questions, please leave a comment
● _GatheringFuture is a very important object: it tracks the execution state of hello() and world(), wakes up helloworld(), and passes the return values to helloworld()
● With await, async and yield, pay special attention to flow control
● The code in this article is a trimmed-down version based on the asyncio source code of Python 3.7.7
● Code for this article: code


That's all for this article
My knowledge is limited and there are bound to be mistakes or omissions; please don't hesitate to point them out
For more articles, please follow me: wilson.chai

Keywords: Python asyncio

Added by fotakis on Tue, 18 Jan 2022 20:29:15 +0200