Python asyncio asynchronous programming

Author: Wu Peiqi
source: http://www.cnblogs.com/wupeiqi/
The copyright of this article belongs to the author and cnblogs. Reprinting is welcome, but without the author's consent this statement must be retained and a link to the original given in a prominent position on the article page.

Have you noticed that more and more people are talking about async these days, for example FastAPI, Tornado, Sanic, Django 3, aiohttp, and so on?

You hear that async is awesome and its performance is through the roof... but what exactly is it?

In this section, I want to talk with you about asyncio and asynchronous programming!

asyncio tutorial: https://study.163.com/instructor/3525856.htm

Blog Park synchronization: https://www.cnblogs.com/wupeiqi/

1. Coroutines

If you want to learn asyncio, you must first understand coroutines. Coroutines are fundamental!

A coroutine, also known as a micro-thread, is a user-mode context-switching technique. In short, it uses a single thread to switch execution back and forth between blocks of code. For example:

def func1():
    print(1)
    ...
    print(2)


def func2():
    print(3)
    ...
    print(4)


func1()
func2()

The code above is ordinary function definition and execution: the code in the two functions runs sequentially, printing 1, 2, 3, 4 in order. With coroutine techniques, however, the functions can switch between each other mid-execution, making the final output 1, 3, 2, 4.

There are many ways to implement a coroutine in Python, such as:

  • greenlet, a third-party module used to implement coroutine code (gevent's coroutines are implemented on top of greenlet).
  • yield, the generator keyword; coroutine code can also be implemented using the features of generators.
  • asyncio, a module introduced in Python 3.4 for writing coroutine code.
  • async & await, two keywords introduced in Python 3.5 that, combined with the asyncio module, make it more convenient to write coroutine code.

1.1 greenlet

greenlet is a third-party module; to use it, install it in advance with pip3 install greenlet.

from greenlet import greenlet


def func1():
    print(1)        # Step 2: output 1
    gr2.switch()    # Step 3: switch to func2 function
    print(2)        # Step 6: output 2
    gr2.switch()    # Step 7: switch to func2 function and continue to execute backward from the last execution position


def func2():
    print(3)        # Step 4: output 3
    gr1.switch()    # Step 5: switch to func1 function and continue to execute backward from the last execution position
    print(4)        # Step 8: output 4


gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch()  # Step 1: execute func1 function

Note: switch can also take parameters, to pass values between the two sides when switching. For example:
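
A minimal sketch of passing values through switch (following the same gr1 / gr2 pattern as above):

from greenlet import greenlet


def func1(value):
    print("func1 received:", value)        # Step 2: prints "hello"
    reply = gr2.switch("to func2")         # Step 3: switch to func2, passing a value
    print("func1 resumed with:", reply)    # Step 6: prints "to func1"


def func2(value):
    print("func2 received:", value)        # Step 4: prints "to func2"
    gr1.switch("to func1")                 # Step 5: switch back to func1, passing a value


gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch("hello")  # Step 1: start func1 and hand it the initial value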

1.2 yield

Coroutine code can be implemented with Python generators, using the yield and yield from keywords.

def func1():
    yield 1
    yield from func2()
    yield 2


def func2():
    yield 3
    yield 4


f1 = func1()
for item in f1:
    print(item)

Note: the yield from keyword was introduced in Python 3.3.
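
For reference, on Python versions before 3.3 the delegation done by yield from had to be written as an explicit loop; a sketch of the equivalent func1:

def func1():
    yield 1
    # Equivalent to "yield from func2()" on Python < 3.3:
    for item in func2():
        yield item
    yield 2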

1.3 asyncio

Before Python 3.4, there was no official library for coroutines; they were generally implemented with tools such as greenlet. With the release of Python 3.4, coroutines gained official support in the form of the asyncio module.

import asyncio


@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(2)  # On a time-consuming IO operation, automatically switch to the other tasks in tasks
    print(2)


@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2)  # On a time-consuming IO operation, automatically switch to the other tasks in tasks
    print(4)


tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

Note: coroutines based on the asyncio module are more powerful than the earlier approaches, because the module also integrates automatic switching on time-consuming IO operations.

1.4 async & await

The async & await keywords were introduced in Python 3.5. Coroutine code written with them is essentially an enhanced version of the previous example and is simpler to write.

From Python 3.8 the @asyncio.coroutine decorator is deprecated and will be removed; it is recommended to use the async & await keywords to implement coroutine code.

import asyncio


async def func1():
    print(1)
    await asyncio.sleep(2)
    print(2)


async def func2():
    print(3)
    await asyncio.sleep(2)
    print(4)


tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

1.5 summary

There are many ways to implement coroutines. The current mainstream is the officially recommended asyncio module together with the async & await keywords, already supported in Tornado, Sanic, FastAPI, Django 3, and others.

Next, we will explain the asyncio module and the async & await keywords in more detail.

2. The significance of coroutines

From the above we have learned that a coroutine can switch back and forth between multiple contexts on a single thread.

But what is the point of switching back and forth? (Many articles online gush over coroutines; what exactly makes them so great?)

  1. For compute-bound operations, switching back and forth between coroutines is pointless; the switching and state saving actually lower performance.
  2. For IO-bound operations, a coroutine can switch to other tasks during the IO wait; when the IO completes, execution automatically switches back. This greatly saves resources and improves performance, thereby realizing asynchronous programming (other code can run without waiting for a task to finish); see the timing sketch below.
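
A minimal timing sketch of point 2, using asyncio.sleep to stand in for IO (the name io_task is just for illustration):

import asyncio
import time


async def io_task(n):
    await asyncio.sleep(2)  # simulated IO wait; the event loop runs the other tasks meanwhile
    return n


async def main():
    start = time.time()
    results = await asyncio.gather(io_task(1), io_task(2), io_task(3))
    print(results, "elapsed:", round(time.time() - start, 1))  # roughly 2 seconds, not 6

asyncio.run(main())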

2.1 Crawler example

For example: use code to download the images in url_list.

  • Mode 1: synchronous programming
"""
Download images using the third-party module requests; install in advance: pip3 install requests
"""
import requests


def download_image(url):
    print("Start downloading:", url)
    # Send network request and download pictures
    response = requests.get(url)
    print("Download complete")
    # Save picture to local file
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)


if __name__ == '__main__':
    url_list = [
        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/ChcCSV2BBICAUntfAADjJFd6800429.jpg',
        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    ]
    for item in url_list:
        download_image(item)
  • Mode 2: coroutine-based asynchronous programming
"""
Download images using the third-party module aiohttp; install in advance: pip3 install aiohttp
"""
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import aiohttp
import asyncio


async def fetch(session, url):
    print("Send request:", url)
    async with session.get(url, verify_ssl=False) as response:
        content = await response.content.read()
        file_name = url.rsplit('_')[-1]
        with open(file_name, mode='wb') as file_object:
            file_object.write(content)


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
            'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/ChcCSV2BBICAUntfAADjJFd6800429.jpg',
            'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())

Comparing the execution of the two approaches shows that coroutine-based asynchronous programming is far more efficient than synchronous programming, because:

  • Synchronous programming queues the downloads one after another. If each image takes 2 minutes to download, completing all of them takes 6 minutes.
  • Asynchronous programming sends the three download requests almost simultaneously (on an IO request, it automatically switches to sending the other tasks' requests). If each image takes 2 minutes to download, completing all of them takes about 2 minutes.

2.2 summary

Coroutines are generally used in programs that perform IO, because a coroutine can use the IO wait time to execute other code, improving execution efficiency.

Isn't life the same? Suppose you are the boss of a car factory. After an employee presses a machine's [Start] button, he has to wait 30 minutes in front of the machine before pressing the [Stop] button. As the boss, you would surely want that employee to do other work during the 30 minutes he spends waiting.

3. Asynchronous programming

Coroutines based on the async & await keywords enable asynchronous programming, which is the mainstream technology in Python's async ecosystem.

To really understand Python's built-in asynchronous programming, work through the following topics bit by bit, in order.

3.1 Event loop

An event loop can be thought of as a while loop that runs periodically, executing some tasks and terminating under specific conditions.

# Pseudo code
task_list = [Task 1, Task 2, Task 3, ...]

while True:
    ready_list, done_list = check all tasks in task_list and return the 'ready' and 'done' ones

    for ready_task in ready_list:
        run ready_task

    for done_task in done_list:
        remove done_task from task_list

    terminate the loop once every task in task_list is done

When writing a program, you can obtain and create an event loop through the following code.

import asyncio
loop = asyncio.get_event_loop()
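
Note that since Python 3.7 there is also the higher-level asyncio.run(), which creates a new event loop, runs a coroutine on it, and closes the loop when done; a minimal sketch:

import asyncio


async def main():
    await asyncio.sleep(1)

asyncio.run(main())  # creates a fresh event loop, runs main() to completion, then closes the loop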

3.2 Coroutines and asynchronous programming

A coroutine function: a function defined with async def.

A coroutine object is an object returned by calling a coroutine function.

# Define a coroutine function
async def func():
    pass

# Call the coroutine function to return a coroutine object
result = func()

Note: when a coroutine function is called, the code inside it does not run; a coroutine object is returned instead.

3.2.1 basic application

In a program, if you want to execute the code inside a coroutine function, the event loop and the coroutine object must work together, like this:

# Define a coroutine function
async def func():
    pass

# Call the coroutine function; a coroutine object is returned
result = func()

# Hand the coroutine object to an event loop to actually run it
loop = asyncio.get_event_loop()
loop.run_until_complete(result)

This process can be understood simply as: the coroutine is added as a task to the event loop's task list; the event loop then checks whether the tasks in the list are ready (think of them as ready by default) and executes the code of the ready ones.

3.2.2 await

await is a keyword that can only be used inside a coroutine function. It suspends the current coroutine (task) when an IO operation is encountered; while the current coroutine (task) is suspended, the event loop can run other coroutines (tasks). When the current coroutine's IO completes, execution switches back and continues with the code after await. For example:

Example 1:

import asyncio


async def func():
    print("Internal code of execution coprocessor function")
    # Suspend the current collaboration (task) when an IO operation is encountered, and continue to execute after the IO operation is completed.
    # When the current process is suspended, the event loop can execute other processes (tasks).
    response = await asyncio.sleep(2)
    print("IO The request ended with:", response)
result = func()
asyncio.run(result)

Example 2:

import asyncio


async def others():
    print("start")
    await asyncio.sleep(2)
    print('end')
    return 'Return value'


async def func():
    print("Internal code of execution coprocessor function")
    # In case of IO operation, suspend the current collaboration (task), and continue to execute after the IO operation is completed. When the current process is suspended, the event loop can execute other processes (tasks).
    response = await others()
    print("IO The request ended with:", response)
asyncio.run(func())

Example 3:

import asyncio


async def others():
    print("start")
    await asyncio.sleep(2)
    print('end')
    return 'Return value'


async def func():
    print("Internal code of execution coprocessor function")
    # Suspend the current collaboration (task) when an IO operation is encountered, and continue to execute after the IO operation is completed. When the current process is suspended, the event loop can execute other processes (tasks).
    response1 = await others()
    print("IO The request ended with:", response1)
    response2 = await others()
    print("IO The request ended with:", response2)
asyncio.run(func())

All the examples above create only one task, i.e. the event loop's task list holds a single task, so they cannot demonstrate switching to other tasks during the IO wait.

To create multiple tasks in a program, you need Task objects.

3.2.3 Task object

Tasks are used to schedule coroutines concurrently.

When a coroutine is wrapped into a Task with functions like asyncio.create_task(), the coroutine is automatically scheduled to run soon.

Tasks are used to schedule coroutines concurrently. Creating a Task object via asyncio.create_task(coroutine object) adds the coroutine to the event loop, where it waits to be scheduled and executed. Besides asyncio.create_task(), you can also use the lower-level loop.create_task() or ensure_future() functions. Manually instantiating Task objects is not recommended.

In essence, this wraps the coroutine object in a Task object, immediately adds the coroutine to the event loop, and tracks the coroutine's state.

Note: the asyncio.create_task() function was added in Python 3.7. Before Python 3.7, you can use the lower-level asyncio.ensure_future() function instead.

Example 1:

import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "Return value"


async def main():
    print("main start")
    # Create a coroutine, wrap it into a Task object, and immediately add it to the event loop's task list, where it waits to be executed (ready by default).
    task1 = asyncio.create_task(func())
    # Create a coroutine, wrap it into a Task object, and immediately add it to the event loop's task list, where it waits to be executed (ready by default).
    task2 = asyncio.create_task(func())
    print("main end")
    # When a coroutine hits an IO operation during execution, it automatically switches to the other tasks.
    # The await here waits for each coroutine to finish and fetches its result.
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)

asyncio.run(main())

Example 2:

import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "Return value"


async def main():
    print("main start")
    # Create coroutines, wrap each into a Task object, add them to the event loop's task list, and wait for the event loop to execute them (ready by default).
    task_list = [
        asyncio.create_task(func(), name="n1"),
        asyncio.create_task(func(), name="n2")
    ]
    print("main end")
    # When a coroutine hits an IO operation during execution, it automatically switches to the other tasks.
    # The await here waits for all coroutines to finish and collects their return values into done.
    # If a timeout is set, it is the maximum number of seconds to wait: return values of finished tasks are put into done, unfinished tasks into pending.
    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done, pending)

asyncio.run(main())

Note: internally, asyncio.wait calls ensure_future on each coroutine in the list, wrapping it as a Task object, so when working with wait it is also fine for task_list to be [func(), func()].

Example 3:

import asyncio


async def func():
    print("Internal code of execution coprocessor function")
    # Suspend the current collaboration (task) when an IO operation is encountered, and continue to execute after the IO operation is completed. When the current process is suspended, the event loop can execute other processes (tasks).
    response = await asyncio.sleep(2)
    print("IO The request ended with:", response)
coroutine_list = [func(), func()]
# Wrong: coroutine_list = [asyncio.create_task(func()), asyncio.create_task(func())]
# asyncio.create_task cannot be used directly here, because create_task immediately adds the task to the
# event loop's task list, and at this point the event loop has not been created yet, so it would raise an error.
# Instead, asyncio.wait wraps the list into a coroutine, and asyncio.run executes that coroutine;
# internally asyncio.wait calls ensure_future on each coroutine in the list, wrapping each as a Task object.
done, pending = asyncio.run(asyncio.wait(coroutine_list))
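
As an aside, the standard library's asyncio.gather can also schedule the coroutines as tasks; unlike wait, it returns the results directly, in the order the awaitables were passed in. A minimal sketch reusing func from above:

import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "Return value"


async def main():
    # gather wraps the coroutines into tasks and runs them concurrently
    ret1, ret2 = await asyncio.gather(func(), func())
    print(ret1, ret2)

asyncio.run(main())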

3.2.4 asyncio.Future object

A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation.

The Future object in asyncio is a relatively low-level object. Usually we don't use it directly; instead we use the Task object, which completes the task and tracks its state. (Task is a subclass of Future.)

Future gives asynchronous programming its handling of the eventual result (the Task class has this state-handling capability as well).

Example 1:

import asyncio


async def main():
    # Get current event loop
    loop = asyncio.get_running_loop()
    # Create a task (Future object) that does nothing.
    fut = loop.create_future()
    # Wait for the task's (Future object's) final result; if no result is ever set, it waits forever.
    await fut

asyncio.run(main())

Example 2:

import asyncio


async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("666")


async def main():
    # Get current event loop
    loop = asyncio.get_running_loop()
    # If you create a task (Future object) without binding any behavior, the task will never know when to end.
    fut = loop.create_future()
    # Create a Task (Task object) and bind it to the set_after function; the function assigns a value to fut after 2 seconds.
    # That is, it manually sets the eventual result of the fut task, so that fut can end.
    await loop.create_task(set_after(fut))
    # Wait for the Future object to get the final result, otherwise wait forever
    data = await fut
    print(data)

asyncio.run(main())

The Future object itself has no function bound to it, so if you want the event loop to obtain the Future's result, you must set it manually. The Task object inherits from Future and effectively extends it: after the bound function finishes executing, it automatically calls set_result, ending the task automatically.

Although Task objects are what we usually use, the handling of results is, in essence, based on Future objects.

Extension: objects that support the await syntax are called awaitable objects, so coroutine objects, Task objects, and Future objects are all awaitable.
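
A minimal sketch of that extension point, using a hypothetical Data class that becomes awaitable by defining __await__:

import asyncio


class Data:
    """ Hypothetical class: awaitable because it defines __await__ """
    def __await__(self):
        # Delegate to the iterator of an existing awaitable (here a coroutine object)
        return asyncio.sleep(1, result="value from Data").__await__()


async def func():
    result = await Data()  # works because Data supports the await syntax
    print(result)

asyncio.run(func())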

3.2.5 futures.Future object

Python's concurrent.futures module also has a Future object, which is used to implement asynchronous operations based on thread pools and process pools.

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor


def func(value):
    time.sleep(1)
    print(value)


pool = ThreadPoolExecutor(max_workers=5)
# Or pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
    fut = pool.submit(func, i)
    print(fut)

The two Future objects are different; they are designed for different application scenarios. For example, concurrent.futures.Future does not support the await syntax.

The official docs point out the difference between the two objects:

Python provides the function asyncio.wrap_future, which wraps a futures.Future object into an asyncio.Future object.

You may well ask: why does Python provide this function?

In fact, in program development we generally either use asyncio coroutines for asynchronous operations or use process pools / thread pools. But when coroutine-based asynchrony has to be mixed with process-pool or thread-pool asynchrony, this function comes into play.
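
To make the relationship concrete, here is a minimal sketch that mixes the two by hand with asyncio.wrap_future (run_in_executor, shown next, does essentially this internally):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_func():
    time.sleep(2)  # a blocking, time-consuming operation
    return "SB"


async def main():
    pool = ThreadPoolExecutor()
    concurrent_fut = pool.submit(blocking_func)        # a concurrent.futures.Future
    asyncio_fut = asyncio.wrap_future(concurrent_fut)  # wrapped into an asyncio.Future
    print(await asyncio_fut)                           # now it supports await

asyncio.run(main())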

import time
import asyncio
import concurrent.futures


def func1():
    # A time-consuming operation
    time.sleep(2)
    return "SB"


async def main():
    loop = asyncio.get_running_loop()
    # 1. Run in the default loop's executor
    # Step 1: internally, ThreadPoolExecutor's submit method is called to have a thread from the pool execute func1, returning a concurrent.futures.Future object.
    # Step 2: asyncio.wrap_future is called to wrap the concurrent.futures.Future object into an asyncio.Future object,
    # because concurrent.futures.Future objects do not support the await syntax and must be wrapped into asyncio.Future objects first.
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print('default thread pool', result)
    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #     result = await loop.run_in_executor(
    #         pool, func1)
    #     print('custom thread pool', result)
    # 3. Run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    #     result = await loop.run_in_executor(
    #         pool, func1)
    #     print('custom process pool', result)

asyncio.run(main())

Application scenario: when a project developed with coroutine-based asynchronous programming uses a third-party module that does not support coroutine-based async, this function is needed. For example:

import asyncio
import requests


async def download_image(url):
    # Send a network request and download the image (on the network IO request, automatically switch to other tasks)
    print("Start downloading:", url)
    loop = asyncio.get_event_loop()
    # The requests module does not support async by default, so a thread pool is used to cooperate with it.
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print('Download complete')
    # Save picture to local file
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)

if __name__ == '__main__':
    url_list = [
        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/ChcCSV2BBICAUntfAADjJFd6800429.jpg',
        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    ]
    tasks = [download_image(url) for url in url_list]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

3.2.6 asynchronous iterators

What is an asynchronous iterator

An object that implements the __aiter__() and __anext__() methods. __anext__ must return an awaitable object. async for resolves the awaitables returned by the asynchronous iterator's __anext__() method until it raises a StopAsyncIteration exception. Introduced by PEP 492.

What is an asynchronous iterable?

An object that can be used in an async for statement. It must return an asynchronous iterator from its __aiter__() method. Introduced by PEP 492.

import asyncio


class Reader(object):
    """ Custom asynchronous iterators (also asynchronous iteratable objects) """

    def __init__(self):
        self.count = 0

    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val is None:
            raise StopAsyncIteration
        return val


async def func():
    # Create asynchronous iteratable objects
    async_iter = Reader()
    # async for must be placed in the async def function, otherwise the syntax is incorrect.
    async for item in async_iter:
        print(item)

asyncio.run(func())

Asynchronous iterators themselves don't do much; they simply make the async for syntax work. Since Python 3.6 an async generator achieves the same with less code; see the sketch below.
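
A minimal sketch of an async generator (PEP 525); the name reader and the sleep are just stand-ins for real IO:

import asyncio


async def reader(count):
    """ An async generator: async def plus yield produces an asynchronous iterator """
    for i in range(1, count + 1):
        await asyncio.sleep(1)  # stands in for real IO, e.g. reading a line
        yield i


async def func():
    async for item in reader(3):
        print(item)

asyncio.run(func())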

3.2.7 Asynchronous context manager

Such an object is a context manager that defines the __aenter__() and __aexit__() methods and can be used in an async with statement. Introduced by PEP 492.

import asyncio


class AsyncContextManager:
    def __init__(self):
        self.conn = None

    async def do_something(self):
        # Asynchronously operate on the database
        return 666

    async def __aenter__(self):
        # Asynchronously connect to the database
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Asynchronously close the database connection
        await asyncio.sleep(1)


async def func():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)

asyncio.run(func())

Asynchronous context managers are quite useful; during development they fit open-process-close operations well. Since Python 3.7 they can also be written more compactly, as the sketch below shows.
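
A minimal sketch using the standard library's contextlib.asynccontextmanager (Python 3.7+); the connection here is simulated with asyncio.sleep:

import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def async_connection():
    conn = await asyncio.sleep(1, result="fake-connection")  # simulate connecting
    try:
        yield conn  # this is what "async with ... as conn" receives
    finally:
        await asyncio.sleep(1)  # simulate closing the connection


async def func():
    async with async_connection() as conn:
        print(conn)

asyncio.run(func())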

3.3 summary

Whenever you see the async and await keywords in a program, what's underneath is coroutine-based asynchronous programming: a single thread executes other tasks during IO wait time, thereby achieving concurrency.

The above covers the common operations of asynchronous programming; see the official documentation for more.

4. uvloop

The Python standard library provides the asyncio module to support coroutine-based asynchronous programming.

uvloop is a replacement for asyncio's event loop that improves asyncio's performance. In fact, uvloop is at least twice as fast as nodejs, gevent, and other Python asynchronous frameworks, with performance approaching that of Go.

Install uvloop

pip3 install uvloop

Replacing asyncio's event loop with uvloop in a project is also very simple; just do the following in code.

import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Write asyncio code here, exactly as before.
# Internally, the event loop is automatically switched to uvloop.
asyncio.run(...)

Note: the well-known ASGI server uvicorn uses the uvloop event loop internally.

5. Practical cases

For ease of understanding, all the examples above used asyncio.sleep to stand in for IO; real project development involves a lot of actual IO.

5.1 asynchronous Redis

When operating redis from Python, connecting, setting values, and getting values all involve network IO requests. With asyncio, other tasks can be done during the IO waits, improving performance.

Install the Python module for asynchronous redis operations:

pip3 install aioredis

Example 1: asynchronous operation redis.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import aioredis


async def execute(address, password):
    print("Start execution", address)
    # Network IO operation: create redis connection
    redis = await aioredis.create_redis(address, password=password)
    # Network IO operation: set a hash named car in redis with three key-value pairs, i.e. car = {key1: 1, key2: 2, key3: 3}
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    # Network IO operation: get the value from redis
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    redis.close()
    # Network IO operation: close redis connection
    await redis.wait_closed()
    print("end", address)

asyncio.run(execute('redis://47.93.4.198:6379', "root!2345"))

Example 2: connect to multiple redis servers and operate on them (switching to other tasks on IO improves performance).

import asyncio
import aioredis


async def execute(address, password):
    print("Start execution", address)
    # Network IO operation: connect to 47.93.4.197:6379 first; on IO, automatically switch tasks and connect to 47.93.4.198:6379
    redis = await aioredis.create_redis_pool(address, password=password)
    # Network IO operation: on IO, automatically switch tasks
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    # Network IO operation: on IO, automatically switch tasks
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    redis.close()
    # Network IO operation: on IO, automatically switch tasks
    await redis.wait_closed()
    print("end", address)

task_list = [
    execute('redis://47.93.4.197:6379', "root!2345"),
    execute('redis://47.93.4.198:6379', "root!2345")
]
asyncio.run(asyncio.wait(task_list))

For more redis operations, please refer to the official website of aioredis: https://aioredis.readthedocs.io/en/v1.3.0/start.html

5.2 asynchronous MySQL

When operating MySQL from Python, connecting, executing SQL, and closing the connection all involve network IO requests. With asyncio, other tasks can be done during the IO waits, improving performance.

Install the Python module for asynchronous MySQL operations:

pip3 install aiomysql

Example 1:

import asyncio
import aiomysql


async def execute():
    # Network IO operation: connect to MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql', )
    # Network IO operation: create a cursor
    cur = await conn.cursor()
    # Network IO operation: execute SQL
    await cur.execute("SELECT Host,User FROM user")
    # Network IO operation: get SQL results
    result = await cur.fetchall()
    print(result)
    # Network IO operation: close the connection
    await cur.close()
    conn.close()

asyncio.run(execute())

Example 2:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import aiomysql


async def execute(host, password):
    print("start", host)
    # Network IO operation: connect to 47.93.40.197 first; on IO, automatically switch tasks and connect to 47.93.40.198
    conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
    # Network IO operation: on IO, automatically switch tasks
    cur = await conn.cursor()
    # Network IO operation: on IO, automatically switch tasks
    await cur.execute("SELECT Host,User FROM user")
    # Network IO operation: on IO, automatically switch tasks
    result = await cur.fetchall()
    print(result)
    # Network IO operation: on IO, automatically switch tasks
    await cur.close()
    conn.close()
    print("end", host)

task_list = [
    execute('47.93.40.197', "root!2345"),
    execute('47.93.40.197', "root!2345")
]
asyncio.run(asyncio.wait(task_list))

5.3 FastAPI framework

FastAPI is a high-performance web framework for building APIs, based on Python 3.6+ type hints.

The following asynchronous examples use FastAPI together with uvicorn (uvicorn is an ASGI server that supports async).

Install the FastAPI web framework:

pip3 install fastapi

Install uvicorn, which is essentially a web-facing socket server that supports ASGI (broadly speaking: ASGI supports async, WSGI does not):

pip3 install uvicorn

Example:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
app = FastAPI()
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123", minsize=1, maxsize=10)


@app.get("/")
def index():
    """ Common operation interface """
    return {"message": "Hello World"}


@app.get("/red")
async def red():
    """ Asynchronous operation interface """
    print("The request came")
    await asyncio.sleep(3)
    # Get a connection from connection pool
    conn = await REDIS_POOL.acquire()
    redis = Redis(conn)
    # Set value
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    # Read value
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    # Return the connection to the connection pool
    REDIS_POOL.release(conn)
    return result

if __name__ == '__main__':
    uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")

When multiple users send concurrent requests, an endpoint written asynchronously can process other requests during IO waits, improving performance.

For example, suppose two users concurrently request the endpoint http://127.0.0.1:5000/red. The server has only one thread and handles only one request at any moment, yet asynchronous processing still yields concurrency: while the view function is handling the first request and hits an IO wait, it automatically switches to receive and handle the second request; when that one hits IO, it switches again. As soon as a request's IO finishes, execution returns to that request and continues running its code.

Examples of automatic management based on context managers:

Example 1: redis

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
app = FastAPI()
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123", minsize=1, maxsize=10)


@app.get("/")
def index():
    """ Common operation interface """
    return {"message": "Hello World"}


@app.get("/red")
async def red():
    """ Asynchronous operation interface """
    print("The request came")
    async with REDIS_POOL.get() as conn:
        redis = Redis(conn)
        # Set value
        await redis.hmset_dict('car', key1=1, key2=2, key3=3)
        # Read value
        result = await redis.hgetall('car', encoding='utf-8')
        print(result)
    return result

if __name__ == '__main__':
    uvicorn.run("fast3:app", host="127.0.0.1", port=5000, log_level="info")

Example 2: mysql

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import uvicorn
from fastapi import FastAPI
import aiomysql

app = FastAPI()
# Create database connection pool
pool = aiomysql.Pool(host='127.0.0.1', port=3306, user='root', password='123', db='mysql',
                     minsize=1, maxsize=10, echo=False, pool_recycle=-1, loop=asyncio.get_event_loop())

@app.get("/red")
async def red():
    """ Asynchronous operation interface """
    # Application link to database connection pool
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # Network IO operation: execute SQL
            await cur.execute("SELECT Host,User FROM user")
            # Network IO operation: get SQL results
            result = await cur.fetchall()
            print(result)
            # Network IO operation: the connection is closed automatically when the async with blocks exit
    return {"result": "ok"}

if __name__ == '__main__':
    uvicorn.run("fast2:app", host="127.0.0.1", port=5000, log_level="info")

5.4 Crawlers

When writing a crawler application, you request target data over network IO, a situation well suited to asynchronous programming for better performance. Below we use the aiohttp module, which supports asynchronous programming.

Install the aiohttp module:

pip3 install aiohttp

Example:

import aiohttp
import asyncio


async def fetch(session, url):
    print("Send request:", url)
    async with session.get(url, verify_ssl=False) as response:
        text = await response.text()
        print("Results obtained:", url, len(text))


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://python.org',
            'https://www.baidu.com',
            'https://www.pythonav.com'
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())

summary

To improve performance, more and more frameworks are embracing asynchronous programming, e.g. Sanic, Tornado, and Django 3.0 (its django channels component), doing more with fewer resources. Why not?

This article is reproduced from teacher Wu Peiqi's blog (pythonav).

Original link: https://pythonav.com/wiki/detail/6/91/
