There are four ways to implement coroutines in Python

Coroutines

A coroutine is not something the computer provides; it is a man-made, user-level context-switching technique, also called a micro-thread. In short, it switches execution between code blocks within a single thread.
We know that normal code executes from top to bottom: after one method or function finishes, execution moves on to the next one. For example:

def func1():
	print(1)
	print(2)
	
def func2():
	print(3)
	print(4)

func1()
func2()

Here the execution logic must first run all the statements in func1() and only then run func2(). This is called synchronous execution. But what if we want to switch to func2() right after print(1) in func1()?

The following coroutine-based approaches can be used:

  • greenlet
  • the yield keyword
  • the asyncio decorator (introduced in Python 3.4)
  • the async and await keywords (introduced in Python 3.5) [recommended]

1. Implementing coroutines with greenlet

# greenlet is a third-party module that needs to be installed first
pip3 install greenlet
# -*- coding: utf-8 -*-
# author: micher.yu
# Time: 2022/01/08
# simple_desc :
from greenlet import greenlet


def func1():
	print(1)  # Step 2: output 1
	gr2.switch()  # Step 3: switch to func2 function
	print(2)  # Step 6: output 2
	gr2.switch()  # Step 7: switch back to func2 (without this switch, func1 would simply finish and 4 would never be printed)


def func2():
	print(3)  # Step 4: output 3
	gr1.switch()  # Step 5: switch to func1 function
	print(4)  # Step 8: output 4. func2 then finishes, and control returns to the main greenlet


def func3():
	print(5)  # Step 10: output 5


gr1 = greenlet(func1)  # This only wraps func1 in a greenlet object; the code does not run yet
gr2 = greenlet(func2)  # Likewise, func2 is wrapped in a greenlet here

gr1.switch()  # Step 1: actually start executing func1
func3()  # Step 9: call func3 as a normal function

# So the actual output will be 1 3 2 4 5

2. yield keyword

Not recommended; it has few practical application scenarios.

If you are not familiar with the yield keyword, you can refer to the earlier article "Explain in detail the three major tools of python -- iterator, generator and decorator", where the generator part is explained in detail.

def func1():
	yield 1
	yield from func2()  # This is actually equivalent to for item in func2(): yield item 
	yield 2


def func2():
	yield 3
	yield 4


for item in func1():
	print(item)

# The output will be: 1 3 4 2
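
Chaining generators like this is mostly of academic interest, but the same yield mechanism can also be driven by hand. Below is a minimal sketch (the simple_scheduler helper and the task_a/task_b functions are made up for illustration, not part of any library) that schedules plain generators round-robin, switching at every yield much like greenlet.switch():

def task_a():
	yield 1
	yield 2


def task_b():
	yield 3
	yield 4


def simple_scheduler(*gens):
	tasks = list(gens)
	while tasks:
		gen = tasks.pop(0)
		try:
			print(next(gen))   # run the generator until its next yield
			tasks.append(gen)  # re-queue it so it resumes later
		except StopIteration:
			pass               # the generator has finished; drop it


simple_scheduler(task_a(), task_b())
# Output: 1 3 2 4 -- every yield hands control back to the scheduler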

3. asyncio module

Available in Python 3.4 and later versions. The asyncio framework uses an event loop to orchestrate callbacks and asynchronous tasks; the event loop runs in the context of an event loop policy.
(Figure: the interaction between coroutines, the event loop, and the event loop policy.)

Note: asyncio switches automatically whenever a coroutine blocks on I/O!

Let's use the @asyncio.coroutine decorator (removed in Python 3.10+) to define two coroutine functions (generator-based coroutines).

import asyncio


@asyncio.coroutine
def func1():
	print(1)
	# asyncio.sleep(2) simulates an I/O wait (asyncio.sleep is itself an awaitable; time.sleep() must not be used here because it would block the loop).
	# When a coroutine hits an I/O wait, the event loop automatically switches to other tasks.
	yield from asyncio.sleep(2)
	print(2)


@asyncio.coroutine
def func2():
	print(3)
	yield from asyncio.sleep(2)
	print(4)

PS: @asyncio.coroutine can still be used on Python 3.8+, but it raises a DeprecationWarning. It is recommended that you define coroutine functions with the async & await keywords instead; this does not affect how they are used.

A coroutine function cannot be called and run directly like an ordinary function. Calling a coroutine function does not start it running; it only returns a coroutine object.

func1()  # Nothing happens here; only a coroutine object is returned

You can use asyncio.iscoroutine() to verify whether something is a coroutine object.

print(asyncio.iscoroutine(func1()))  # True

A coroutine object must be run in an event loop. We can use the asyncio.get_event_loop() method to obtain the currently running loop instance (the loop object), and then hand the coroutine object to loop.run_until_complete(); the coroutine will then be run in the loop.

loop = asyncio.get_event_loop()
loop.run_until_complete(func1())
# The operation result is:
# 1
# Wait for 2s
# 2

run_until_complete() is a blocking call that does not return until the coroutine has finished, so it must be given an awaitable (a coroutine, Task, or Future object).
Strictly speaking, the parameter of run_until_complete() is a future, but we can pass it a coroutine object here because it checks the argument internally and wraps a bare coroutine in a future for us.

To turn a coroutine object into a future explicitly, you can use the asyncio.ensure_future() method.

So we can write it more explicitly:

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(func1()))
# The operation result is:
# 1
# Wait for 2s
# 2

What should we do when there are multiple coroutine functions to run?

  1. We can wrap each coroutine object into a future object, put them in a list, and run them with asyncio.wait().

    asyncio.wait() takes a collection of awaitable objects, and the await keyword takes a single awaitable.

    tasks = [
    	asyncio.ensure_future(func1()),  # wrap the coroutine object into a future object
    	asyncio.ensure_future(func2())
    ]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    # The operation result is:
    # 1
    # 3
    # Wait for 2s
    # 2
    # 4
    
  2. With asyncio.gather() we can pass the coroutine objects from a list directly (the list must be unpacked, i.e. *[]):

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*[func1(), func2()]))
    # The operation result is:
    # 1
    # 3
    # Wait for 2s
    # 2
    # 4
    

The complete code is:

import asyncio


@asyncio.coroutine
def func1():
	print(1)
	yield from asyncio.sleep(2)  # asyncio.sleep(2) simulates an I/O wait (time.sleep() would block the loop); the loop switches to another task in tasks here
	print(2)


@asyncio.coroutine
def func2():
	print(3)
	yield from asyncio.sleep(2)  # when I/O blocking occurs here, the loop automatically switches to another task in tasks
	print(4)


func1()  # Calling a coroutine function does not start it running; it only returns a coroutine object (verify with asyncio.iscoroutine)
print(asyncio.iscoroutine(func1()))  # True

tasks = [
	asyncio.ensure_future(func1()),  # wrap the coroutine object into a future object
	asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
# Mode 1:
loop.run_until_complete(asyncio.wait(tasks))
# Mode 2:
loop.run_until_complete(asyncio.gather(*[func1(), func2()]))

We can also work with Task objects:

Task objects are used to run coroutines concurrently in the event loop.

Task objects can be created with the asyncio.create_task() function or with loop.create_task(). A Task offers the following operations (a short sketch follows the list below):

  • cancel() – cancel the Task
  • cancelled() – whether the Task has been cancelled
  • done() – whether the Task has completed
  • result() – return the result:
    1. if the Task has completed, the result is returned
    2. if the Task has been cancelled, a CancelledError exception is raised
    3. if the Task's result is not yet available, an InvalidStateError exception is raised
  • add_done_callback(callback) – add a callback that is triggered when the Task completes
  • asyncio.all_tasks() – return all unfinished tasks in the loop
  • asyncio.current_task() – return the currently running task
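
As a quick illustration of a few of these methods, here is a minimal sketch (not from the original article; do_work and the variable names are made up) written in the Python 3.7+ asyncio.create_task()/asyncio.run() style:

import asyncio


async def do_work():
	await asyncio.sleep(1)
	return "done"


async def main():
	# asyncio.create_task() schedules the coroutine on the running loop (Python 3.7+)
	task = asyncio.create_task(do_work())
	print(task.done())    # False: the task has only been scheduled
	result = await task   # wait for the task to finish
	print(task.done())    # True
	print(task.result())  # 'done' -- the same value as result

	# Cancellation: request it, then await the task to observe CancelledError
	slow = asyncio.create_task(asyncio.sleep(10))
	slow.cancel()
	try:
		await slow
	except asyncio.CancelledError:
		print(slow.cancelled())  # True


asyncio.run(main())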

Use the loop object's create_task() function to create a Task object. When the Task object is printed for the first time, its status is pending; after the coroutine has finished, the status is finished.

import asyncio



async def do_something():
	print("This is a Task example....")
	# Simulate blocking for 1 second
	await asyncio.sleep(1)
	return "Task Mission accomplished"


# Create an event loop
loop = asyncio.get_event_loop()

# Create a task
task = loop.create_task(do_something())
# Print task for the first time
print(task)

# Run the task in the event loop
loop.run_until_complete(task)
# Print task again
print(task)
print(task.result())
""" Operation results
#1 <Task pending name='Task-1' coro=<do_something() running at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97>>
#2 this is an example of a Task
#3 <Task finished name='Task-1' coro=<do_ something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_ demo. Py: 97 > result ='task completed '>
#4 Task completion
"""

The Task object's result() method returns the return value of the do_something() function.

Task callback

import asyncio



async def do_something(task_id):
	print(f"This is a Task example,current task_id: {task_id}")
	# Simulate blocking for 1 second
	await asyncio.sleep(1)
	return f"Task-id {task_id} Mission accomplished"


# Callback function after task completion
def callback(task):
	# Print parameters
	print(task)
	# Print returned results
	print(task.result())


# Create an event loop
loop = asyncio.get_event_loop()

# Create a task
tasks = []
for i in range(5):
	name = f"task-{i}"
	task = loop.create_task(do_something(name), name=name)
	task.add_done_callback(callback)
	tasks.append(task)

# Run the tasks in the event loop
loop.run_until_complete(asyncio.wait(tasks))
""" Output is:
This is a Task example,current task_id: task-0
 This is a Task example,current task_id: task-1
 This is a Task example,current task_id: task-2
 This is a Task example,current task_id: task-3
 This is a Task example,current task_id: task-4
<Task finished name='task-0' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-0 Mission accomplished'>
Task-id task-0 Mission accomplished
<Task finished name='task-1' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-1 Mission accomplished'>
Task-id task-1 Mission accomplished
<Task finished name='task-2' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-2 Mission accomplished'>
Task-id task-2 Mission accomplished
<Task finished name='task-3' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-3 Mission accomplished'>
Task-id task-3 Mission accomplished
<Task finished name='task-4' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-4 Mission accomplished'>
Task-id task-4 Mission accomplished
"""

Here the asyncio.wait() function is used to run the Task list in the event loop; you could also use the asyncio.gather() function.

Callback after multiple Tasks have completed

import asyncio
import functools



async def do_something(t):
	print(f"Suspend for {t} second(s)")
	await asyncio.sleep(t)
	return f"Suspended for {t} second(s)"


def callback(event_loop, gatheringFuture):
	print(gatheringFuture.result())
	print("Multiple Task Callback after task completion")


loop = asyncio.get_event_loop()
gather = asyncio.gather(do_something(1), do_something(3))
gather.add_done_callback(functools.partial(callback, loop))

loop.run_until_complete(gather)
""" Output is:
Pause for 1 second
 Pause for 3 seconds
['Paused for 1 second', 'Paused for 3 seconds']
Multiple Task Callback after task completion
"""

4. The async & await keywords [recommended] 🌟

Python 3.5 and later.

Essentially the same mechanism as asyncio in 3.4, but more powerful.
From 3.5 on, yield from cannot be used inside a function defined with async; await must be used instead.

import asyncio


async def func1():
	print(1)
	await asyncio.sleep(2)  # on this I/O wait the loop automatically switches to another task
	print(2)


async def func2():
	print(3)
	await asyncio.sleep(2)  # when I/O blocking occurs here, the loop automatically switches to another task in tasks
	print(4)


tasks = [
	asyncio.ensure_future(func1()),  # wrap the coroutine object into a future object
	asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
# Run a single coroutine function
loop.run_until_complete(func1())  # a coroutine object can be passed directly; run_until_complete wraps it in a task for us
""" The output result is:
1
 Wait 2 s
2
"""
# Execute multiple coprocessor functions
loop.run_until_complete(asyncio.wait(tasks))
""" The output result is:
1
3
 Wait 2 s
2
4
"""

Note: from Python 3.7 on, you no longer need to obtain the loop object yourself; you can simply call asyncio.run(), which internally gets the loop object and calls loop.run_until_complete() for us.

Used directly (note that asyncio.run() accepts a single coroutine, so by itself it does not run multiple coroutine functions concurrently):

asyncio.run(func1()) 
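
If you do want several coroutine functions to run concurrently through asyncio.run(), a common pattern (a sketch assuming the async func1 and func2 defined above) is to wrap them in a single entry coroutine and gather them there:

async def main():
	await asyncio.gather(func1(), func2())


asyncio.run(main())
# Output: 1 3 (wait 2s) 2 4 -- the same as the asyncio.wait example above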

The async & await keywords simplify the code and remain compatible with older generator-based coroutines:

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

The await keyword can be used more than once inside a coroutine function:

import asyncio


async def func():
	print("start")
	await asyncio.sleep(5)
	print("end")
	return "finish"


async def main():
	print("implement main method")
	resp1 = await func()
	print(f"First return value:{resp1}")
	resp2 = await func()
	print(f"Second return value:{resp2}")

asyncio.run(main())
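
Note that the two awaits in main() run func() one after the other, so the program takes roughly 10 seconds in total. As a sketch (reusing the func defined above), asyncio.gather() can schedule both calls concurrently and finish in roughly 5 seconds:

async def main_concurrent():
	# both calls to func() run concurrently; total time is ~5s instead of ~10s
	resp1, resp2 = await asyncio.gather(func(), func())
	print(f"First return value:{resp1}")
	print(f"Second return value:{resp2}")


asyncio.run(main_concurrent())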
