Deadlocks and recursive locks (apply to both threads and processes; understand)
Even once you know that every acquired lock must be released, it is still very easy to create a deadlock when working with several locks (the entire program hangs).
Deadlock phenomenon: a deadlock occurs when two or more processes (or threads) compete for resources and each one ends up waiting for a resource that another one holds. Once they are in this state, none of them can make progress without outside intervention. For example, suppose thread A acquires lock a and then lock b, while at the same time thread B acquires lock b and then lock a:

from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()
# Calling a class several times produces a different object each time, so the two locks are
# created once at module level and shared by every thread
# (to make repeated calls return the same object, you would need the singleton pattern)


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('%s grabbed lock A' % self.name)  # self.name is the current thread's name
        mutexB.acquire()
        print('%s grabbed lock B' % self.name)
        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('%s grabbed lock B' % self.name)
        time.sleep(2)
        mutexA.acquire()
        print('%s grabbed lock A' % self.name)
        mutexA.release()
        mutexB.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()

A lock can only be acquired after its current holder releases it. Typically, the first thread finishes func1, enters func2, grabs lock B and sleeps. Meanwhile, the second thread enters func1 and grabs lock A. The first thread now waits for lock A, which the second thread holds, and the second thread waits for lock B, which the first thread holds. Both threads are stuck, and the program cannot continue.
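The following is a minimal two-thread sketch, not the original author's code, that reproduces the lock-ordering problem without letting the program hang forever. It makes two assumptions that the example above does not:

* `threading.Barrier` is used so that both threads hold their first lock at the same moment.
* `acquire(timeout=1)` replaces a plain `acquire()`, so each thread gives up and reports failure instead of waiting indefinitely.

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()
both_holding = threading.Barrier(2)  # past this point, both threads hold their first lock
both_tried = threading.Barrier(2)    # neither releases its first lock until both have tried
results = {}


def worker(name, first, second):
    with first:
        both_holding.wait()
        # A plain second.acquire() would block forever here;
        # the timeout lets us observe the deadlock instead of hanging
        got_second = second.acquire(timeout=1)
        if got_second:
            second.release()
        results[name] = got_second
        both_tried.wait()


t1 = threading.Thread(target=worker, args=('A then B', lock_a, lock_b))
t2 = threading.Thread(target=worker, args=('B then A', lock_b, lock_a))
t1.start()
t2.start()
t1.join()
t2.join()
print(results)  # both entries are False: each thread waited on a lock the other held
```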
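Recursive locks (understand): one way to resolve the deadlock above
Features of a recursive lock (RLock):
It can be acquired and released repeatedly, but only by the thread that grabbed it first.
Internally it keeps a counter: each acquire adds one and each release subtracts one.
As long as the counter is not zero, no other thread can grab the lock.

To fix the deadlock, replace

mutexA = Lock()
mutexB = Lock()

with a single recursive lock shared under both names (import it with from threading import RLock):

mutexA = mutexB = RLock()

Both names now refer to the same lock. The thread that holds it can therefore acquire it again inside func1 and func2, and every other thread waits until the counter drops back to zero.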
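Here is a small sketch of the counter behavior just described. It uses only `threading.RLock` and `threading.Lock`. The helper `free_for_other_threads` is hypothetical and exists only to probe the lock from a second thread.

```python
import threading

rlock = threading.RLock()


def free_for_other_threads():
    """Return True if a *different* thread could grab rlock right now."""
    outcome = []

    def probe():
        got = rlock.acquire(blocking=False)  # test without waiting
        if got:
            rlock.release()
        outcome.append(got)

    t = threading.Thread(target=probe)
    t.start()
    t.join()
    return outcome[0]


rlock.acquire()                     # counter = 1
rlock.acquire()                     # same thread again: counter = 2, no deadlock
at_two = free_for_other_threads()   # False: counter is not zero
rlock.release()                     # counter = 1
at_one = free_for_other_threads()   # False: still not zero
rlock.release()                     # counter = 0
at_zero = free_for_other_threads()  # True: the lock is free again
print(at_two, at_one, at_zero)      # False False True

plain = threading.Lock()
plain.acquire()
reacquired = plain.acquire(timeout=0.5)  # a plain Lock cannot be re-acquired by its owner
print(reacquired)                        # False
plain.release()
```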
Semaphore (Understanding)
The term "semaphore" can refer to different things in different contexts.
In concurrent programming, a semaphore is a kind of lock that lets a fixed number of holders in at the same time.
If a mutex is like a single toilet, a semaphore is like a restroom with several toilets.

from threading import Thread, Semaphore
import time
import random  # the random module can also generate a random verification code (a Sogou written-test question)

sm = Semaphore(5)  # the number sets how many "toilets" there are; with 1, only one thread gets in at a time, just like a mutex


def task(name):
    sm.acquire()
    print('%s is squatting in a stall' % name)
    time.sleep(random.randint(1, 5))
    sm.release()


if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=task, args=('Paratrooper No. %s' % i, ))
        t.start()
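This sketch checks the "five toilets" claim by recording how many threads are inside the semaphore at once. The bookkeeping variables `inside` and `max_inside` are made up for the illustration. The sketch uses the `with` statement, which calls `acquire()` on entry and `release()` on exit.

```python
import threading
import time

sm = threading.Semaphore(5)        # five "stalls"
counter_lock = threading.Lock()    # protects the two counters below
inside = 0
max_inside = 0


def task():
    global inside, max_inside
    with sm:  # acquire() on entry, release() on exit
        with counter_lock:
            inside += 1
            max_inside = max(max_inside, inside)
        time.sleep(0.1)  # simulate work while holding a slot
        with counter_lock:
            inside -= 1


threads = [threading.Thread(target=task) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(max_inside)  # never more than 5
```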
Event (Understanding)
Sometimes a process or thread has to wait until another one signals that it has reached a certain point before it can continue; an Event works like that signal.
from threading import Thread, Event
import time

event = Event()  # create an event object


def light():
    print('Red light on')
    time.sleep(3)
    print('The green light is on')
    # Tell the cars waiting at the red light that they may go
    event.set()  # set the event so that everything blocked in wait() can continue


def car(name):
    print('Car %s is waiting at the red light' % name)
    event.wait()  # wait until someone sends the signal
    print('Car %s hits the gas and speeds away' % name)


if __name__ == '__main__':
    t = Thread(target=light)
    t.start()
    for i in range(20):
        t = Thread(target=car, args=('%s' % i, ))
        t.start()
    # Every car thread blocks in event.wait() until the light thread calls event.set()
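Below is a shorter, testable variant of the traffic-light example; the names `go` and `passed` are illustrative. It shows that `event.wait(timeout=...)` returns False while the event is unset, and that a single `set()` releases every waiting thread.

```python
import threading

go = threading.Event()
passed = []
lock = threading.Lock()


def car(name):
    go.wait()  # blocks until go.set() is called
    with lock:
        passed.append(name)


cars = [threading.Thread(target=car, args=(i,)) for i in range(5)]
for c in cars:
    c.start()

before = go.wait(timeout=0.2)  # still red: wait() times out and returns False
passed_before = len(passed)    # no car can have passed yet
go.set()                       # green light: release every waiting thread at once
for c in cars:
    c.join()
print(before, passed_before, go.is_set(), sorted(passed))  # False 0 True [0, 1, 2, 3, 4]
```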
Thread queues (Understanding)
Threads in the same process share data, so why use a queue inside one process? Because a queue is a pipe plus a lock, using a queue keeps the data safe.

import queue  # these queues only work locally, between threads of one process

# 1. Queue: first in, first out (FIFO)
q = queue.Queue(3)  # holds at most 3 items
q.put(1)
q.put(2)
q.put(3)
print(q.full())          # True
print(q.get())           # 1
print(q.get_nowait())    # 2 (raises queue.Empty instead of waiting if the queue is empty)
print(q.get(timeout=3))  # 3 (raises queue.Empty if nothing arrives within 3 seconds)
print(q.empty())         # True

# 2. LifoQueue: last in, first out
q = queue.LifoQueue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())  # 3

# 3. PriorityQueue: each item carries a priority that decides the order in which it comes out
q = queue.PriorityQueue(4)
q.put((10, '111'))
q.put((100, '222'))
q.put((0, '333'))
q.put((-5, '444'))
print(q.get())  # (-5, '444')
# Put a tuple whose first element is the priority
# Note: the smaller the number, the higher the priority
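Because a queue already contains the lock, a producer thread and a consumer thread can share it without any extra locking. In this minimal sketch, the `None` sentinel that tells the consumer to stop is an assumption of the example, not part of the queue API.

```python
import queue
import threading

q = queue.Queue(maxsize=3)  # put() blocks while 3 items are waiting
SENTINEL = None             # hypothetical end-of-stream marker for this sketch
consumed = []


def producer():
    for i in range(10):
        q.put(i)            # blocks if the queue is full
    q.put(SENTINEL)


def consumer():
    while True:
        item = q.get()      # blocks until an item is available
        if item is SENTINEL:
            break
        consumed.append(item * item)


p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()
print(consumed)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```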
Process and Thread Pools (Master)
First, recall how we previously made a TCP server concurrent:
every time a client connects, start a new process or thread to serve it.
Whether you start a process or a thread, it consumes resources; threads are only somewhat cheaper than processes.
We cannot open processes and threads without limit, because the computer's hardware resources cannot keep up (hardware development lags far behind software).
Our aim should be to make the most of the hardware while making sure it keeps working properly.

The concept of pools
What is a pool? A pool caps the number of processes or threads, so that the computer is used as fully as possible without overloading its hardware.
It lowers the program's efficiency somewhat, but it protects the hardware so that the program you write can run properly.
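This sketch shows the "fixed number of workers" idea from the pool description. It submits 20 tasks to a `ThreadPoolExecutor` with `max_workers=3`, then records how many tasks ran at the same time and how many distinct threads did the work. The counters are illustrative.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

lock = threading.Lock()
running = 0
peak = 0
worker_ids = set()


def task(n):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
        worker_ids.add(threading.get_ident())  # which pool thread ran this task
    time.sleep(0.05)  # pretend to wait on some IO
    with lock:
        running -= 1
    return n


with ThreadPoolExecutor(max_workers=3) as pool:  # at most 3 threads, reused for every task
    results = list(pool.map(task, range(20)))

print(peak <= 3, len(worker_ids) <= 3, results == list(range(20)))  # True True True
```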
Basic Use
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os

# pool = ThreadPoolExecutor(5)  # a pool with exactly five threads
# The argument is optional: without it, Python 3.8+ uses min(32, os.cpu_count() + 4) threads
# (Python 3.5 to 3.7 used five times the number of CPUs)
pool = ProcessPoolExecutor(5)  # five processes; without an argument it uses os.cpu_count() processes


def task(n):
    print(n, os.getpid())
    time.sleep(2)
    return n ** n


def call_back(n):
    # n is the finished Future object; n.result() is task's return value
    print('call_back>>>:', n.result())


# Ways to submit a task
#     synchronous:  after submitting, wait in place and do nothing until the task returns
#     asynchronous: after submitting, do not wait for the result; keep executing
# Results of asynchronously submitted tasks should be collected through a callback.
# The commented-out lines below show another approach: store each Future in a list
# and call result() on each one afterwards.

if __name__ == '__main__':
    # pool.submit(task, 1)  # submit a task to the pool (asynchronous); 1 is task's argument
    # t_list = []
    for i in range(20):  # submit 20 tasks to the pool
        # res = pool.submit(task, i)  # res is a Future, e.g. <Future at 0x100f97b38 state=running>
        # print(res.result())  # result() waits for the task's return value; calling it here works like join()
        #                      # and turns the concurrent program back into a serial one, so it is not recommended
        # t_list.append(res)
        pool.submit(task, i).add_done_callback(call_back)
        # Callback mechanism: when the task finishes, the pool automatically calls call_back with the
        # finished Future. add_done_callback() itself returns None, so there is nothing useful to assign.
    # Wait until all tasks in the pool have finished before going further
    # pool.shutdown()  # close the pool and wait for every task in it to finish
    # for t in t_list:
    #     print('>>>:', t.result())  # results come out in order, because the Futures were appended in order
    # If task had no return statement, result() would print None:
    # result() returns exactly what the asynchronously submitted task returned.
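Here is the same submit, callback and `result()` pattern, condensed into a self-contained sketch. It uses a thread pool instead of a process pool so that it runs anywhere without a `__main__` guard. It also uses the pool as a context manager, which calls `shutdown(wait=True)` on exit.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def task(n):
    return n ** n


callback_results = []
lock = threading.Lock()


def call_back(future):
    # Runs once the future has finished; future.result() is task's return value
    with lock:
        callback_results.append(future.result())


with ThreadPoolExecutor(max_workers=5) as pool:  # leaving the block calls pool.shutdown(wait=True)
    futures = [pool.submit(task, i) for i in range(6)]
    for f in futures:
        f.add_done_callback(call_back)

ordered = [f.result() for f in futures]  # same order as submission
print(ordered)  # [1, 1, 4, 27, 256, 3125]
```

The callbacks may fire in any order, while reading the Futures back from the list preserves submission order.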
Thread and process pool code summary
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
pool.submit(task, i).add_done_callback(call_back)  # asynchronous submission + callback mechanism
Coroutines
process: the unit of resources
thread: the unit of execution
coroutine: concurrency within a single thread. The concept is entirely something programmers came up with; the operating system knows nothing about it.
Multiprogramming relies on switching + saving state.
We want to detect IO ourselves at the code level and, as soon as IO is encountered, switch tasks at the code level.
This makes the operating system feel that the program has been running the whole time without any IO, tricking it into giving the program as much CPU as possible.
Blind switching and saving state can also reduce efficiency:
    for compute-intensive work, switching lowers efficiency
    for IO-intensive work, switching improves efficiency
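Plain Python generators can illustrate "switch + save state" at the code level, within a single thread. This is only a sketch of the idea: the round-robin scheduler `run_round_robin` is hypothetical. Unlike gevent, it does not detect IO; it simply switches after every step.

```python
def worker(name, steps):
    total = 0                           # local state survives every switch
    for i in range(steps):
        total += i
        yield '%s step %d' % (name, i)  # pause here and hand control back
    return total


def run_round_robin(*gens):
    """Hypothetical mini-scheduler: resume each generator in turn until all finish."""
    log, totals = [], []
    ready = list(gens)
    while ready:
        gen = ready.pop(0)
        try:
            log.append(next(gen))       # resume exactly where this generator left off
            ready.append(gen)
        except StopIteration as stop:
            totals.append(stop.value)   # the generator's return value
    return log, totals


log, totals = run_round_robin(worker('A', 3), worker('B', 2))
print(log)     # ['A step 0', 'B step 0', 'A step 1', 'B step 1', 'A step 2']
print(totals)  # [1, 3]
```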
gevent module (understand)
This module can detect IO for you and automatically switch to another task whenever the program hits IO.
Install it with: pip3 install gevent
from gevent import monkey; monkey.patch_all()
import time
from gevent import spawn

"""
On its own, the gevent module cannot detect some common IO operations (such as time.sleep).
When using it, you need to add:
    from gevent import monkey
    monkey.patch_all()
Since these two lines are always needed whenever gevent is used, they can be shortened to one line:
    from gevent import monkey; monkey.patch_all()
"""


def heng():
    print('Hum')
    time.sleep(2)
    print('Hum')


def ha():
    print('Ha')
    time.sleep(3)
    print('Ha')


def heiheihei():
    print('heiheihei')
    time.sleep(5)
    print('heiheihei')


start_time = time.time()
g1 = spawn(heng)  # spawn submits the task asynchronously; gevent switches tasks whenever it detects IO
g2 = spawn(ha)
g3 = spawn(heiheihei)
g1.join()
g2.join()  # wait until the spawned tasks are finished before continuing
g3.join()
# heng()
# ha()
# print(time.time() - start_time)  # run serially, heng() + ha() take about 5 s (5.005702018737793)
print(time.time() - start_time)  # about 3 s with only g1 and g2 (3.004199981689453); about 5 s with all three (5.005439043045044)
Coroutines for TCP server-side concurrency
# Server
from gevent import monkey; monkey.patch_all()
import socket
from gevent import spawn


def communication(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:
                break
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()


def server(ip, port):
    server = socket.socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        spawn(communication, conn)  # serve each client in its own coroutine


if __name__ == '__main__':
    g1 = spawn(server, '127.0.0.1', 8080)
    g1.join()


# Client
from threading import Thread, current_thread
import socket


def x_client():
    client = socket.socket()
    client.connect(('127.0.0.1', 8080))
    n = 0
    while True:
        msg = '%s say hello %s' % (current_thread().name, n)
        n += 1
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))


if __name__ == '__main__':
    for i in range(500):  # 500 concurrent clients, all served by a single-threaded server
        t = Thread(target=x_client)
        t.start()
Introduction to IO models
The IO models studied here all concern network IO.
Stevens's article compares five IO models:
* blocking IO
* nonblocking IO
* IO multiplexing
* signal driven IO
* asynchronous IO
Because signal-driven IO is rarely used in practice, only the remaining four IO models are introduced.

A blocking operation goes through two stages:
1) waiting for the data to be ready
2) copying the data from the kernel into the process

Common network blocking points: accept, recv, recvfrom.
send also involves IO, but it is not within our consideration here.
Blocking IO Model
Everything we wrote before, except the coroutine version, used the blocking IO model. To establish a connection, an application must ask the operating system through a system call. The operating system drives the network card to set up the connection, and once the connection exists, it puts received data into memory. Blocking happens mainly in accept, recv and recvfrom. The application issues a system call to the kernel (think of it as the operating system) and cannot copy data into the process until the connection is established and the data has arrived, so it just keeps waiting.

import socket

server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)

while True:
    conn, addr = server.accept()
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:
                break
            print(data)
            conn.send(data.upper())
        except ConnectionResetError as e:
            break
    conn.close()

# Running multiple processes or threads on the server, or using a process or thread pool,
# does not actually solve the IO problem: each worker still has to wait at places like
# accept() and recv() and cannot avoid the IO. Multiple clients simply no longer
# interfere with each other while they wait.
Non-blocking IO
A non-blocking IO model removes the problem of accept waiting for a connection. The server asks the operating system through a system call. If a connection is already established, the server uses it directly. If not, the call returns immediately with an error, and the server can check whether other connections have data or do something else.

import socket
import time

server = socket.socket()
server.bind(('127.0.0.1', 8081))
server.listen(5)
server.setblocking(False)  # make the server's network calls non-blocking: accept() raises instead of waiting
r_list = []    # connections that are still open
del_list = []  # connections to remove after this pass
while True:
    try:
        conn, addr = server.accept()
        # Whether an accepted socket inherits non-blocking mode depends on the OS,
        # so set it explicitly; otherwise conn.recv() below could block
        conn.setblocking(False)
        r_list.append(conn)
    except BlockingIOError:
        # time.sleep(0.1)
        # print('list length:', len(r_list))
        # print('do something else')
        for conn in r_list:
            try:
                data = conn.recv(1024)  # raises BlockingIOError if no data has arrived
                if len(data) == 0:  # the client disconnected
                    conn.close()  # close conn
                    del_list.append(conn)  # remove it from r_list after the loop
                    continue
                conn.send(data.upper())
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_list.append(conn)
        # Remove the dead connections
        for conn in del_list:
            r_list.remove(conn)
        del_list.clear()


# Client
import socket

client = socket.socket()
client.connect(('127.0.0.1', 8081))
while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data)

Although the non-blocking IO model may feel great, it keeps the CPU busy for long stretches without doing useful work: it just switches and polls over and over. That is why we do not consider the non-blocking IO model in practical applications.
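This is a self-contained way to watch non-blocking behavior without running a separate client. It assumes `socket.socketpair()`, which gives two already-connected sockets, in place of the server/client pair above. The first `recv()` raises BlockingIOError immediately, and the loop afterwards shows the process having to poll until the data arrives.

```python
import socket
import time

a, b = socket.socketpair()  # two already-connected sockets, so no server is needed
b.setblocking(False)

try:
    b.recv(1024)
    first_try = 'got data'
except BlockingIOError:
    # Nothing has arrived yet: recv() returns at once with an error instead of waiting
    first_try = 'BlockingIOError'

a.sendall(b'hello')

received = b''
polls = 0
while len(received) < 5:
    try:
        received += b.recv(1024)
    except BlockingIOError:
        polls += 1        # data not ready yet: the process has to keep asking
        time.sleep(0.01)

print(first_try, received)  # BlockingIOError b'hello'
a.close()
b.close()
```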
IO multiplexing
When only one object is being monitored, IO multiplexing is even worse than blocking IO!
However, IO multiplexing can monitor many objects at once.
The monitoring mechanism is provided by the operating system. To use it (select), you import the corresponding select module.
When the user process calls select, the whole process blocks. Meanwhile, the kernel "monitors" all the sockets select is responsible for, and select returns as soon as the data in any one socket is ready. The user process then calls read, and the data is copied from the kernel into the user process.
This flow is not very different from blocking IO and is actually worse. It needs two system calls (select and recvfrom), while blocking IO makes only one (recvfrom). The advantage of select is that it can handle many connections at once.

import socket
import select

server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)  # sockets block by default; with False, accept() does not wait here
read_list = [server]
while True:
    # select monitors the objects for you and returns the ones that are ready as soon as someone arrives
    r_list, w_list, x_list = select.select(read_list, [], [])
    # print(r_list)  # e.g. [<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>]
    for i in r_list:
        # Handle different objects differently
        if i is server:
            conn, addr = i.accept()
            # The new connection also needs to be monitored
            read_list.append(conn)
        else:
            res = i.recv(1024)
            if len(res) == 0:
                i.close()
                # Stop monitoring the closed connection
                read_list.remove(i)
                continue
            print(res)
            i.send(b'heiheiheiheihei')


# Client
import socket

client = socket.socket()
client.connect(('127.0.0.1', 8080))
while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data)

There are actually many monitoring mechanisms:
* select: available on both Windows and Linux
* poll: available on Unix-like systems such as Linux, but not on Windows
  Both select and poll can monitor multiple objects, but poll can monitor more.
  Neither select nor poll is perfect: when there are a great many monitored objects, responses can be severely delayed.
* epoll: only exists on Linux
  It binds a callback to each monitored object, so you are notified as soon as one of them responds (callback mechanism).
Writing separate code for each operating system's detection mechanism is too cumbersome. The selectors module automatically chooses the appropriate mechanism for the platform you are running on.
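Here is a sketch of the selectors module mentioned above. To keep the example self-contained, it monitors three `socketpair()` connections instead of a listening server, which is an assumption of the sketch. Only connection 1 sends data, so `select()` reports only that one.

```python
import selectors
import socket

sel = selectors.DefaultSelector()  # picks the best mechanism available on this platform

pairs = [socket.socketpair() for _ in range(3)]
for i, (client, server_side) in enumerate(pairs):
    server_side.setblocking(False)
    sel.register(server_side, selectors.EVENT_READ, data=i)  # monitor for readability

pairs[1][0].sendall(b'ping')  # only connection 1 sends anything

ready = []
for key, mask in sel.select(timeout=1):  # blocks until at least one socket is readable
    ready.append(key.data)
    msg = key.fileobj.recv(1024)
    key.fileobj.sendall(msg.upper())

reply = pairs[1][0].recv(1024)
print(ready, reply)  # [1] b'PING'

sel.close()
for client, server_side in pairs:
    client.close()
    server_side.close()
```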
Asynchronous IO
The asynchronous IO model is the most efficient of all the models and is widely used.
Related modules and frameworks:
    module: asyncio (async means asynchronous)
    asynchronous frameworks: sanic, tornado, twisted (fast)
As soon as the user process initiates a read, it can start doing other things. From the kernel's point of view, an asynchronous read returns immediately when received, so the user process is not blocked at all. The kernel then waits for the data to be ready and copies it into user memory. When all of this is done, the kernel sends the user process a signal telling it that the read has completed. (In short: ask the operating system first, then go do other things. The operating system handles the remaining work and signals the process when it is done, and the process then carries on with what is left.)

import threading
import asyncio


async def hello():
    print('hello world %s' % threading.current_thread())
    await asyncio.sleep(1)  # simulate an IO operation
    print('hello world %s' % threading.current_thread())


async def main():
    # run two hello() coroutines concurrently; both run in the same thread
    await asyncio.gather(hello(), hello())


asyncio.run(main())
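This small timing check uses a hypothetical coroutine `fake_io` to show what the asyncio example demonstrates. Three coroutines each wait 0.2 s, yet all of them run in one thread, and the total time is close to a single wait. The check only shows overlapping waits inside one event loop; it does not show the kernel-level signaling described in the text.

```python
import asyncio
import threading
import time


async def fake_io(n):
    await asyncio.sleep(0.2)  # stands in for a network wait; the loop runs other coroutines meanwhile
    return n, threading.get_ident()


async def main():
    return await asyncio.gather(fake_io(1), fake_io(2), fake_io(3))


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start

print([n for n, _ in results])           # [1, 2, 3]
print(len({tid for _, tid in results}))  # 1: everything ran in one thread
print(elapsed)                           # roughly 0.2 s rather than 0.6 s
```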
Comparison of four IO models
So far, all four IO models have been described. Now let's return to the questions raised at the start: what is the difference between blocking and non-blocking, and what is the difference between synchronous IO and asynchronous IO?
Start with the simplest one: blocking vs non-blocking. The previous sections already illustrate the difference clearly. Calling blocking IO keeps the process blocked until the operation completes, while non-blocking IO returns immediately even if the kernel is still preparing the data.
Before explaining the difference between synchronous IO and asynchronous IO, you need to define them first. The definition given by Stevens (actually the definition of POSIX) looks like this:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
The difference is that synchronous IO blocks the process while it performs the "IO operation". By this definition, the four IO models fall into two categories. Blocking IO, non-blocking IO and IO multiplexing all belong to synchronous IO, and only asynchronous IO is asynchronous.
Some may object that non-blocking IO is not blocked. The subtle point is that the "IO operation" in the definition refers to the actual IO operation, such as the recvfrom system call in the example. When non-blocking IO executes recvfrom and the kernel's data is not ready, the process is not blocked. However, once the kernel's data is ready, recvfrom copies the data from the kernel into user memory, and the process is blocked during that copy. Asynchronous IO is different: after the process initiates an IO operation, the call returns immediately, and the process ignores it until the kernel sends a signal saying the IO is complete. The process is never blocked at any point.
A comparison of the IO models is shown in the figure:
The difference between non-blocking IO and asynchronous IO is clear from the descriptions above. In non-blocking IO, the process is not blocked most of the time, but it still has to check actively. Once the data is ready, it must also actively call recvfrom again to copy the data into user memory. Asynchronous IO is completely different: it is as if the user process hands the entire IO operation over to someone else (the kernel), who sends a signal when the work is finished. Meanwhile, the user process neither checks the status of the IO operation nor copies the data itself.