Python coroutines explained through housework

Why use coroutines? We usually use multithreading or multiprocessing for concurrent programming in Python. For CPU-bound tasks we usually use multiprocessing because of the GIL. For IO-bound tasks threads are enough: a thread gives up the GIL while it waits on IO, so other threads can run and we get apparent concurrency.

A coroutine is a form of "concurrency" that runs within a single thread. One advantage of coroutines over multithreading is that they avoid the overhead of switching between threads, so they can run more efficiently. This article does not discuss how Python coroutines are implemented. Instead, it shows their most common usage through simple examples, so that you can quickly get started with high-performance, coroutine-based web frameworks such as FastAPI.

1. Housework

Suppose I have three household chores to do: boiling water, washing clothes and sweeping the floor. As a programmer, I always plan my work carefully before I start. Here is the task list I planned:

  • Fill the kettle with water
  • Wait for the kettle to boil
  • Put the clothes in the washing machine and add hot water
  • Wait for the washing machine to wash the clothes
  • Hang up the clothes
  • Sweep the floor

Think about it: I am like a hardworking CPU, but I also have machines that can do some of this work for me. For the CPU, perhaps the network card is like the kettle and the hard disk is like the washing machine. Looking closer, boiling water and washing clothes are the same kind of work. All I need to do is fill the kettle or load the washing machine and turn on the switch; the machine takes care of the rest. Sweeping the floor is a different kind of work: there is no robot to do it for me, so I have to sweep it myself.

If boiling water and washing clothes are IO-bound tasks, then sweeping the floor is a CPU-bound task.

2. Program description

The following program simulates the housework from the previous section. Each program step corresponds to one chore:

Program | Housework
Calculate 1 + 2 | Fill the kettle with water
Read an addend stored on another computer over the network | Wait for the kettle to boil
Add it to the sum to get the name of the disk file that stores the multiplier | Put the clothes in the washing machine and add hot water
Read the stored multiplier from the disk file | Wait for the washing machine to wash the clothes
Multiply the sum by the multiplier | Hang up the clothes
Calculate the cumulative sum of 0 to 10000 | Sweep the floor

def get_network_number() -> int:
	"""Get an integer through the network""" 
	... 

def get_file_number(filename: str) -> int: 
	"""Read the disk file to get an integer""" 
	... 

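def cumulative_sum(start: int, end: int) -> int:
	"""Sum the integers in range(start, end)."""
	total = 0  # avoid shadowing the built-in sum()
	for number in range(start, end):
		total += number
	return total  # return only after the loop has finished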

def task():
	"""Run every step in order, waiting for each one to finish."""
	result = 1 + 2
	network_number = get_network_number()
	result += network_number
	file_number = get_file_number(f"{result}.txt")
	result *= file_number
	total = cumulative_sum(0, 10000)

task()

3. Problem analysis and program improvement

My mother, who runs our household, couldn't stand it any longer. To her, I was simply being lazy: sweeping the floor has nothing to do with the other two jobs, so I could sweep while the kettle was boiling and the washing machine was running. So she began to direct my work. For example, she would have me sweep the floor when the water was half boiled, and send me back to the kettle before the floor was finished.

Under the housekeeper's direction, my labor was used efficiently, and I rarely had a chance to be idle.

The housekeeper is like our operating system, so we have the following optimized code:

from threading import Thread 

def get_network_number() -> int: 
	"""Get an integer through the network""" 
	... 

def get_file_number(filename: str) -> int: 
	"""Read the disk file to get an integer""" 
	... 

def cumulative_sum(start: int, end: int) -> int:
	"""Sum the integers in range(start, end)."""
	total = 0  # avoid shadowing the built-in sum()
	for number in range(start, end):
		total += number
	return total  # return only after the loop has finished

def task1():
	"""Task 1: the IO-bound steps."""
	result = 1 + 2
	network_number = get_network_number()
	result += network_number
	file_number = get_file_number(f"{result}.txt")
	result *= file_number

def task2():
	"""Task 2: the CPU-bound step."""
	total = cumulative_sum(0, 10000)

t1 = Thread(target=task1) 
t2 = Thread(target=task2) 
t1.start() 
t2.start() 
t1.join() 
t2.join() 

Sweeping the floor has little to do with boiling water and washing clothes; it is a task that can be executed separately. The two tasks are concurrent, so we can hand the sweeping to another thread. The operating system then switches the CPU back and forth between the two threads, so the two tasks appear to execute at the same time.
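
The listing above leaves the two IO functions as stubs, so it cannot produce a result as written. As a minimal runnable sketch, assume that time.sleep stands in for the IO wait and that the network and the file return the made-up values 4 and 5. The results dictionary is also only there for illustration:

```python
import time
from threading import Thread

def get_network_number() -> int:
    """Stand-in for network IO (assumption: sleeps, then returns 4)."""
    time.sleep(0.05)  # a sleeping thread releases the GIL, as real IO would
    return 4

def get_file_number(filename: str) -> int:
    """Stand-in for disk IO (assumption: sleeps, then returns 5)."""
    time.sleep(0.05)
    return 5

def cumulative_sum(start: int, end: int) -> int:
    total = 0
    for number in range(start, end):
        total += number
    return total

results = {}  # hypothetical place to collect each task's result

def task1() -> None:
    result = 1 + 2
    result += get_network_number()
    result *= get_file_number(f"{result}.txt")
    results["task1"] = result

def task2() -> None:
    results["task2"] = cumulative_sum(0, 10000)

t1 = Thread(target=task1)
t2 = Thread(target=task2)
t1.start()
t2.start()
t1.join()
t2.join()
print(results)
```

While task1 sleeps in its simulated IO, the operating system is free to run task2 on the other thread.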

Being directed by the operating system in this way has a significant drawback: tasks are switched frequently, and every switch costs time.

4. Introducing coroutines

Smart people like me don't need the housekeeper's direction. After I fill the kettle and switch it on, I pick up the broom right away and start sweeping the floor. I no longer wait around foolishly, so my work now follows this order:

Task 1 | Task 2
Fill the kettle with water |
Wait for the kettle to boil | Finish sweeping the floor
Put the clothes in the washing machine and add hot water |
Wait for the washing machine to wash the clothes |
Hang up the clothes |

This works much better than following the housekeeper's direction, and no time is wasted switching back and forth between tasks. I arrange the work myself according to the situation. The following is the coroutine version of the code:

import asyncio 

# Turn the IO task into a coroutine
async def get_network_number() -> int:
	"""Get an integer through the network"""
	...

# Turn the IO task into a coroutine
async def get_file_number(filename: str) -> int:
	"""Read the disk file to get an integer""" 
	... 

# The CPU-bound task does not need to become a coroutine
def cumulative_sum(start: int, end: int) -> int:
	"""Sum the integers in range(start, end)."""
	total = 0  # avoid shadowing the built-in sum()
	for number in range(start, end):
		total += number
	return total  # return only after the loop has finished

async def task1(): 
	"""Task 1""" 
	result = 1 + 2 
	network_number = await get_network_number() 
	result += network_number 
	file_number = await get_file_number(f'{result}.txt') 
	result *= file_number 

async def task2():
	"""Task 2"""
	total = cumulative_sum(0, 10000)

async def main():
	# Use names that do not shadow the task1/task2 coroutine functions
	t1 = asyncio.create_task(task1())
	t2 = asyncio.create_task(task2())
	await t1
	await t2

asyncio.run(main())

In the coroutine version, we find that sweeping the floor is a CPU-bound task, so once we start it we cannot stop. The water may already have boiled, but we cannot go back to do the laundry until the floor is swept.
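
This behavior can be observed directly. In the following sketch, the names boil_water and sweep_floor and the log list are illustrative, and asyncio.sleep(0) is assumed as a stand-in for IO that becomes ready almost at once. Even so, the kettle task can only continue after the sweeping loop has run to the end, because the loop contains no await:

```python
import asyncio

async def boil_water(log: list) -> None:
    log.append("kettle switched on")
    await asyncio.sleep(0)  # stand-in for IO that is ready almost at once
    log.append("water boiled")

async def sweep_floor(log: list) -> None:
    log.append("sweeping started")
    total = 0
    for number in range(10000):  # no await inside: the loop cannot be interrupted
        total += number
    log.append("sweeping finished")

async def main() -> list:
    log = []
    t1 = asyncio.create_task(boil_water(log))
    t2 = asyncio.create_task(sweep_floor(log))
    await t1
    await t2
    return log

log = asyncio.run(main())
print(log)
# ['kettle switched on', 'sweeping started', 'sweeping finished', 'water boiled']
```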

To solve this problem, you can pause voluntarily while sweeping: split the sweeping into several passes, and between passes check whether there are other jobs to do. In Python coroutines, we can use asyncio.sleep to pause our work and let other work run. The CPU-bound task is transformed as follows:

async def cumulative_sum(start: int, end: int) -> int:
	result = 0
	for i in range(start, end):
		if i % 100 == 0:
			# Pause every 100 additions so that other tasks can run
			await asyncio.sleep(1)
		result += i
	return result

async def task2():
	"""Task 2"""
	total = await cumulative_sum(0, 10000)

After every 100 additions, we check whether there are other jobs to do. For example, if the water has boiled, we can start the laundry: we put the clothes in the washing machine and come back. If there is no other work, we take a rest and resume when the time is up (the sleep has a duration).

The idea above is to split the task of sweeping the floor into several things.
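
The effect of these pauses can be sketched as follows. Note one deliberate swap: asyncio.sleep(0) is used in place of the article's asyncio.sleep(1), so that the example finishes quickly; it yields to the event loop without adding a delay. The names boil_water and sweep_floor and the log list are illustrative:

```python
import asyncio

async def boil_water(log: list) -> None:
    log.append("kettle switched on")
    await asyncio.sleep(0)  # stand-in for IO that is ready almost at once
    log.append("water boiled")

async def cumulative_sum(start: int, end: int) -> int:
    result = 0
    for i in range(start, end):
        if i % 100 == 0:
            await asyncio.sleep(0)  # yield to the event loop without delay
        result += i
    return result

async def sweep_floor(log: list) -> int:
    log.append("sweeping started")
    total = await cumulative_sum(0, 10000)
    log.append("sweeping finished")
    return total

async def main():
    log = []
    t1 = asyncio.create_task(boil_water(log))
    t2 = asyncio.create_task(sweep_floor(log))
    await t1
    total = await t2
    return log, total

log, total = asyncio.run(main())
print(log)
# ['kettle switched on', 'sweeping started', 'water boiled', 'sweeping finished']
```

Because the sweeping now pauses, the kettle task finishes in the middle of the sweeping instead of after it.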

5. Tasks and events

Through the analysis in the previous sections, we found two very important concepts from housework:

  • task
  • thing

We find that a task is made up of many things done in sequence. Whatever tasks we are completing, we are doing one thing at a time.

Looking back at the Python program, we can find what corresponds to things in daily life.

For task1:

async def task1(): 
	"""Task 1""" 
	result = 1 + 2 
	network_number = await get_network_number() 
	result += network_number 
	file_number = await get_file_number(f"{result}.txt") 
	result *= file_number 

We find the following things that the CPU needs to do (for simplicity, ignore the CPU usage during the network request and the file read, and also ignore the assignments to network_number and file_number):

  1. result = 1 + 2
  2. result += network_number
  3. result *= file_number

For task2:

async def task2():
	"""Task 2"""
	total = await cumulative_sum(0, 10000)

Because the logic executed by task2 is cumulative_sum, we go on to analyze the events that cumulative_sum generates:

async def cumulative_sum(start: int, end: int) -> int:
	result = 0
	for i in range(start, end):
		if i % 100 == 0:
			await asyncio.sleep(1)
		result += i  # inside the loop, so every number is added
	return result

We regard every 100 additions as one thing, so task2 is composed of many things, each consisting of 100 additions. From the analysis above, we can see that things in daily life are events in Python coroutines, and that await is an obvious point at which events are divided. Our program can be composed of many concurrent tasks, each containing a large number of events. The smallest unit in the actual execution of the program is the event.
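
To make the division at await visible, a coroutine can be resumed by hand. The following is only an illustrative sketch: Pause is a hypothetical awaitable that suspends exactly once, standing in for the real IO calls, and run_step_by_step is a hypothetical helper that records what each resume executed:

```python
class Pause:
    """Hypothetical awaitable that suspends the coroutine exactly once."""
    def __await__(self):
        yield

async def task1(log: list) -> None:
    log.append("result = 1 + 2")
    await Pause()  # stands in for `await get_network_number()`
    log.append("result += network_number")
    await Pause()  # stands in for `await get_file_number(...)`
    log.append("result *= file_number")

def run_step_by_step(coro, log: list) -> list:
    """Resume the coroutine until it ends; collect what each resume executed."""
    events = []
    seen = 0
    finished = False
    while not finished:
        try:
            coro.send(None)  # run up to the next await, or to the end
        except StopIteration:
            finished = True
        events.append(log[seen:])
        seen = len(log)
    return events

log = []
events = run_step_by_step(task1(log), log)
print(events)
# [['result = 1 + 2'], ['result += network_number'], ['result *= file_number']]
```

Each resume executes exactly one of the three things listed for task1, and each await marks the boundary between two of them.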

6. Event loop

How do we run the whole program using this idea of executing events?

We can create a loop whose job is to execute events. At first the loop is empty. Then we create a task, which contains several events. We put the first event of the task into the event loop, and the loop executes it. When that event ends, we put the next event that needs to run into the loop. After the events have been added one by one in order, the event loop has executed every event in the task, and the task ends.

Because the event loop can only execute one event at a time, when we have several tasks their events queue up and wait to be executed in turn.
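
The idea can be sketched with a toy loop. This is not how asyncio is implemented; it is a minimal illustration that assumes each task is a generator and that every yield marks the end of one event. The names make_task and run_event_loop are hypothetical:

```python
from collections import deque

def make_task(name: str, event_count: int, log: list):
    """A task is a generator; each `yield` marks the end of one event."""
    for index in range(event_count):
        log.append(f"{name}: event {index}")
        yield  # hand control back to the loop

def run_event_loop(tasks) -> None:
    """Execute one event at a time until every task has ended."""
    queue = deque(tasks)
    while queue:
        current = queue.popleft()
        try:
            next(current)  # run the task up to its next event boundary
        except StopIteration:
            continue  # the task has no events left
        queue.append(current)  # queue the task's next event behind the others

log = []
run_event_loop([make_task("task1", 2, log), make_task("task2", 3, log)])
print(log)
# ['task1: event 0', 'task2: event 0', 'task1: event 1', 'task2: event 1', 'task2: event 2']
```

The events of the two tasks take turns in the queue, and only one event runs at any moment.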

7. Detailed discussion

Let's look at reading an integer from a file. An ordinary read looks like this:

async def get_file_number(filename): 
	with open(filename) as f: 
		number = int(f.read()) 
		return number 

We find that there is no await in the read operation, so it executes just like the cumulative_sum that had no asyncio.sleep added: the main program waits while the disk IO is in progress and does not execute other events. We need to modify the disk IO operation to make the best use of CPU resources.

This is where threads come in handy. Python provides the following approach:

import asyncio 

def _read_file(fd): 
	return fd.read() 

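async def get_file_number(filename):
	# Inside a coroutine, get_running_loop() returns the loop that is executing it
	loop = asyncio.get_running_loop()
	with open(filename) as f:
		# Run the blocking read in the default thread pool executor
		number = await loop.run_in_executor(None, _read_file, f)
	return int(number)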

With threads scheduled by the operating system, the disk IO runs separately and other events get a chance to execute. It is as if the two were competing for the CPU, with the operating system deciding which one runs. Likewise, time.sleep blocks the event loop, so asyncio.sleep should be used inside a coroutine. The same approach can also be applied to cumulative_sum, replacing the original asyncio.sleep approach with execution in a thread. Python 3.9 adds a more convenient way to do this, asyncio.to_thread. For the details of using coroutines, you should still read the official Python documentation carefully.
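
As a minimal sketch of the asyncio.to_thread variant (Python 3.9 or later), the blocking read can be moved into a plain function and awaited from the coroutine. The helper name read_number, the temporary file "3.txt" and its content "7" are assumptions made only so that the example is self-contained:

```python
import asyncio
import tempfile
from pathlib import Path

def read_number(path: str) -> int:
    """Ordinary blocking read; it runs in a worker thread."""
    with open(path) as f:
        return int(f.read())

async def get_file_number(path: str) -> int:
    # to_thread runs the blocking function in a separate thread,
    # so the event loop stays free to execute other events meanwhile
    return await asyncio.to_thread(read_number, path)

async def demo() -> int:
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "3.txt"  # hypothetical file, created for the demo
        path.write_text("7")
        return await get_file_number(str(path))

number = asyncio.run(demo())
print(number)  # 7
```

Opening and reading the file both happen inside the worker thread here, so no file object has to be passed between threads.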

8. Using coroutines

The detailed discussion in the previous section raises two more questions:

1) Why does disk IO need thread scheduling while network IO does not?

2) After threads are introduced, will frequent task switching still waste CPU time? Will this still be more efficient than multithreading?

Understanding these two questions is the key to using Python coroutines flexibly. The following points are my personal understanding. I have not analyzed the source code, so treat them as reference only:

  • Network programming can be done synchronously or asynchronously. The asynchronous way is IO multiplexing.
  • Which file descriptor types IO multiplexing supports depends on the operating system.
  • Task switching in Python coroutines relies on IO multiplexing.
  • Disk IO under Windows does not support IO multiplexing. Even where the operating system supports it, if the standard library does not wrap it, we need to wrap it ourselves.
  • If the program involves no network IO, using coroutines cannot effectively reduce the cost of task switching, but the convenient synchronous style of programming that coroutines offer is still available.
  • Different programming languages have different implementations and application scenarios.


Added by nadeemshafi9 on Sat, 22 Jan 2022 11:32:55 +0200