Process Thread Supplement

Deadlocks and recursive locks (both threads and processes, understand)

When you know that the use of locks requires the release of locks, you are also extremely prone to deadlocks when operating on locks (the entire program gets stuck)

# Deadlock phenomenon: Deadlock refers to a deadlock caused by competing for resources during the running of multiple processes. When processes are in this deadlock state, they will no longer be able to move forward without external forces. So let's take an example to show that if at this point a thread A acquires the lock in the order lock a and then lock b, while at the same time another thread B acquires the lock in the order lock B and then lock a.
from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()
# Classes that are bracketed multiple times must produce different objects
# If you want to achieve multiple parentheses until the same object - singleton mode

class MyThead(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('%s Grab A lock'% self.name)  # Get the current thread name
        mutexB.acquire()
        print('%s Grab B lock'% self.name)
        mutexB.release()
        mutexA.release()
        
    def func2(self):
        mutexB.acquire()
        print('%s Grab B lock'% self.name)
        time.sleep(2)
        mutexA.acquire()
        print('%s Grab A lock'% self.name)  # Get the current thread name
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t = MyThead()
        t.start()
# Thread A can only use lock after it has been released. If Thread A holds lock A and Thread B holds lock B, Thread A waits for Thread B to release lock B. Thread B also waits for Thread A to release lock A before it gets lock A, so both threads are stuck and the program cannot continue running.

Recursive locks (understand): resolving deadlock issues

"""
Features of recursive locks	
	Can be continuous acquire and release
	But only the first to grab the lock to do the above
	It has a counter inside each acquire Count once plus one per realse One count minus one
	No one else can grab the lock as long as the count is not zero
"""
# Put the above
mutexA = Lock()
mutexB = Lock()
# change into
mutexA = mutexB = RLock()

Semaphore (Understanding)

Semaphores may correspond to different technical points at different stages

In concurrent programming, semaphores refer to locks

"""
If we compare mutually exclusive locks to a toilet
 So the semaphore is equivalent to more than one toilet
"""
from threading import Thread, Semaphore
import time
import random
# Print random Authentication Code with random Module (a pen test for Sogou)
sm = Semaphore(5)  # Numbers in parentheses set the number of'toilets'. When 1 is written inside, semaphores actually limit the number of threads accessing a task, just like mutexes

def task(name):
    sm.acquire()
    print('%s Squatting pit'% name)
    time.sleep(random.randint(1, 5))
    sm.release()

if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=task, args=('Parachute%s Number'%i, ))
        t.start()

Event Events (Understanding)

Some processes/threads need to wait for others to finish running before they can run, similar to signaling

from threading import Thread, Event
import time

event = Event()  # Introducing Events

def light():
    print('Red light on')
    time.sleep(3)
    print('The green light is on')
    # Tell those waiting for the red light to go
    event.set()  # Set the event to which wait can continue executing down

def car(name):
    print('%s Car is on red light'%name)
    event.wait()  # Waiting for someone to signal you
    print('%s Gas Valve Speed Away'%name)

if __name__ == '__main__':
    t = Thread(target=light)
    t.start()

    for i in range(20):
        t = Thread(target=car, args=('%s'%i, ))
        t.start()  # You need to wait until the light ed thread above has finished executing before continuing down

Thread Q (Understanding)

"""
Multiple threads share data under the same process. Why use queues under the same process, because the queues are:
    The Conduit + lock
 Use queues or for data security
"""
import queue

# Queues we are using now are only available for local testing

# 1 Queue q FIFO
q = queue.Queue(3)
q.put(1)
q.get()
q.get_nowait()
q.get(timeout=3)
q.full()
q.empty()

# 2LIFO q
q = queue.LifoQueue(3)  # last in first out
q.put(1)
q.put(2)
q.put(3)
print(q.get())  # 3

# 3 Priority Queue: You can set the priority of data that is put in and out of the queue
q = queue.PriorityQueue(4)
q.put((10, '111'))
q.put((100, '222'))
q.put((0, '333'))
q.put((-5, '444'))
print(q.get())  # (-5, '444')
# put a tuple in parentheses, and the first number indicates priority
# Note that the smaller the number, the higher the priority

Process and Thread Pools (Master)

First review how TCP servers used to achieve concurrency

Every time a person starts a process or thread to process

"""
Whether you start a process or a thread, you need to consume resources
 Only opening threads consumes slightly less than opening processes
 It is impossible for us to open processes and threads indefinitely because the resources of computer hardware can't keep up with them
 Hardware development is far behind software
 Our aim should be to maximize the use of computer hardware while ensuring it works properly
"""
# The concept of pools
"""
What is a pool?
	Pools are used to maximize the use of computers while ensuring the security of computer hardware
	It reduces the efficiency of the program but ensures the security of the computer hardware so that the program you write can run properly
"""

Basic Use

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os

pool = ThreadPoolExecutor(5)  # There are only five threads fixed in the pool
# Numbers can be passed in parentheses. By default, threads that are five times the number of CPUs on the current computer will be opened
pool = ProcessPoolExecutor(5)
# Numbers can be passed in parentheses. If you do not, the current number of computer cpu processes will be opened by default

def task(n):
    print(n,os.getpid())
    time.sleep(2)
    return n**n

def call_back(n):
    print('call_back>>>:',n.result())
"""
Task submission method
    synchronization:Do nothing while waiting in place for the task to return after submitting it
    asynchronous:Do not wait for the task to return after submitting it, execution proceeds
        Return results from asynchronous submission tasks should be obtained through callback mechanisms
"""
# The code commented below is a way to put the resulting future object in a list and then pull the object out of the list as a result()
if __name__ == '__main__':
    # pool.submit(task, 1)  # Submit task to pool, default asynchronous, 1 is the parameter of task
    # t_list = []
    for i in range(20):  # Submit 20 tasks to the pool
        # res = pool.submit(task, i)  # The result is actually a <Future at 0x100f97b38 state=running>object
        # print(res.result())  # Using the result method, you can get the value obtained by the above object, but the result is a synchronous commit, somewhat similar to the join method, so it is not recommended to use
        res = pool.submit(task, i).add_done_callback(call_back)  # This is the legendary callback mechanism, simply by passing in a function when an asynchronous thread passes through call_ When the back function returns a value, it automatically returns the result of the function
        # t_list.append(res)
    # Wait until all tasks in the thread pool have been executed before proceeding further
    # pool.shutdown()  # Close the thread pool and wait for all tasks in the thread pool to finish running
    # for t in t_list:
    #     print('>>>:',t.result())  # It must be ordered, because it's ordered when you put it in the list above
"""
Programs have concurrency and become serial
 Why does the task print None
res.result() What you get is the return of the asynchronously submitted task
"""

Thread pool code summary

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
pool.submit(task, i).add_done_callback(call_back)  # Asynchronous Callback Mechanism

Protocol

"""
process:Resource Units
 thread:Execution Unit
 Concurrency under single thread
 Protocol: This concept was entirely conceived by our programmers

Multichannel Technology
	switch+Save State

We want to detect ourselves at the code level IO Behavior. Once encountered IO Code-level switch
 This gives the operating system the feeling that I have been running this program without IO
 Decepting the operating system to maximize its use CPU

Blind switching and saving may also reduce program efficiency
 Computing intensive switching reduces efficiency
IO Dense    Switching will improve efficiency

"""

gevent module (understand)

This module helps to detect IO and switch tasks when a program encounters IO

# install
pip3 install gevent
from gevent import monkey;monkey.patch_all()
import time
from gevent import spawn

"""
gevent The module itself cannot detect some common io operation
 You need to import an extra sentence when using it
from gevent import monkey
monkey.patch_all()
Because the above two sentences are used gevent Module is definitely imported
 So abbreviation is also supported
from gevent import monkey;monkey.patch_all()
"""
def heng():
    print('Hum')
    time.sleep(2)
    print('Hum')

def ha():
    print('Ha')
    time.sleep(3)
    print('Ha')

def heiheihei():
    print('heiheihei')
    time.sleep(5)
    print('heiheihei')

start_time = time.time()
g1 = spawn(heng)  # Detect IO, note: spawn is submitted asynchronously at the time of detection
g2 = spawn(ha)
g3 = spawn(heiheihei)
g1.join()
g2.join()  # Wait until the detected task is completed before proceeding
g3.join()
# heng()
# ha()
# print(time.time() - start_time)  # 5.005702018737793
print(time.time() - start_time)  # 3.004199981689453   5.005439043045044

Collaboration for TCP server side concurrency

# Server
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn

def communication(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            conn.send(data.upper())
        except ConnectionResetError as e:
            print(e)
            break
    conn.close()

def server(ip, port):
    server = socket.socket()
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        spawn(communication, conn)

if __name__ == '__main__':
    g1 = spawn(server, '127.0.0.1', 8080)
    g1.join()
 
# Client
from threading import Thread, current_thread
import socket

def x_client():
    client = socket.socket()
    client.connect(('127.0.0.1',8080))
    n = 0
    while True:
        msg = '%s say hello %s'%(current_thread().name,n)
        n += 1
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data.decode('utf-8'))

if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=x_client)
        t.start()

Introduction to IO Model

"""
What we're studying here IO Models are all for networks IO Of
Stevens Five types are compared in the article IO Model(Model:
    * blocking IO           block IO
    * nonblocking IO        Non-blocking IO
    * IO multiplexing       IO Multiplex
    * signal driven IO      Signal Driven IO
    * asynchronous IO       asynchronous IO
    Because signal driven IO(Signal Driven IO)Not commonly used in practice, so the remaining four are introduced IO Model. 
"""
# A blocked program goes through the following two processes:
# 1) Waiting for the data to be ready
# 2) Copying the data from the kernel to the process
'''
Common network congestion States:
  	accept
    recv
    recvfrom
    
    send although	There are also io Behavior but not within our consideration
'''

Blocking IO Model

"""
We've written blocking before IO Model, except for coprocess

In fact, when an application establishes a link, it needs to apply to the operating system, that is, the system call, the operating system calls the network card to establish the link, and the operating system puts the received data into memory after the connection is established.

Blocking mainly exists in accept,recv,recvfrom,Because applications need to always request system calls from the kernel (which can be interpreted as the operating system), they cannot copy data into the process until the link is established, or they will have to wait all the time
"""
import socket

server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)

while True:
    conn, addr = server.accept()
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:break
            print(data)
            conn.send(data.upper())
        except ConnectionResetError as e:
            break
    conn.close()
    
# Opening a multi-process or multi-threaded process pool thread pool on the server side does not actually solve the IO problem	
# A place like this has to wait without evading IO
# It's just that multiple people don't interfere with each other while waiting

non-blocking IO

"""
To achieve a non-blocking IO Model

The solution is actually to set up a link. accept The problem that has been waiting for is that the server first requests a system call, if the connection is already established, then uses it directly, if not, returns a result, and the server asks if other links are established or does something else.
"""
import socket
import time

server = socket.socket()
server.bind(('127.0.0.1', 8081))
server.listen(5)
server.setblocking(False)
# Change all network blockages to non-blocking
r_list = []
del_list = []
while True:
    try:
        conn, addr = server.accept()
        r_list.append(conn)
    except BlockingIOError:
        # time.sleep(0.1)
        # print('list length:', len(r_list)))
        # print('do something else')
        for conn in r_list:
            try:
                data = conn.recv(1024)  # No message error
                if len(data) == 0:  # Client disconnects
                    conn.close()  # Close conn
                    # Remove useless Conns from r_list deletion
                    del_list.append(conn)
                    continue
                conn.send(data.upper())
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_list.append(conn)
        # Waving useless links
        for conn in del_list:
            r_list.remove(conn)
        del_list.clear()

# Client
import socket

client = socket.socket()
client.connect(('127.0.0.1',8081))

while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data)
    
"""
Although not blocked IO It feels so good to you
 But the model takes up a long time CPU And don't work to make CPU Keep Switching
 We will not consider non-blocking in our practical applications IO Model
"""

IO Multiplex

"""
When there is only one object under supervision IO Multiplex Connection Blocking IO Can't compare!!!
however IO Multiplex can monitor many objects at once
 Regulatory mechanisms are inherent to the operating system if you want to use them(select)
Need you to import the corresponding select Modular

When the user process calls select,Then the whole process will be block,At the same time, kernel Will "monitor" all select Conscientious socket,When any one socket The data in is ready. select Will return. This time the user process calls again read Operation, data from kernel Copy to user process. This graph and blocking IO The graph is not really very different, but actually worse. Because there are two system calls required(select and recvfrom),and blocking IO Only one system call was invoked(recvfrom). However, with select The advantage is that it can handle multiple at once connection
"""
import socket
import select

server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False)  # The default is true, and when set to false, the program does not wait here, but goes straight down
read_list = [server]

while True:
    r_list, w_list, x_list = select.select(read_list, [], [])
    """
    Help you regulate
    Return to your supervisor as soon as someone comes
    """
    # print(res)  # ([<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>], [], [])
    # print(server)
    # print(r_list)
    for i in r_list:  #
        """Different processing for different objects"""
        if i is server:
            conn, addr = i.accept()
            # It should also be added to the regulated queue
            read_list.append(conn)
        else:
            res = i.recv(1024)
            if len(res) == 0:
                i.close()
                # Remove invalid supervisory objects
                read_list.remove(i)
                continue
            print(res)
            i.send(b'heiheiheiheihei')

 # Client
import socket

client = socket.socket()
client.connect(('127.0.0.1',8080))

while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data)
    
"""
There are actually many regulatory mechanisms
select mechanism  windows linux All have
poll mechanism    Only in linux Yes   poll and select Multiple objects can be monitored but poll More regulation

Above select and poll Mechanisms are not perfect when there are especially many regulators
 Extremely large delay response may occur

epoll Mechanisms only exist in linux Yes
	It binds a callback mechanism to each supervisor
	Alert as soon as there is a response callback mechanism

Writing code with different detection mechanisms is too cumbersome for different operating systems
 There's one person who can automatically help you choose the appropriate regulatory mechanism based on the platform you're running on
selectors Modular
"""    

Asynchronous IO

"""
asynchronous IO Models are the most efficient and widely used of all models
 Related modules and frameworks
	Modular:asyncio Module ( async Actually asynchronous)
	Asynchronous Framework:sanic tronado twisted
		   Fast
		   
User Process Initiation read Once you've done it, you can start doing something else right away. On the other hand, from kernel The angle, when it is subjected to a asynchronous read After that, it returns immediately, so nothing happens to the user process block. Then? kernel It waits for the data to be ready, then copies it to user memory, and when it's all done, kernel Send a user process signal,Tell it read The operation is complete (simply, apply to the operating system first, then you can do other things, the remaining tasks will be helped by the operating system, after which the signal will be sent to the process, the process will continue to do the remaining things)		   
"""
import threading
import asyncio

@asyncio.coroutine
def hello():
    print('hello world %s'%threading.current_thread())
    yield from asyncio.sleep(1)  # IO operations simulated here
    print('hello world %s' % threading.current_thread())

loop = asyncio.get_event_loop()
tasks = [hello(),hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Comparison of four IO models

So far, all four IO Model s have been described. Now go back and answer the first few questions: what is the difference between blocking and non-blocking, and what is the difference between synchronous IO and asynchronous IO.
First answer the simplest one: blocking vs non-blocking. The differences between the two are clearly illustrated in the previous introduction. Calling blocking IO keeps the corresponding process blocked until the operation is complete, while non-blocking IO returns immediately when the kernel s are ready for data.

Before explaining the difference between synchronous IO and asynchronous IO, you need to define them first. The definition given by Stevens (actually the definition of POSIX) looks like this:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
The difference is that synchronous IO blocks the process when it does an IO operation. By this definition, the four IO models can be divided into two categories: blocking IO, non-blocking IO, and IO multiplexing, all of which belong to synchronous IO, while asynchronous I/O is the latter.

It might be said that non-blocking IO is not blocked. Here's a very "tricky" place where "IO operation" in the definition refers to real IO operations, such as the recvfrom system call in the example. When non-blocking IO executes the recvfrom system call, it will not block the process if the kernels'data is not ready. However, when the data in the kernels is ready, recvfrom copies the data from the kernels into user memory, at which point the process is blocked, during which time the process is blocked. Unlike asynchronous IO, when a process initiates an IO operation, it returns directly and ignores it until kernel sends a signal telling the process that IO is complete. Throughout this process, there is no block at all.

A comparison of the IO Model s is shown in the figure:

  

The differences between non-blocking IO and asynchronous IO are obvious as described above. In non-blocking IO, although a process is not blocked most of the time, it still requires the process to actively check, and when the data is ready, it also requires the process to actively call recvfrom again to copy the data to user memory. Asynchronous IO is completely different. It's like a user process handing over the entire IO operation to someone else (the kernel) and signaling when that person finishes. During this time, the user process does not need to check the status of the IO operation or actively de-copy the data.

Keywords: Python

Added by jgh84 on Fri, 05 Nov 2021 22:53:02 +0200