Preface
Continuing from the previous article, this one builds the third wheel: gather, another very commonly used function in the asyncio package.
1. Background knowledge
● Compared with the first two functions, gather is used more often, because it lets multiple coroutine tasks run "simultaneously"
● Understand how __await__ and __iter__ are used
● Understand the async/await keywords. async/await is syntax introduced in Python 3.5 and works much like yield/yield from
● Today's article is a little long, so please read it patiently
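Since async/await rests on the same machinery as yield/yield from, a tiny sketch may help before diving in: a generator is driven with send(), each yield hands a value back to whoever called send(), and a return value travels out inside StopIteration.value. The generator names below (child, parent) are purely illustrative.

```python
def child():
    # The yielded value goes back to whoever called send()
    received = yield 'child paused'
    return f'child got {received}'

def parent():
    # yield from delegates to child() and evaluates to child()'s return value
    value = yield from child()
    return f'parent got: {value}'

gen = parent()
first = gen.send(None)       # runs until the yield inside child()
try:
    gen.send('hi')           # resumes child(); both generators then return
except StopIteration as exc:
    final = exc.value        # the outermost return value rides on StopIteration

print(first)   # child paused
print(final)   # parent got: child got hi
```

This send/yield/StopIteration round trip is exactly what Task.__step and Future.__await__ rely on later in the walkthrough.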
2. Environment
component | version |
---|---|
python | 3.7.7 |
3. Implementation of gather
Let's first look at the official usage of gather:
```python
# main.py
import asyncio


async def hello():
    print('enter hello ...')
    return 'return hello ...'


async def world():
    print('enter world ...')
    return 'return world ...'


async def helloworld():
    print('enter helloworld')
    ret = await asyncio.gather(hello(), world())
    print('exit helloworld')
    return ret


if __name__ == "__main__":
    ret = asyncio.run(helloworld())
    print(ret)
```

```
▶ python3 main.py
enter helloworld
enter hello ...
enter world ...
exit helloworld
['return hello ...', 'return world ...']
```
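The official example above never actually waits, so the "simultaneous" part of gather is hard to see. A small sketch with asyncio.sleep (the work() coroutine and its delays are arbitrary illustrations) shows two things: the total time is roughly the longest delay rather than the sum, and results come back in argument order, not completion order.

```python
import asyncio
import time

async def work(name, delay):
    # asyncio.sleep suspends this coroutine and lets the loop run others
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.perf_counter()
    # 'slow' finishes last, but gather still returns results in argument order
    results = await asyncio.gather(work('slow', 0.3), work('fast', 0.2))
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results)                # ['slow', 'fast']
print(f'{elapsed:.2f}s')      # close to 0.3s, not 0.5s
```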
Now let's look at how our own wheel is used:
```python
# main.py
import wilsonasyncio


async def hello():
    print('enter hello ...')
    return 'return hello ...'


async def world():
    print('enter world ...')
    return 'return world ...'


async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret


if __name__ == "__main__":
    ret = wilsonasyncio.run(helloworld())
    print(ret)
```

```
▶ python3 main.py
enter helloworld
enter hello ...
enter world ...
exit helloworld
['return hello ...', 'return world ...']
```
The home-made wheel works just as well. Let's look at its code.
4. Code walkthrough
1) Code structure
```
▶ tree .
.
├── eventloops.py
├── futures.py
├── main.py
├── tasks.py
└── wilsonasyncio.py
```
file | purpose |
---|---|
eventloops.py | event loop |
futures.py | Future object |
tasks.py | Task object |
wilsonasyncio.py | collection of callable entry functions |
main.py | entry point |
2) Code overview:
eventloops.py
| Class / function | method | attribute | purpose | note |
|---|---|---|---|---|
| Eventloop | | | Event loop; a thread can run only one | |
| | __init__ | | Initializes two important attributes: self._ready and self._stopping | |
| | | self._ready | Very important: every task waiting to run is taken from this queue | |
| | | self._stopping | Flag marking that the event loop should finish | |
| | call_soon | | Adds a task to the ready queue immediately | |
| | run_once | | Called by run_forever; takes tasks out of the self._ready queue and runs them | |
| | run_forever | | Infinite loop; exits once self._stopping is set | |
| | run_until_complete | | Very important: the start and end point of a task (explained in detail later) | |
| | create_task | | Wraps the passed-in coroutine in a Task object, which adds __step to the _ready queue | |
| Handle | | | Every task is wrapped in a Handle before it enters the ready queue (Eventloop.call_soon) | |
| | __init__ | | Initializes two important attributes: self._callback and self._args | |
| | | self._callback | The function to execute | |
| | | self._args | Arguments of the function to execute | |
| | _run | | Executes the function | |
| get_event_loop | | | Gets the event loop of the current thread | |
| _complete_eventloop | | | Sets the event loop's _stopping flag to True | |
| run | | | Entry function | |
| gather | | | Entry function that runs multiple tasks at the same time | newly added |
| _GatheringFuture | | | Wraps the list of child tasks into a single new object | newly added |
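To make the roles in the eventloops.py table concrete, here is a minimal sketch of Handle and the loop's scheduling core, modeled only on the descriptions above. It is not the wheel's actual code: the method bodies are assumptions, and the stop() helper is a hypothetical stand-in for _complete_eventloop.

```python
import collections

class Handle:
    """Wraps a callback and its arguments before it enters the ready queue."""
    def __init__(self, callback, args):
        self._callback = callback
        self._args = args

    def _run(self):
        self._callback(*self._args)

class Eventloop:
    def __init__(self):
        self._ready = collections.deque()   # Handles waiting to run
        self._stopping = False              # set when the loop should exit

    def call_soon(self, callback, *args):
        self._ready.append(Handle(callback, args))

    def run_once(self):
        # Only run what is queued right now; new work waits for the next pass
        for _ in range(len(self._ready)):
            self._ready.popleft()._run()

    def run_forever(self):
        while not self._stopping:
            self.run_once()

    def stop(self):
        # Hypothetical stand-in for _complete_eventloop
        self._stopping = True

log = []
loop = Eventloop()
loop.call_soon(log.append, 'first')
loop.call_soon(log.append, 'second')
loop.call_soon(loop.stop)
loop.run_forever()
print(log)  # ['first', 'second']
```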
tasks.py
| Class / function | method | attribute | purpose | note |
|---|---|---|---|---|
| Task | | | Inherits from Future; drives the whole lifecycle of a coroutine | |
| | __init__ | | Initializes self._coro and uses call_soon to add self.__step to the self._ready queue | |
| | | self._coro | The user-defined coroutine | |
| | __step | | The core function of the Task class | |
| | __wakeup | | Wakes the task up | newly added |
| ensure_future | | | Returns the object unchanged if it is already a Future; otherwise calls create_task, which returns a Task already added to the _ready queue | |
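The ensure_future row mirrors the documented behavior of the standard library's asyncio.ensure_future: a Future is returned unchanged, while a coroutine is wrapped in a Task and scheduled. A quick check against real asyncio (hello() is just an example coroutine):

```python
import asyncio

async def hello():
    return 'return hello ...'

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    same = asyncio.ensure_future(fut) is fut    # a Future passes through unchanged
    task = asyncio.ensure_future(hello())       # a coroutine is wrapped in a Task
    wrapped = isinstance(task, asyncio.Task)
    fut.set_result(None)                        # tidy up the unused Future
    return same, wrapped, await task

outcome = asyncio.run(main())
print(outcome)  # (True, True, 'return hello ...')
```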
futures.py
| Class / function | method | attribute | purpose | note |
|---|---|---|---|---|
| Future | | | Mainly responsible for interacting with user functions | |
| | __init__ | | Initializes two important attributes: self._loop and self._callbacks | |
| | | self._loop | The event loop | |
| | | self._callbacks | Callback queue; a holding area whose entries move to the _ready queue when the time is right (the state is no longer PENDING) | |
| | add_done_callback | | Adds a callback: if the state is _PENDING it goes into the _callbacks queue, otherwise straight into the _ready queue | |
| | set_result | | Stores the task result in _result, sets the state to _FINISHED, and calls __schedule_callbacks | |
| | __schedule_callbacks | | Moves the callbacks into _ready to wait for execution | |
| | result | | Returns the result | |
| | __await__ | | Entered when the Future is used with await | newly added |
| | __iter__ | | Entered when the Future is used with yield from | newly added |
3) Execution flow
3.1) Entry function
main.py
```python
if __name__ == "__main__":
    ret = wilsonasyncio.run(helloworld())
    print(ret)
```
- ret = wilsonasyncio.run(helloworld()) calls run with the user function helloworld() as its argument. For what happens inside run, see the previous article
- run --> run_until_complete
3.2) The event loop starts, same as before
3.3) First iteration: run_forever --> run_once
- The contents of the _ready queue (i.e. Task.__step) are taken out and executed. The coro here is helloworld()
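```python
def __step(self, exc=None):
    coro = self._coro
    try:
        if exc is None:
            # Start or resume the user coroutine
            result = coro.send(None)
        else:
            result = coro.throw(exc)
    except StopIteration as exc:
        # The coroutine returned; its return value is carried in exc.value
        super().set_result(exc.value)
    else:
        blocking = getattr(result, '_asyncio_future_blocking', None)
        if blocking:
            # The coroutine is waiting on a Future: resume it via __wakeup later
            result._asyncio_future_blocking = False
            result.add_done_callback(self.__wakeup, result)
    finally:
        self = None  # drop the reference to help break cycles, as asyncio does
```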
- __step has changed compared with the previous article's code
- result = coro.send(None) enters the user-defined function
```python
async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret
```
- ret = await wilsonasyncio.gather(hello(), world()): nothing special here, we simply enter the gather function
```python
def gather(*coros_or_futures, loop=None):
    loop = get_event_loop()

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1
        if nfinished == nfuts:
            # Every child is done: collect results in the original order
            results = []
            for fut in children:
                res = fut.result()
                results.append(res)
            outer.set_result(results)

    children = []
    nfuts = 0
    nfinished = 0
    for arg in coros_or_futures:
        # Coroutines are wrapped in Tasks (and scheduled); Futures pass through
        fut = tasks.ensure_future(arg, loop=loop)
        nfuts += 1
        fut.add_done_callback(_done_callback)
        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
```
- loop = get_event_loop() gets the event loop
- def _done_callback(fut) is a callback function. It is analyzed in detail later; for now, just know that it runs after each task (hello() and world()) finishes
- The for arg in coros_or_futures loop makes sure every task is a Future object, registers _done_callback on each one with add_done_callback, and puts them into the _ready queue to wait for the next round of scheduling
- Three important variables:
  - children stores each asynchronous task; in this example, hello() and world()
  - nfuts stores the number of asynchronous tasks; in this example, 2
  - nfinished stores the number of completed asynchronous tasks; it is 0 now and becomes 2 once all are done
- Moving on, we arrive at _GatheringFuture. Here is its source:
```python
class _GatheringFuture(Future):
    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
```
- The main job of _GatheringFuture is to put the asynchronous tasks into self._children and manage them all through this one object. Note that it inherits from Future
- At this point gather has finished initializing and returns outer, which is the _GatheringFuture
- To sum up, gather does three things: it initializes three important variables used later to track state; it adds a callback to each asynchronous task; and it merges the separate subtasks into one Future object that manages them
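The counting logic is the heart of gather, so here it is in isolation. This is a simplified sketch: ToyFuture and toy_gather are hypothetical names, and callbacks are called directly instead of going through the _ready queue. It shows that the outer future stays unresolved until the last child finishes, and that results follow the order of children, not completion order.

```python
class ToyFuture:
    """Hypothetical future: just enough to exercise gather's counting logic."""
    def __init__(self):
        self._result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, value):
        self._result = value
        for fn in self._callbacks:
            fn(self)             # called directly here; the wheel goes through _ready

    def result(self):
        return self._result

def toy_gather(children):
    outer = ToyFuture()
    nfuts = len(children)
    nfinished = 0

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1
        if nfinished == nfuts:
            # Results follow the order of children, not completion order
            outer.set_result([child.result() for child in children])

    for child in children:
        child.add_done_callback(_done_callback)
    return outer

hello, world = ToyFuture(), ToyFuture()
outer = toy_gather([hello, world])
world.set_result('return world ...')    # finishes first
partial = outer.result()                 # None: only 1 of 2 children done
hello.set_result('return hello ...')
final = outer.result()
print(partial, final)
```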
3.3.1) gather returns to helloworld()
```python
async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret
```
- ret = await wilsonasyncio.gather(hello(), world()) receives the _GatheringFuture, and await then enters Future.__await__
```python
def __await__(self):
    if self._state == _PENDING:
        self._asyncio_future_blocking = True
        yield self  # hand this Future to whoever called send()
    return self.result()
```
- Because the _GatheringFuture's state is _PENDING, we enter the if branch and hit yield self, which hands self (that is, the _GatheringFuture) back out. Note how yield is used here for flow control
- Where does yield hand it to? Back to whoever called send, so control returns inside Task.__step
```python
def __step(self, exc=None):
    coro = self._coro
    try:
        if exc is None:
            result = coro.send(None)
        else:
            result = coro.throw(exc)
    except StopIteration as exc:
        super().set_result(exc.value)
    else:
        blocking = getattr(result, '_asyncio_future_blocking', None)
        if blocking:
            result._asyncio_future_blocking = False
            result.add_done_callback(self.__wakeup, result)
    finally:
        self = None
```
- This is the first core point of this article: you need to be very clear about the flow control and the jumps. If they are unclear, please read the earlier article on yield/yield from in detail
- Moving on: since the user function helloworld() has not finished, no StopIteration is raised, so we reach the else branch
- blocking = getattr(result, '_asyncio_future_blocking', None): an important flag appears here, _asyncio_future_blocking. It is only set when __await__ is called, which sets it to True. Its purpose: if an asynchronous function awaits other (child) asynchronous functions, the flag shows that the outer function has not finished yet (explained in detail below), so a "wake-up" callback must be added
- result._asyncio_future_blocking = False resets the flag to False, and self.__wakeup is added as a callback that waits to wake the task up
- __step is done
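The flag handshake between __await__ and __step can be reproduced with a hand-driven coroutine. In this sketch, Waitable, parent, step and wakeup are hypothetical names; step is a simplified stand-in for Task.__step, not the wheel's code. The first pass returns the awaitable itself, the driver consumes the flag and registers the wake-up callback, and the second pass lets the coroutine finish.

```python
class Waitable:
    """Hypothetical awaitable mimicking Future.__await__ from the wheel."""
    _asyncio_future_blocking = False

    def __init__(self):
        self.callbacks = []

    def add_done_callback(self, fn):
        self.callbacks.append(fn)

    def __await__(self):
        self._asyncio_future_blocking = True
        yield self                      # goes back to whoever called send()
        return 'resumed'

async def parent(waitable):
    return await waitable

def step(coro, wakeup):
    """One simplified pass of a Task.__step-style driver."""
    try:
        result = coro.send(None)
    except StopIteration as exc:
        return 'done', exc.value
    if getattr(result, '_asyncio_future_blocking', None):
        result._asyncio_future_blocking = False   # consume the flag
        result.add_done_callback(wakeup)          # stands in for self.__wakeup
        return 'suspended', result
    return 'bare yield', result

def wakeup(fut):
    pass                                          # placeholder callback

w = Waitable()
coro = parent(w)
state, obj = step(coro, wakeup)      # suspended at 'yield self'
state2, value = step(coro, wakeup)   # the await returns, parent() finishes
print(state, state2, value)          # suspended done resumed
```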
The role of _asyncio_future_blocking needs a closer look:
- If an asynchronous function uses await to call another asynchronous function, execution enters Future.__await__, which sets _asyncio_future_blocking to True
```python
async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret


class Future:
    def __await__(self):
        if self._state == _PENDING:
            self._asyncio_future_blocking = True
            yield self
        return self.result()
```
- Because of this, Task.__step registers __wakeup as the callback on that Future
- Why __wakeup? Because helloworld() has not finished yet, __wakeup must run later to resume helloworld()
This reveals that in the event loop, whenever a task uses await to call other asynchronous tasks, the parent task is suspended and the child tasks run instead; once the children finish, control returns to the parent task, which continues executing
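The same suspend and resume order can be observed in the standard library. A small sketch with real asyncio (child and parent are example names) records the order of events: the parent runs until its await, both children run, and only then does the parent resume.

```python
import asyncio

log = []

async def child(name):
    log.append(f'child {name}')
    return name

async def parent():
    log.append('parent start')
    results = await asyncio.gather(child('a'), child('b'))
    log.append('parent resume')     # reached only after both children finish
    return results

outcome = asyncio.run(parent())
print(outcome)  # ['a', 'b']
print(log)      # ['parent start', 'child a', 'child b', 'parent resume']
```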
Take a drink and have a rest; it gets more complicated from here...
3.4) Second iteration: run_forever --> run_once
eventloops.py
```python
def run_once(self):
    ntodo = len(self._ready)
    for _ in range(ntodo):
        handle = self._ready.popleft()
        handle._run()
```
- Handles are taken from the queue. The _ready queue now holds two tasks, hello() and world(), which were added during gather's for loop
```python
async def hello():
    print('enter hello ...')
    return 'return hello ...'


async def world():
    print('enter world ...')
    return 'return world ...'
```
- Since neither hello() nor world() awaits another asynchronous task, they are simple: each finishes within a single Task.__step and reaches set_result()
- set_result() puts the callback into the _ready queue, where it waits for the next iteration
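The "next iteration" part comes from run_once reading ntodo = len(self._ready) before it starts: anything queued during a pass, such as a done callback scheduled by set_result(), waits for the following pass. That is why the walkthrough counts separate iterations. A minimal sketch with a bare deque (child and child_done are hypothetical jobs):

```python
import collections

ready = collections.deque()
passes = []

def run_once():
    ntodo = len(ready)              # snapshot: later additions wait for the next pass
    ran = []
    for _ in range(ntodo):
        callback = ready.popleft()
        ran.append(callback())
    passes.append(ran)

def child_done():
    return 'child done callback'

def child():
    ready.append(child_done)        # like set_result() queuing a done callback
    return 'child step'

ready.append(child)
run_once()
run_once()
print(passes)  # [['child step'], ['child done callback']]
```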
3.5) Third iteration: run_forever --> run_once
- Let's look at the callback function
```python
def _done_callback(fut):
    nonlocal nfinished
    nfinished += 1
    if nfinished == nfuts:
        results = []
        for fut in children:
            res = fut.result()
            results.append(res)
        outer.set_result(results)
```
- This is the second core point of this article, so let's analyze it carefully
- The main logic: the parent's callback is triggered only after all subtasks have finished. In this article, only when both hello() and world() are done does the condition if nfinished == nfuts: hold, and then the _GatheringFuture's result is set with outer.set_result(results)
- results.append(res) takes each subtask's result and puts it into the parent task's result list
- outer.set_result() moves outer's callback, __wakeup, into the _ready queue. The subtasks are done, so it is finally time to wake up the parent task with __wakeup
```python
def __wakeup(self, future):
    try:
        future.result()
    except Exception as exc:
        # The trimmed wheel just re-raises; asyncio instead calls self.__step(exc)
        raise exc
    else:
        self.__step()
    self = None
```
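__wakeup calls future.result() first, so a failed child surfaces before the parent resumes. The trimmed wheel simply re-raises; for comparison, asyncio's own Task passes the exception into the coroutine, so the parent sees it at its await. With asyncio.gather's default return_exceptions=False the first child exception propagates to the parent, and with return_exceptions=True it comes back as a list item instead. A quick check with real asyncio (ok and boom are example coroutines):

```python
import asyncio

async def ok():
    return 'ok'

async def boom():
    raise ValueError('boom')

async def main():
    try:
        await asyncio.gather(ok(), boom())
    except ValueError as exc:
        caught = str(exc)            # the parent sees the child's exception
    collected = await asyncio.gather(ok(), boom(), return_exceptions=True)
    return caught, collected

caught, collected = asyncio.run(main())
print(caught)      # boom
print(collected)   # 'ok' followed by the ValueError instance
```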
3.6) Fourth iteration: run_forever --> run_once
- future.result() fetches the results from the _GatheringFuture, and then we enter Task.__step
```python
def __step(self, exc=None):
    coro = self._coro
    try:
        if exc is None:
            result = coro.send(None)
        else:
            result = coro.throw(exc)
    except StopIteration as exc:
        super().set_result(exc.value)
    else:
        blocking = getattr(result, '_asyncio_future_blocking', None)
        if blocking:
            result._asyncio_future_blocking = False
            result.add_done_callback(self.__wakeup, result)
    finally:
        self = None
```
- result = coro.send(None) resumes helloworld(): send jumps back to the original yield, i.e. inside Future.__await__
```python
def __await__(self):
    if self._state == _PENDING:
        self._asyncio_future_blocking = True
        yield self
    return self.result()
```
- return self.result() finally hands the results back to helloworld()
```python
async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret
```
- helloworld finally finishes and returns ret; Task.__step catches the resulting StopIteration and stores ret with set_result()
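How the return value gets out of helloworld() can be checked by driving the coroutine by hand. In this sketch the Pending class is a hypothetical stand-in for a Future: the first send() stops at the yield inside __await__, and the second send() lets __await__ return, so helloworld() returns and its value arrives on StopIteration.value, which is what Task.__step passes to set_result().

```python
class Pending:
    """Hypothetical awaitable that, like Future.__await__, suspends once."""
    def __init__(self):
        self.value = None

    def __await__(self):
        yield self               # the suspension point inside __await__
        return self.value        # plays the role of 'return self.result()'

async def helloworld(fut):
    ret = await fut
    return ret

fut = Pending()
coro = helloworld(fut)
first = coro.send(None)          # first pass: suspended, receives fut back
fut.value = ['return hello ...', 'return world ...']
try:
    coro.send(None)              # second pass: __await__ returns, helloworld returns
except StopIteration as exc:
    final = exc.value            # what Task.__step hands to set_result()
print(final)  # ['return hello ...', 'return world ...']
```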
3.7) Fifth iteration: run_forever --> run_once
- The loop ends
- Back to run
3.8) Back in the main function, get the return value
```python
if __name__ == "__main__":
    ret = wilsonasyncio.run(helloworld())
    print(ret)
```
3.9) Execution result
```
▶ python3 main.py
enter helloworld
enter hello ...
enter world ...
exit helloworld
['return hello ...', 'return world ...']
```
5. Process summary
6. Summary
● It's finally over. This section is very long, and I still left out many details. If you have any questions, please leave a comment
● _GatheringFuture is a very important object: it tracks the execution state of hello() and world(), wakes up helloworld(), and passes the return values back to helloworld
● With await, async and yield, pay special attention to flow control
● The code in this article is trimmed down from the asyncio source in Python 3.7.7
● Code for this article: code
This concludes the article
My knowledge is limited, so there may be mistakes or omissions; please don't hesitate to point them out
For more articles, please follow me: wilson.chai