Python asynchronous programming with asyncio (a million concurrent requests) - Sun cool College

Sun cool College: https://shareku.ke.qq.com/
           
       

Because of the GIL (global interpreter lock), Python cannot take full advantage of multiple cores, and its performance has long been criticized. In IO-intensive network programming, however, asynchronous processing can be hundreds of times more efficient than synchronous processing, which makes up for much of Python's performance shortfall. For example, the micro-framework Japronto is reported to reach about a million requests per second.

Another advantage of Python is that its libraries (including third-party libraries) are extremely rich and easy to use. asyncio was added to the standard library in Python 3.4. It was never added to Python 2.x. After all, Python 3.x is the future. Ha ha! Python 3.5 then added the async/await syntax.

Before learning asyncio, let's clarify the concepts of synchronous and asynchronous execution:

·Synchronous means that tasks are completed strictly in order. The first task runs first. If it blocks, the program waits until it finishes, and only then does the second task run, and so on.

·Asynchronous is the opposite. After starting a task, the caller does not wait for its result but moves straight on to the next task. The result is delivered to the caller later through a status value, a notification, or a callback.
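
The "notify the caller through a callback" idea can be sketched with the standard library alone. This is only an illustration under assumptions, not code from the original article: slow_operation, on_done, and the 0.1-second delay are hypothetical names and values, and asyncio.run() requires Python 3.7 or later.

```python
import asyncio
from functools import partial


async def slow_operation(x):
    # Stand-in for an IO-bound call; the delay is arbitrary.
    await asyncio.sleep(0.1)
    return x * 2


def on_done(log, task):
    # Invoked by the event loop once the task has finished.
    log.append(task.result())


async def main():
    log = []
    task = asyncio.ensure_future(slow_operation(21))
    task.add_done_callback(partial(on_done, log))
    # The caller is not blocked; it can carry on with other work.
    log.append("caller keeps working")
    await task
    # Yield once more so any pending callbacks have definitely run.
    await asyncio.sleep(0)
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The caller's own entry is logged before the callback's, which is the point: the result arrives later, without the caller having waited for it.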

1, asyncio

Let's compare how synchronous and asynchronous code is written through examples, and then look at the performance gap between them. We use a one-second sleep to simulate an IO operation that takes one second.
Synchronous code:

import time

def hello():
    time.sleep(1)

def run():
    for i in range(5):
        hello()
        print('Hello World:%s' % time.time())  # Any great code starts with Hello World!
if __name__ == '__main__':
    run()

Output: (interval is about 1s)

Hello World:1527595175.4728756
Hello World:1527595176.473001
Hello World:1527595177.473494
Hello World:1527595178.4739306
Hello World:1527595179.474482

Asynchronous code:

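import time
import asyncio

# Define an asynchronous function (a coroutine)
async def hello():
    await asyncio.sleep(1)
    print('Hello World:%s' % time.time())

if __name__ == '__main__':
    # Note: calling get_event_loop() with no running loop is deprecated on
    # recent Python versions, where asyncio.run() is preferred.
    loop = asyncio.get_event_loop()
    # Wrap each coroutine in a Task; since Python 3.11 asyncio.wait()
    # no longer accepts bare coroutines.
    tasks = [loop.create_task(hello()) for i in range(5)]
    loop.run_until_complete(asyncio.wait(tasks))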

Output:

Hello World:1527595104.8338501
Hello World:1527595104.8338501
Hello World:1527595104.8338501
Hello World:1527595104.8338501
Hello World:1527595104.8338501

async def defines an asynchronous function (a coroutine). await suspends the current coroutine while it waits (here, for the sleep to finish) and lets other tasks run in the meantime. Next we need an event loop: when the main thread calls asyncio.get_event_loop(), an event loop is created. The asynchronous tasks are then handed to the loop's run_until_complete() method, and the event loop schedules the execution of the coroutines.
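
On Python 3.7 and later, the same experiment can also be written with asyncio.run() and asyncio.gather(), which create and close the event loop for you. The sketch below is an assumption-laden variant rather than the article's own code: main, its n and delay parameters, and the elapsed-time measurement are added here for illustration.

```python
import asyncio
import time


async def hello(delay):
    # Simulated IO: sleep without blocking the event loop.
    await asyncio.sleep(delay)
    return time.time()


async def main(n=5, delay=1.0):
    start = time.perf_counter()
    # Run n coroutines concurrently and collect their return values.
    stamps = await asyncio.gather(*(hello(delay) for _ in range(n)))
    elapsed = time.perf_counter() - start
    return stamps, elapsed


if __name__ == "__main__":
    stamps, elapsed = asyncio.run(main())
    for stamp in stamps:
        print("Hello World:%s" % stamp)
    print("elapsed: %.2fs" % elapsed)
```

Because the five sleeps overlap, the total elapsed time should be close to one delay rather than five.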

2, aiohttp

What if we need concurrent HTTP requests? The requests library is the usual choice, but it is synchronous. To make requests asynchronously we need aiohttp. Here we import one class: from aiohttp import ClientSession. We first create a session object and then use it to open the web page. A session supports multiple operations, such as post, get, put, and head.
Basic usage:

async with ClientSession() as session:
    async with session.get(url) as response:

Examples of aiohttp asynchronous implementation:

import asyncio
from aiohttp import ClientSession


url = "https://www.baidu.com/"

async def hello(url):
    # One session per call keeps the example short
    async with ClientSession() as session:
        async with session.get(url) as response:
            body = await response.read()  # raw response body as bytes
            print(body)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(hello(url))

First, the async def keyword defines an asynchronous function. The await keyword is placed in front of the operation we need to wait for: response.read() waits for the response body, which is an IO-bound operation. The ClientSession class is used to issue the HTTP request.

Multi link asynchronous access

What should we do if we need to request multiple URLs? Synchronously, we could simply add a for loop. The asynchronous version takes a little more work. Building on the previous example, we wrap each hello() call in an asyncio Future (asyncio.ensure_future() returns a Task, which is a kind of Future), and then pass the list of Futures to the event loop.

import time
import asyncio
from aiohttp import ClientSession

tasks = []
url = "https://www.baidu.com/{}"
async def hello(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
            response = await response.read()
#            print(response)
            print('Hello World:%s' % time.time())

def run():
    for i in range(5):
        task = asyncio.ensure_future(hello(url.format(i)))
        tasks.append(task)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    run()
    loop.run_until_complete(asyncio.wait(tasks))

Output:

Hello World:1527754874.8915546
Hello World:1527754874.899039
Hello World:1527754874.90004
Hello World:1527754874.9095392
Hello World:1527754874.9190395
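
The wrapping step can be tried without any network access. The following sketch assumes Python 3.7+ and uses a hypothetical fake_fetch coroutine in place of a real HTTP request. It shows that the wrapped objects are Futures and that asyncio.wait() hands back unordered sets of finished and pending tasks.

```python
import asyncio


async def fake_fetch(i):
    # Hypothetical stand-in for an HTTP request; no network involved.
    await asyncio.sleep(0.1)
    return "page-%d" % i


async def main():
    # create_task() schedules each coroutine right away and returns a Task.
    tasks = [asyncio.create_task(fake_fetch(i)) for i in range(5)]
    # Task is a subclass of Future, so it can be awaited or passed to wait().
    all_futures = all(isinstance(t, asyncio.Future) for t in tasks)
    # wait() returns two sets: finished tasks and still-pending tasks.
    done, pending = await asyncio.wait(tasks)
    pages = sorted(t.result() for t in done)
    return all_futures, len(pending), pages


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Since done is a set, the results have to be sorted (or matched back to their tasks) if order matters.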

Collect http responses

The example above accesses different links asynchronously, but it only sends the requests. What if we want to collect the responses into a list, and finally save them locally or print them out? You can use asyncio.gather(*tasks) to collect all the responses, as the following example demonstrates.

import time
import asyncio
from aiohttp import ClientSession

tasks = []
url = "https://www.baidu.com/{}"
async def hello(url):
    async with ClientSession() as session:
        async with session.get(url) as response:
#            print(response)
            print('Hello World:%s' % time.time())
            return await response.read()

def run(loop):
    for i in range(5):
        task = asyncio.ensure_future(hello(url.format(i)))
        tasks.append(task)
    # gather() returns the results in the same order as the tasks
    result = loop.run_until_complete(asyncio.gather(*tasks))
    print(result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    run(loop)

Output:

Hello World:1527765369.0785167
Hello World:1527765369.0845182
Hello World:1527765369.0910277
Hello World:1527765369.0920424
Hello World:1527765369.097017
[b'<!DOCTYPE html>\r\n<!--STATUS OK-->\r\n<html>\r\n<head>\r\n......
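
Two properties of asyncio.gather() are worth seeing in isolation: results come back in input order regardless of which request finishes first, and return_exceptions=True keeps one failure from discarding the other results. The sketch below assumes Python 3.7+. fake_fetch, its delays, and the simulated failure are hypothetical and stand in for real HTTP calls.

```python
import asyncio


async def fake_fetch(i):
    # Later items finish first, to show that gather() keeps input order.
    await asyncio.sleep(0.05 * (5 - i))
    if i == 3:
        raise RuntimeError("simulated failure for item 3")
    return "body-%d" % i


async def main():
    # With return_exceptions=True, exceptions are returned in place
    # of results instead of being raised out of gather().
    return await asyncio.gather(
        *(fake_fetch(i) for i in range(5)), return_exceptions=True
    )


if __name__ == "__main__":
    for item in asyncio.run(main()):
        print(repr(item))
```

When saving responses, the caller can then check each entry with isinstance(item, Exception) and skip or retry the failed ones.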

Exception resolution

If the concurrency reaches around 2000, the program reports an error: ValueError: too many file descriptors in select(). The cause is that the select() call used by Python limits how many file descriptors it can watch, which ultimately comes from an operating system limit. By default the maximum number of open files is 1024 on Linux and 509 on Windows. Once this value is exceeded, the program starts to fail. There are three ways to solve this problem:

1. Limit the concurrency. (Do not launch so many tasks at once, or cap the maximum number of concurrent tasks.)

2. Use callbacks.

3. Raise the maximum number of open files allowed by the operating system. The default value can be changed in a system configuration file. The specific steps are not covered here.
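
For the third option, the current limit can at least be inspected from Python. The sketch below is an assumption on top of the article: it uses the standard resource module, which exists only on Unix-like systems (not on Windows). The helper names open_file_limits and raise_soft_limit are hypothetical. It changes the limit only for the current process and cannot exceed the hard limit set by the system configuration.

```python
import resource  # Unix only; not available on Windows


def open_file_limits():
    # Returns (soft, hard) limits on open file descriptors for this process.
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def raise_soft_limit(target):
    # Raise the soft limit towards target, never above the hard limit.
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY:
        return soft  # already unlimited, nothing to do
    if hard != resource.RLIM_INFINITY:
        target = min(target, hard)
    if target > soft:
        resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    return resource.getrlimit(resource.RLIMIT_NOFILE)[0]


if __name__ == "__main__":
    print("soft/hard limits:", open_file_limits())
```

Raising the hard limit itself still requires changing the system configuration with administrator rights.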

If you do not want to change the system's default configuration, I recommend limiting the concurrency. Here we set the concurrency to 500, which still gives fast processing.

import asyncio
import aiohttp


url = 'https://www.baidu.com/'

async def hello(url, semaphore):
    # At most 500 coroutines can be inside this block at the same time
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.read()


async def run():
    semaphore = asyncio.Semaphore(500)  # Limit concurrency to 500
    to_get = [hello(url, semaphore) for _ in range(1000)]  # 1000 tasks in total
    # gather() accepts coroutines directly; asyncio.wait() stopped
    # accepting bare coroutines in Python 3.11
    await asyncio.gather(*to_get)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    loop.close()
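
To check that a semaphore really caps the number of tasks running at once, the effect can be measured without any network calls. This is a minimal sketch assuming Python 3.7+. The worker coroutine, the state dictionary, and the numbers 50 and 5 are hypothetical and chosen only to keep the demo fast.

```python
import asyncio


async def worker(semaphore, state):
    async with semaphore:
        # Count how many workers are inside the guarded block right now.
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulated IO
        state["active"] -= 1


async def main(total=50, limit=5):
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(semaphore, state) for _ in range(total)))
    # Highest number of workers that were active at the same time.
    return state["peak"]


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(main()))
```

All 50 workers are started together, yet the peak never exceeds the semaphore's limit, which is the same mechanism that keeps the number of open sockets below the operating system limit.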

Keywords: Python Back-end

Added by Trevors on Sun, 02 Jan 2022 01:00:47 +0200