Coroutines
What is a coroutine? A single thread handles multiple tasks concurrently, and the program itself controls the switching between tasks and the saving of their state. Coroutine switching is very fast, which "fools" the operating system into thinking the CPU is busy with our program the whole time.
Processes and threads are switched back and forth by the operating system: when they hit a blocking operation, the OS switches them out and runs another task. Coroutine switching is written into the program itself: the tasks keep hold of the CPU and switch among themselves before the OS ever takes the CPU away, so the OS believes the CPU has been running continuously.
Advantages of coroutines:
1. Low switching overhead
2. Fast execution
3. A coroutine holds the CPU for a long time and only runs the tasks inside our own program
Disadvantages of coroutines:
1. Coroutines are micro-concurrency; they are not well suited to handling a very large number of tasks on their own
2. Coroutines are single-threaded in essence and cannot use multiple cores. A program can open multiple processes, each process can open multiple threads, and each thread can open multiple coroutines
Coroutines are best suited to IO-intensive work.
Characteristics of coroutines:
1. Concurrency is implemented inside a single thread
2. Modifying shared data does not require locks
3. The program keeps its own record of each task's state (the context is saved so a task can resume where it left off)
4. A coroutine automatically switches to another coroutine when it encounters an IO operation
At work:
In practice we usually combine processes + threads + coroutines to get the best concurrency. On a 4-core CPU we typically open 5 processes, 20 threads per process (5 times the number of CPUs), and 500 coroutines per thread; while waiting on network latency during large-scale page crawling, the coroutines provide the concurrency. Number of concurrent tasks = 5 * 20 * 500 = 50,000, which is roughly the maximum for an ordinary 4-core machine. For comparison, the maximum load-balancing capacity of nginx is about 50,000 connections.
# When the tasks have no blocking (pure computation): run them serially
import time

def task():
    res = 1
    for i in range(1, 1000000):
        res += 1

def task1():
    res = 1
    for i in range(1, 1000000):
        res -= 1

start = time.time()
task()
task1()
end = time.time()
print(f"Serial execution time: {end - start}")  # Serial execution time: 0.07783079147338867
# First taste of coroutine-style switching: yield
# Still no IO blocking, pure computation
import time

def task():
    res = 1
    for i in range(1, 1000000):
        res += 1
        yield res

def task1():
    g = task()
    res = 1
    for i in range(1, 1000000):
        res -= 1
        next(g)

start = time.time()
task()   # calling task() only creates a generator object; the real work happens when task1 drives it with next()
task1()
end = time.time()
print(f"Switching with yield: {end - start}")  # Switching with yield: 0.21143341064453125
For purely compute-intensive code with no IO, plain serial execution is more efficient than switching back and forth with yield.
# gevent switches only on blocking it can recognize (this is not real IO blocking)
import gevent

def eat(name):
    print(f"{name} eat 1")   # 1
    gevent.sleep(2)          # simulated blocking that gevent can recognize
    print(f"{name} eat 2")   # 4

def play(name):
    print(f"{name} play 1")  # 2
    gevent.sleep(1)
    print(f"{name} play 2")  # 3

g1 = gevent.spawn(eat, 'bajie')
g2 = gevent.spawn(play, name='wukong')
g1.join()
g2.join()
print("main")                # 5
import threading
from gevent import monkey
import gevent
import time
monkey.patch_all()  # patch all blocking calls below so that gevent can recognize them and switch on them

def eat():
    print(f"Thread 1: {threading.current_thread().getName()}")  # 1
    print('eat food 1')  # 2
    time.sleep(2)        # blocking: switch to another task
    print('eat food 2')  # 8

def play():
    print(f"Thread 2: {threading.current_thread().getName()}")  # 4
    print('play 1')      # 5
    time.sleep(1)        # 6. blocking: switch to another task; the other task is still blocked, so execution continues here afterwards
    print('play 2')      # 7

g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1, g2])
print(f"{threading.current_thread().getName()}")  # 9
Pay attention to how long each task blocks; the numbered comments show the resulting execution order.
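A small additional sketch of my own, based on gevent's documented API: gevent.spawn returns a Greenlet object, and after join()/joinall() its .value attribute holds the task function's return value, so results can be collected much like futures.
import gevent

def fetch(n):
    gevent.sleep(1)   # simulated IO so the greenlets actually overlap
    return n * 10

tasks = [gevent.spawn(fetch, i) for i in range(3)]
gevent.joinall(tasks)
print([g.value for g in tasks])   # [0, 10, 20]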
Event
First, a hand-rolled version: add a global variable and modify it at some point in one thread so that the other threads know they can continue working.
import time
from threading import Thread
from threading import current_thread

flag = False

def task():
    print(f"{current_thread().name} is checking whether the server is up...")
    time.sleep(2)
    global flag
    flag = True

def task1():
    while 1:
        time.sleep(1)
        print(f"{current_thread().name} is connecting to the server...")
        if flag:
            print(f"{current_thread().name} connected successfully")
            return

if __name__ == '__main__':
    t1 = Thread(target=task1)
    t2 = Thread(target=task1)
    t3 = Thread(target=task1)
    t = Thread(target=task)
    t.start()
    t1.start()
    t2.start()
    t3.start()
import time
from threading import Thread
from threading import current_thread
from threading import Event
import random

event = Event()  # an Event object starts out False

def task1():
    print(f"{current_thread().getName()} is checking whether the server is up...")  # getName() returns the thread name
    time.sleep(random.randint(1, 3))
    event.set()  # set the event state to True: the server is ready

def task2():
    print(f"{current_thread().getName()} is trying to connect to the server...")
    count = 1
    while count < 4:
        time.sleep(1)
        print(f"{current_thread().getName()} connection attempt {count}")
        if event.is_set():  # the checking thread has marked the server as ready
            print('Connected successfully')
            return
        count += 1
    print('Too many attempts, giving up')

t1 = Thread(target=task1)
t2 = Thread(target=task2)
t1.start()
t2.start()
import time
from threading import Thread
from threading import current_thread
from threading import Event
import random

event = Event()

def check():
    print(f"{current_thread().name} is checking whether the server is up...")
    time.sleep(random.randint(1, 3))
    event.set()
    print("Server is ready")

def connect():
    count = 1
    while not event.is_set():  # event.is_set() reports whether the state is True or False
        if count == 4:
            print('Too many attempts, disconnecting')
            break
        event.wait(1)  # block for at most 1 second; if the event is set earlier, continue immediately, otherwise continue after the timeout
        print(f"{current_thread().name} connection attempt {count}")
        count += 1
    else:
        print(f"{current_thread().name} connected successfully")

t1 = Thread(target=check)
t2 = Thread(target=connect)
t1.start()
t2.start()
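One more sketch of my own: Event.wait(timeout) also returns a bool, True if the event was set and False if the timeout expired, so the retry loop can be written around that return value instead of checking is_set() separately.
from threading import Event, Thread
import time

event = Event()

def server():
    time.sleep(2)
    event.set()

Thread(target=server).start()
for attempt in range(1, 4):
    if event.wait(1):               # wait at most 1 second per attempt
        print(f"connected on attempt {attempt}")
        break
    print(f"attempt {attempt} timed out, retrying...")
else:
    print("giving up")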
Thread queue
import queue
# FIFO: first in, first out
q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
# print(q.get(block=False))  # getting from an empty queue with block=False raises queue.Empty immediately
# q.get(timeout=2)           # blocks for two seconds, then raises queue.Empty
# print(q.get())             # with the default block=True this would block forever on an empty queue
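put() mirrors get(): when the queue is full, put() blocks, and put(block=False) raises queue.Full immediately. A small sketch of my own to illustrate:
import queue

q = queue.Queue(2)
q.put('a')
q.put('b')
try:
    q.put('c', block=False)   # queue is full, so this raises immediately
except queue.Full:
    print('queue is full')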
# LIFO: last in, first out (a stack)
q = queue.LifoQueue(4)
q.put(1)
q.put(2)
q.put('alex')
q.put('Taibai')
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# Taibai
# alex
# 2
# 1
# Priority queue: the item with the smallest priority value comes out first
q = queue.PriorityQueue(4)
q.put((1, 'qq'))
q.put((-2, 'qq1'))
q.put((0, 'qq2'))
print(q.get())
print(q.get())
print(q.get())
# (-2, 'qq1')
# (0, 'qq2')
# (1, 'qq')
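When two items share the same priority number, the tuples are compared element by element, so the second element breaks the tie and must itself be comparable. A quick sketch of my own:
import queue

q = queue.PriorityQueue(3)
q.put((1, 'b'))
q.put((1, 'a'))
print(q.get())  # (1, 'a') -- same priority, so the second element decides
print(q.get())  # (1, 'b')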
Synchronous call
Synchronous call: after submitting a task, wait in place until the task completes, fetch its result, and only then execute the next line of code.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os

def task(i):
    print(f"{os.getpid()} is starting its task")
    time.sleep(random.randint(1, 3))
    print(f"{os.getpid()} has finished its task")
    return i

if __name__ == '__main__':
    pool = ProcessPoolExecutor()
    for i in range(10):  # submit 10 tasks to the process pool
        obj = pool.submit(task, i)  # similar to publishing a task
        print(f"Task result: {obj.result()}")
        # obj is a Future object whose state changes dynamically (running, pending, finished);
        # calling obj.result() makes the call synchronous: it waits until the task completes
        # and returns its result before the next task is submitted
    pool.shutdown(wait=True)  # shutdown makes the main process wait until every task in the pool has finished, similar to join;
    # it also refuses new submissions once called. A task is a function, and the task's return value is the function's return value.
    print("main")
Asynchronous call
Asynchronous call: after submitting a task, do not wait in place for the task to complete.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os

def task(i):
    print(f"{os.getpid()} is starting its task")
    time.sleep(random.randint(1, 3))
    print(f"{os.getpid()} has finished its task")
    return i

if __name__ == '__main__':
    pool = ProcessPoolExecutor()
    for i in range(10):
        pool.submit(task, i)
    pool.shutdown(wait=True)  # wait until every child process in the pool has finished before continuing, similar to join
    print("main")
Use a list as a container: collect the Future objects in it and read all the results in one go afterwards.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os

def task(i):
    print(f"{os.getpid()} is starting its task")
    time.sleep(random.randint(1, 3))
    print(f"{os.getpid()} has finished its task")
    return i

if __name__ == '__main__':
    pool = ProcessPoolExecutor()
    l = []
    for i in range(10):
        obj = pool.submit(task, i)
        l.append(obj)
    pool.shutdown(wait=True)
    for i in l:
        print(i.result())
    print("main")
Although the tasks are published at (almost) the same time, the return value of a task that finishes early cannot be received immediately; the results are only collected, all together, after every task has completed.
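If we want each return value as soon as its task finishes instead of waiting for shutdown, the standard library also provides concurrent.futures.as_completed, which yields the Future objects in completion order. A hedged sketch of my own, reusing a simplified task function:
from concurrent.futures import ProcessPoolExecutor, as_completed
import random
import time

def task(i):
    time.sleep(random.randint(1, 3))
    return i

if __name__ == '__main__':
    pool = ProcessPoolExecutor()
    futures = [pool.submit(task, i) for i in range(10)]
    for fut in as_completed(futures):   # yields each Future as soon as it finishes
        print(fut.result())
    pool.shutdown(wait=True)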
How do we get the return value of an asynchronous call?
1.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os
import requests

# Simulates crawling the source code of several pages, which necessarily involves IO
def task(url):
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text

def parse(content):
    return len(content)

if __name__ == '__main__':
    ret = task('http://www.baidu.com')
    print(parse(ret))
    ret = task('http://www.JD.com')
    print(parse(ret))
    ret = task('http://www.taobao.com')
    print(parse(ret))
    ret = task('https://www.cnblogs.com/jin-xin/articles/7459977.html')
    print(parse(ret))
# Written this way, we run one task function, then one parse function, and so on: pure serial execution.
# If a task takes 2 seconds, 20 tasks take 40 seconds, which is far too slow.
# The two functions together form a single task, and the next task only starts after the previous one has finished.
2.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os
import requests

# Simulates crawling the source code of several pages, which necessarily involves IO
def task(url):
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text

def parse(content):
    return len(content)

if __name__ == '__main__':
    # Open a thread pool and execute the tasks concurrently
    url_list = ['http://www.baidu.com',
                'http://www.JD.com',
                'http://www.taobao.com',
                'https://www.cnblogs.com/jin-xin/articles/7459977.html'
                ]
    pool = ThreadPoolExecutor(4)
    obj_list = []
    for url in url_list:
        obj = pool.submit(task, url)
        obj_list.append(obj)
    pool.shutdown(wait=True)
    # The for loop above submits a task for each url and appends the resulting Future objects (holding the crawled source code) to a list
    for i in obj_list:
        print(parse(i.result()))
Then we loop over the list and pass each result to the parse function for data analysis.
Disadvantage of this version: the tasks are published asynchronously and executed concurrently, but the analysis of the results is serial, and no return value is handled until every task has finished, which is inefficient.
3.
from concurrent.futures import ThreadPoolExecutor
import requests

def task(url):
    ret = requests.get(url)
    if ret.status_code == 200:
        return parse(ret.text)  # change the function: call parse directly and return its result

def parse(content):
    return len(content)

if __name__ == '__main__':
    # Open a thread pool and execute the tasks concurrently
    url_list = [
        'http://www.baidu.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.taobao.com',
        'https://www.cnblogs.com/jin-xin/articles/7459977.html',
        'https://www.luffycity.com/',
        'https://www.cnblogs.com/jin-xin/articles/9811379.html',
        'https://www.cnblogs.com/jin-xin/articles/11245654.html',
        'https://www.sina.com.cn/'
    ]
    pool = ThreadPoolExecutor(4)
    obj_list = []
    for url in url_list:
        obj = pool.submit(task, url)
        obj_list.append(obj)
    pool.shutdown(wait=True)
    for i in obj_list:
        print(i.result())  # the result is already the parsed value, so we just print it; no extra parse call is needed
4. Asynchronous call + callback **************************
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import random
import os
import requests

def task(url):
    '''Simulates crawling the source code of several pages, which necessarily involves IO'''
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text

def parse(obj):
    '''Simulates analysing the data, which generally involves no IO'''
    print(len(obj.result()))

if __name__ == '__main__':
    # Open a thread pool and execute the tasks concurrently
    url_list = [
        'http://www.baidu.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.JD.com',
        'http://www.taobao.com',
        'https://www.cnblogs.com/jin-xin/articles/7459977.html',
        'https://www.luffycity.com/',
        'https://www.cnblogs.com/jin-xin/articles/9811379.html',
        'https://www.cnblogs.com/jin-xin/articles/11245654.html',
        'https://www.sina.com.cn/'
    ]
    pool = ThreadPoolExecutor(4)
    for url in url_list:
        obj = pool.submit(task, url)
        obj.add_done_callback(parse)
    # add_done_callback registers a callback: as soon as a task finishes, its Future object is passed to parse.
    # After submitting a task we move straight on to the next line; each result is analysed as soon as its task
    # completes instead of waiting for all tasks to finish, which improves efficiency.
The thread pool has 4 threads and 10 tasks are published asynchronously, so 4 run at a time. The tasks finish in some order, and whichever finishes first has its source code handed to parse for analysis.
The analysis is given to an idle thread to execute, so crawling the source code and analysing the data are no longer carried out serially by the same flow.
Two ways to collect results from asynchronous calls:
1. Collect all the results together after every task has finished
2. Handle each result as soon as its task finishes (asynchronous call + callback function)
Asynchronous call + callback function
Callback function: receives each task's result as soon as it is ready and carries out the next processing step.
Premise:
The tasks handled asynchronously are the IO-bound ones; the non-IO work is handled by the callback function.
Difference:
With a process pool, the callback function is executed by the main process.
With a thread pool, the callback function is executed by an idle thread.
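A minimal sketch of my own to check that last point for the process-pool case: the pid printed inside the callback matches the main process, while the task itself reports a child process pid.
from concurrent.futures import ProcessPoolExecutor
import os

def task(i):
    return os.getpid(), i          # pid of the child process that ran the task

def callback(fut):
    child_pid, i = fut.result()
    print(f"task {i} ran in child {child_pid}, callback runs in {os.getpid()}")

if __name__ == '__main__':
    print(f"main process: {os.getpid()}")
    with ProcessPoolExecutor(2) as pool:
        for i in range(4):
            pool.submit(task, i).add_done_callback(callback)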