1. Comparison
Python 3.2 introduced concurrent.futures, Python 3.4 added asyncio to the standard library, and Python 3.5 introduced the async/await syntax.
Library | Class/Method | Concurrency type | Applicable tasks
---|---|---|---
multiprocessing | Pool | Parallel | CPU-bound
concurrent.futures | ProcessPoolExecutor | Parallel | CPU-bound
threading | Thread | Concurrent | I/O-bound
concurrent.futures | ThreadPoolExecutor | Concurrent | I/O-bound
asyncio | gather | Concurrent (coroutines) | I/O-bound
concurrent.futures provides a simpler interface for both multithreaded and multiprocess programming, reducing the complexity of use. This article demonstrates each library through code examples.
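As a quick preview of that unified interface, here is a minimal sketch (the `work` function is a hypothetical placeholder, not from the examples below) showing that switching from threads to processes is a one-line change:

```python
# Sketch: the Executor interface is the same for threads and processes.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def work(n):
    return n * n


if __name__ == "__main__":
    # Swap ThreadPoolExecutor for ProcessPoolExecutor and nothing else changes.
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(work, range(10)))
    print(results)
```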
2. I/O-bound tasks (threading, ThreadPoolExecutor, asyncio)
2.1 Synchronous example
```python
# io_bound_sync.py
import time

import requests


def fetch(n):
    return requests.get('http://httpbin.org/get', params={'number': n})


def main():
    for num in range(100):
        fetch(num)


start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
```
Running python io_bound_sync.py takes about 68 seconds:

```
Elapsed run time: 68.3351878 seconds.
```
2.2 threading example
```python
# io_bound_threading.py
import time
from threading import Thread

import requests


def fetch(n):
    return requests.get('http://httpbin.org/get', params={'number': n})


def main():
    # One thread per request, 100 threads in total.
    tasks = [Thread(target=fetch, args=(num,)) for num in range(100)]
    for task in tasks:
        task.start()
    for task in tasks:
        task.join()


start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
```
Running python io_bound_threading.py shows a speedup of nearly 40x:

```
Elapsed run time: 1.8064501000000002 seconds.
```
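Note that a bare `Thread` gives you no direct way to get `fetch`'s return value back. One common workaround (a sketch, not from the original article) is to collect results in a shared list; `list.append` is atomic in CPython, so no lock is needed here:

```python
# Sketch: gathering return values from plain threads via a shared list.
from threading import Thread

import requests

results = []


def fetch(n):
    resp = requests.get('http://httpbin.org/get', params={'number': n})
    results.append(resp.status_code)


threads = [Thread(target=fetch, args=(num,)) for num in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
```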
2.3 concurrent.futures example
```python
# io_bound_threadpool.py
import time
from concurrent.futures import ThreadPoolExecutor, wait

import requests


def fetch(n):
    return requests.get('http://httpbin.org/get', params={'number': n})


def main():
    futures = []
    with ThreadPoolExecutor() as executor:
        for num in range(100):
            futures.append(executor.submit(fetch, num))
        wait(futures)


start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
```
Running python io_bound_threadpool.py returns:

```
Elapsed run time: 6.0246413 seconds.
```
concurrent.futures.ThreadPoolExecutor is an abstraction over the threading library that makes it easier to use. In the previous example we assigned each request its own thread, 100 threads in total. ThreadPoolExecutor, however, defaults to min(32, os.cpu_count() + 4) worker threads, which is why its execution time is somewhat longer than the threading version.
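You can check what that default works out to on your machine (a quick sanity check, not part of the original benchmark):

```python
import os

# Default ThreadPoolExecutor worker count (Python 3.8+).
print(min(32, os.cpu_count() + 4))
```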
You can set the number of threads with `with ThreadPoolExecutor(max_workers=100) as executor:`. Running again gives:

```
Elapsed run time: 1.038872 seconds.
```
ThreadPoolExecutor exists to simplify implementing multithreading; if you need finer control over threads, use the threading library directly.
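One convenience worth showing: concurrent.futures lets you handle each result as soon as it is ready via `as_completed` (a sketch reusing the `fetch` function from above):

```python
# Sketch: process responses in completion order, not submission order.
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests


def fetch(n):
    return requests.get('http://httpbin.org/get', params={'number': n})


with ThreadPoolExecutor(max_workers=20) as executor:
    futures = {executor.submit(fetch, num): num for num in range(20)}
    for future in as_completed(futures):
        num = futures[future]
        print(num, future.result().status_code)
```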
2.4 asyncio example
```python
# io_bound_asyncio.py
import asyncio
import time

import httpx


async def fetch(client, n):
    await client.get("https://httpbin.org/get", params={'number': n})


async def main():
    async with httpx.AsyncClient() as client:
        await asyncio.gather(*[fetch(client, num) for num in range(100)])


start_time = time.perf_counter()
asyncio.run(main())
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
```
Running python io_bound_asyncio.py returns:

```
Elapsed run time: 1.6809711999999999 seconds.
```
asyncio is faster than threading here because threading uses OS (operating system) threads: the operating system schedules them, and switches between threads are preemptive. asyncio instead uses coroutines, defined by the Python interpreter; the program itself decides the best moments to switch tasks, and asyncio's event loop handles that switching.
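To make the cooperative switching concrete, here is a minimal sketch (not from the original article): each coroutine voluntarily yields control at its `await`, so the two tasks interleave on a single thread:

```python
import asyncio


async def worker(name):
    for i in range(3):
        print(f"{name}: step {i}")
        # The event loop can only switch tasks at await points.
        await asyncio.sleep(0)


async def main():
    await asyncio.gather(worker("A"), worker("B"))


asyncio.run(main())
```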
3. CPU-bound tasks (multiprocessing, ProcessPoolExecutor)
3.1 Synchronous example
```python
import time


def sum(count):  # note: shadows the built-in sum()
    ret = 0
    for n in range(count):
        ret += n
    return ret


def main():
    for num in range(1000, 16000):
        sum(num)


start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
```
Output:

```
Elapsed run time: 5.0112078 seconds.
```
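For contrast, note that threads would not help here: CPython's GIL lets only one thread execute Python bytecode at a time, so a threaded version of this loop (a sketch; timings will vary by machine and are not from the original article) runs about as slowly as the synchronous one:

```python
# Sketch: ThreadPoolExecutor gives no speedup for pure-Python CPU work (GIL).
import time
from concurrent.futures import ThreadPoolExecutor, wait


def sum(count):  # note: shadows the built-in sum()
    ret = 0
    for n in range(count):
        ret += n
    return ret


start_time = time.perf_counter()
with ThreadPoolExecutor() as executor:
    wait([executor.submit(sum, num) for num in range(1000, 16000)])
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
```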
3.2 multiprocessing example
```python
import time
from multiprocessing import Pool, cpu_count


def sum(count):  # note: shadows the built-in sum()
    ret = 0
    for n in range(count):
        ret += n
    return ret


def main():
    # Leave one core free for the main process.
    with Pool(cpu_count() - 1) as p:
        p.map(sum, range(1000, 16000))
        p.close()
        p.join()


# The __main__ guard is required: child processes re-import this module.
if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")
```
Output:

```
Elapsed run time: 2.0392013 seconds.
```
3.3 concurrent.futures example
```python
import time
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import cpu_count


def sum(count):  # note: shadows the built-in sum()
    ret = 0
    for n in range(count):
        ret += n
    return ret


def main():
    futures = []
    with ProcessPoolExecutor(cpu_count() - 1) as executor:
        for num in range(1000, 16000):
            futures.append(executor.submit(sum, num))
        wait(futures)


# The __main__ guard is required: child processes re-import this module.
if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")
```
Output:

```
Elapsed run time: 8.558755399999999 seconds.
```
Slower than the single-process version? A likely explanation: each executor.submit call ships one tiny task to a worker process individually, so the serialization and inter-process communication overhead of 15,000 submissions dominates the actual computation. Pool.starmap, by contrast, automatically splits the input into chunks and sends each chunk in a single message.
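If that explanation is right, batching the tasks should recover the speedup. `ProcessPoolExecutor.map` accepts a `chunksize` argument for exactly this (a sketch; the timing claim is an expectation, not a measured result from the original article):

```python
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count


def sum(count):  # note: shadows the built-in sum()
    ret = 0
    for n in range(count):
        ret += n
    return ret


if __name__ == "__main__":
    start_time = time.perf_counter()
    with ProcessPoolExecutor(cpu_count() - 1) as executor:
        # chunksize batches many small tasks into each IPC message,
        # amortizing the per-task serialization overhead.
        list(executor.map(sum, range(1000, 16000), chunksize=1000))
    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")
```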
4. Conclusion
concurrent.futures is the easiest to use: ThreadPoolExecutor covers I/O-bound tasks and ProcessPoolExecutor covers CPU-bound ones, while threading, multiprocessing, and asyncio remain available when you need finer control.