Multithreading for Concurrent Programming (Theory)
Introduction to a threading module
The multiprocessing module closely mirrors the threading module's interface, and the two are used in very similar ways, so the shared parts are not described again in detail.
Official link: https://docs.python.org/3/library/threading.html?highlight=threading#
Two ways to open a thread
# Mode 1: pass a plain function to Thread as its target.
import time
from threading import Thread


def sayhi(name):
    """Pause for two seconds, then print a greeting for ``name``."""
    time.sleep(2)
    greeting = '%s say hello' % name
    print(greeting)


if __name__ == '__main__':
    worker = Thread(target=sayhi, args=('Too white',))
    worker.start()
    # start() does not wait for sayhi, so this line is printed first.
    print('Main Thread')
Mode 1
# Mode 2: subclass Thread and override run().
import time
from threading import Thread


class Sayhi(Thread):
    """Thread that prints a greeting for ``name`` after a two-second pause."""

    def __init__(self, name):
        super().__init__()
        # Assigning to ``name`` replaces the thread's auto-generated name.
        self.name = name

    def run(self):
        """Body executed in the new thread once start() is called."""
        time.sleep(2)
        greeting = '%s say hello' % self.name
        print(greeting)


if __name__ == '__main__':
    worker = Sayhi('egon')
    worker.start()
    print('Main Thread')
Mode 2
Three: the difference between starting multiple threads in one process and starting multiple child processes from one process
import os
from multiprocessing import Process
from threading import Thread


def work():
    """Print a fixed marker so the start-up order shows in the output."""
    print('hello')


if __name__ == '__main__':
    # Start a thread under the main process.
    thread = Thread(target=work)
    thread.start()
    print('main thread/main process')
    # Printed result reported by the author:
    #   hello
    #   main thread/main process

    # Start a child process under the main process.
    child = Process(target=work)
    child.start()
    print('main thread/main process')
    # Printed result reported by the author:
    #   main thread/main process
    #   hello

# Which one starts faster?
Open Speed Contrast
import os
from multiprocessing import Process
from threading import Thread


def work():
    """Print a marker followed by the pid of the process running this call."""
    pid = os.getpid()
    print('hello', pid)


if __name__ == '__main__':
    # part1: several threads under the main process; each reports the same
    # pid as the main process.
    threads = [Thread(target=work) for _ in range(2)]
    for th in threads:
        th.start()
    print('Main Thread/Main Process pid', os.getpid())

    # part2: several child processes; each reports a different pid.
    children = [Process(target=work) for _ in range(2)]
    for child in children:
        child.start()
    print('Main Thread/Main Process pid', os.getpid())

# Take a look at the pids
Contrast pid
import os
from multiprocessing import Process
from threading import Thread


def work():
    """Rebind the module-level name ``n`` to 0."""
    global n
    n = 0


if __name__ == '__main__':
    # Process variant (left disabled by the author):
    # n = 100
    # p = Process(target=work)
    # p.start()
    # p.join()
    # print('main', n)
    # The child process p sets its own global n to 0, but only its own copy;
    # n in the parent process is still 100.

    n = 1
    worker = Thread(target=work)
    worker.start()
    worker.join()
    # Prints 0: threads in the same process share that process's data.
    print('main', n)

# Do threads within the same process share that process's data?
Threads within the same process share data for that process
Small exercises
socket related exercises
#!/usr/bin/env python
# _*_coding:utf-8_*_
import multiprocessing
import socket
import threading

# Creating the socket object touches no network state; bind/listen are done
# under the main guard so that importing this module has no side effects.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


def action(conn):
    """Echo every chunk received on ``conn`` back in upper case.

    Returns when the peer closes or resets the connection. ``conn`` is
    closed on the way out.
    """
    with conn:
        while True:
            try:
                data = conn.recv(1024)
                if not data:
                    # recv() returning b'' means the peer closed the
                    # connection; without this check the loop never ends.
                    break
                print(data)
                # sendall() keeps sending until the whole reply is written.
                conn.sendall(data.upper())
            except ConnectionError:
                break


if __name__ == '__main__':
    s.bind(('127.0.0.1', 8080))
    s.listen(5)
    while True:
        conn, addr = s.accept()
        # One thread per client connection.
        p = threading.Thread(target=action, args=(conn,))
        p.start()

# Multithreaded concurrent socket server
Server
#!/usr/bin/env python
# _*_coding:utf-8_*_
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 8080))

# The with-block closes the socket however the loop ends.
with s:
    while True:
        msg = input('>>: ').strip()
        if not msg:
            # Nothing to send for an empty line; prompt again.
            continue
        s.sendall(msg.encode('utf-8'))
        data = s.recv(1024)
        if not data:
            # b'' means the server closed the connection.
            break
        print(data)

# Client
Client
One receives user input, one formats user input in uppercase, and one saves formatted results in a file
import time
from threading import Thread

# Hand-off buffers between the three stages. Each list has exactly one
# producer (append) and one consumer (pop(0)).
msg_l = []
format_l = []

# Seconds to sleep when a stage has nothing to do, so idle loops do not spin.
_IDLE_DELAY = 0.01


def talk():
    """Read lines from the user forever and queue the non-empty ones."""
    while True:
        msg = input('>>: ').strip()
        if not msg:
            continue
        msg_l.append(msg)


def format_msg():
    """Move queued messages to ``format_l`` in upper case, oldest first."""
    while True:
        if msg_l:
            # pop(0) keeps arrival order; pop() would take the newest item.
            res = msg_l.pop(0)
            format_l.append(res.upper())
        else:
            time.sleep(_IDLE_DELAY)


def save():
    """Append formatted messages to db.txt, one per line, oldest first."""
    while True:
        if format_l:
            with open('db.txt', 'a', encoding='utf-8') as f:
                res = format_l.pop(0)
                f.write('%s\n' % res)
        else:
            time.sleep(_IDLE_DELAY)


if __name__ == '__main__':
    t1 = Thread(target=talk)
    t2 = Thread(target=format_msg)
    t3 = Thread(target=save)
    t1.start()
    t2.start()
    t3.start()
Four: other thread-related methods
Methods of a Thread instance: # is_alive(): returns whether the thread is alive (the old isAlive() alias was removed in Python 3.9). # getName() / setName(): get or set the thread name (deprecated since Python 3.10; use the name attribute instead). Some functions provided by the threading module: # threading.current_thread() (formerly currentThread()): returns the current Thread object. # threading.enumerate(): returns a list of all running threads. "Running" means after a thread has started and before it has ended; threads that have not started yet or have already terminated are excluded. # threading.active_count() (formerly activeCount()): returns the number of running threads, which equals len(threading.enumerate()).
import os
import threading
from multiprocessing import Process
from threading import Thread


def work():
    """Sleep for three seconds, then print the name of the running thread."""
    import time
    time.sleep(3)
    # ``name`` replaces getName(), which is deprecated since Python 3.10.
    print(threading.current_thread().name)


if __name__ == '__main__':
    # Start a thread under the main process.
    t = Thread(target=work)
    t.start()

    print(threading.current_thread().name)
    print(threading.current_thread())   # the main thread object
    print(threading.enumerate())        # main thread plus the started thread
    print(threading.active_count())
    print('Main Thread/Main Process')

    # Sample output reported by the author (thread ids vary between runs):
    #   MainThread
    #   <_MainThread(MainThread, started 140735268892672)>
    #   [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    #   2
    #   Main Thread/Main Process
    #   Thread-1
Main thread waits for child thread to end
import time
from threading import Thread


def sayhi(name):
    """Pause for two seconds, then print a greeting for ``name``."""
    time.sleep(2)
    greeting = '%s say hello' % name
    print(greeting)


if __name__ == '__main__':
    worker = Thread(target=sayhi, args=('egon',))
    worker.start()
    # join() blocks the main thread until the worker has finished.
    worker.join()
    print('Main Thread')
    print(worker.is_alive())
    # Output:
    #   egon say hello
    #   Main Thread
    #   False
View Code
Five daemon threads
Both processes and threads follow the same rule: a daemon xxx is destroyed once the main xxx has finished running.
It should be emphasized that "finished running" is not the same as "terminated".
#1. For the main process, "finished running" means that the main process's own code has run to completion. #2. For the main thread, "finished running" means that all non-daemon threads in the main thread's process have finished; only then is the main thread considered finished.
Explain in detail:
#1 The main process is considered finished as soon as its own code has run (daemon processes are reclaimed at this point); it then waits for all non-daemon child processes to finish and reclaims their resources (otherwise zombie processes would result) before it actually exits. #2 The main thread is considered finished only after all other non-daemon threads have finished (daemon threads are reclaimed at that point). Because the end of the main thread means the end of the process, at which point all of the process's resources are reclaimed, the process must wait for every non-daemon thread to finish before it can end.
import time
from threading import Thread


def sayhi(name):
    """Pause for two seconds, then print a greeting for ``name``."""
    time.sleep(2)
    print('%s say hello' % name)


if __name__ == '__main__':
    # The daemon flag must be set before start(). Passing daemon=True
    # replaces setDaemon(), which is deprecated since Python 3.10.
    t = Thread(target=sayhi, args=('egon',), daemon=True)
    t.start()

    print('Main Thread')
    print(t.is_alive())
    # Output (the daemon thread is still sleeping when the main thread ends,
    # so the greeting is never printed):
    #   Main Thread
    #   True
import time
from threading import Thread


def foo():
    """Print 123, pause one second, then print end123."""
    print(123)
    time.sleep(1)
    print("end123")


def bar():
    """Print 456, pause three seconds, then print end456."""
    print(456)
    time.sleep(3)
    print("end456")


# t1 is a daemon thread and t2 is not. t2 sleeps for 3 seconds and t1 for
# only 1 second, so t1 still gets to print "end123" before the process ends.
t1 = Thread(target=foo, daemon=True)
t2 = Thread(target=bar)

t1.start()
t2.start()
print("main-------")

# A confusing example
Six mutexes (synchronous lock)
Synchronization locks for threads serve the same purpose as synchronization locks for processes: when multiple threads compete for the same data (resource), we want to keep the data safe and the order of access well defined.
import time
from threading import Thread

x = 100


def task():
    """Decrement the global ``x`` by one, deliberately WITHOUT a lock.

    The value is read, the thread sleeps, and only then is the result
    written back, so concurrent callers overwrite each other's updates.
    """
    global x
    snapshot = x
    time.sleep(0.1)
    x = snapshot - 1


if __name__ == '__main__':
    workers = []
    for _ in range(100):
        worker = Thread(target=task)
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()
    print(f'main{x}')
The problem with competing for the same resource without a lock
import time
from threading import Lock
from threading import Thread

x = 100
lock = Lock()


def task():
    """Decrement the global ``x`` by one while holding ``lock``.

    The with-statement releases the lock even if the body raises, which a
    bare acquire()/release() pair does not guarantee.
    """
    global x
    with lock:
        temp = x
        time.sleep(0.1)
        temp -= 1
        x = temp


if __name__ == '__main__':
    t_l1 = []
    for i in range(100):
        t = Thread(target=task)
        t_l1.append(t)
        t.start()
    for t in t_l1:
        t.join()
    print(f'main{x}')
Synchronization lock ensures data security
Seven Deadlocks and Recursive Locks
Processes also have deadlocks and recursive locks, which are the same as thread deadlocks and recursive locks.
Deadlock is the situation in which two or more processes or threads wait for each other because they compete for resources during execution; without outside intervention, none of them can proceed. The system is then said to be in a deadlock state, and the processes that wait for each other forever are called deadlocked processes. The following code produces a deadlock:
import time
from threading import Lock
from threading import Thread

lock_A = Lock()
lock_B = Lock()


class MyThread(Thread):
    """Deadlock demo: f1 takes A then B, f2 takes B then A."""

    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        """Take lock A, then lock B; release B, then A."""
        with lock_A:
            print(f'{self.name}Get it A lock')
            with lock_B:
                print(f'{self.name}Get it B lock')

    def f2(self):
        """Take lock B, pause, then take lock A; release A, then B."""
        with lock_B:
            print(f'{self.name}Get it B lock')
            # The pause lets another thread take lock A in f1; each thread
            # then waits for the lock the other one holds.
            time.sleep(0.1)
            with lock_A:
                print(f'{self.name}Get it A lock')


if __name__ == '__main__':
    for _ in range(3):
        worker = MyThread()
        worker.start()
    print('main....')
Solution: recursive locks. Python provides the re-entrant lock RLock so that the same thread can request the same resource multiple times.
RLock internally maintains a Lock and a counter variable; the counter records the number of acquire() calls, so the same thread can request the resource multiple times. Until every acquire() made by that thread has been matched by a release(), no other thread can obtain the resource. In the example above, if a single RLock is used in place of the two Locks, no deadlock occurs:
import time
from threading import RLock
from threading import Thread

# Both names refer to the SAME re-entrant lock.
lock_A = lock_B = RLock()


class MyThread(Thread):
    """Same acquisition pattern as the deadlock demo, but with one RLock."""

    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        """Enter the lock via lock_A, then re-enter it via lock_B."""
        with lock_A:
            print(f'{self.name}Get it A lock')
            with lock_B:
                print(f'{self.name}Get it B lock')

    def f2(self):
        """Enter the lock via lock_B, pause, then re-enter it via lock_A."""
        with lock_B:
            print(f'{self.name}Get it B lock')
            time.sleep(0.1)
            with lock_A:
                print(f'{self.name}Get it A lock')


if __name__ == '__main__':
    for _ in range(10):
        worker = MyThread()
        worker.start()
    print('main....')
Eight semaphore Semaphore
Same as process
Semaphore manages a built-in counter,
Built-in counter-1 whenever acquire() is called;
Built-in counter + 1 when release() is called;
The counter cannot be less than 0; when the counter is 0, acquire() blocks the thread until release() is called by another thread.
Instance: (semaphore is available to only five threads at the same time, which limits the maximum number of connections to 5):
import random
import time
from threading import Semaphore
from threading import Thread
from threading import current_thread

# At most five threads may hold the semaphore at the same time.
sem = Semaphore(5)


def go_public_wc():
    """Hold one semaphore slot for a random 1-3 seconds.

    The with-statement releases the slot even if the body raises.
    """
    with sem:
        # ``name`` replaces getName(), deprecated since Python 3.10.
        print(f'{current_thread().name} Use the toilet ing')
        time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=go_public_wc)
        t.start()
Nine Python GIL (Global interpreter Lock)
First, some languages (Java, C++, C) let multiple threads in the same process run on multiple CPU cores, which is the multicore technology (4-core, 8-core) we hear about today. As we said before, multiprocessing can make data unsafe: several processes read the same data in a file at the same time, each of them changes it, and before one has written its update back to the file, another process has already computed on the old value. We solved that problem by locking. With multithreading we have to ask the same question, because threads also execute concurrently. Python faced this problem in its earliest days and took an extreme approach: at a time when computers really did have only one CPU core, it added the GIL (global interpreter lock) at the interpreter level. The GIL locks whole threads rather than particular data operations inside them: only one thread can use the CPU at a time, which means that multithreading cannot use multiple cores. This is not a problem of the Python language but a feature of the CPython interpreter; the Jython interpreter, which is developed in Java, does not have this problem. CPython is the default interpreter because of its speed, and in CPython threads cannot use multiple cores. This is a drawback of Python and a historical problem; although many experts on the Python team have worked on changing it, there is no solution in the default build yet. (Is this related to the difference between interpreted languages (Python, PHP) and compiled languages? Open question! Compiled languages usually settle such things at compile time, whereas interpreted languages execute while interpreting, so this lock was added to prevent data insecurity. Is this a disadvantage of all interpreted languages?)
* But does this lock mean we cannot have concurrency? When a program is compute-bound, i.e. it keeps the CPU busy all the time, the GIL really is a disadvantage. But most programs are I/O-bound (user input, network latency when accessing web addresses, opening, reading, writing and closing files). In what situations do we actually need high concurrency? Financial computation and artificial intelligence (AlphaGo) are compute-heavy, but ordinary business scenarios such as crawling web pages, multi-user websites, chat software and file processing are not. I/O operations rarely use the CPU, so multithreading still gives concurrency there: the CPU only has to schedule the threads quickly, and there is little computation inside each thread. With a batch of network requests, for example, the CPU switches between the threads very quickly while each thread goes off to perform its I/O.
Detailed introduction to GIL locks: https://www.cnblogs.com/jin-xin/articles/11232225.html
The relationship between ten GIL locks and Lock
GIL vs Lock: A sharp reader might ask: since Python already has a GIL that guarantees only one thread executes at a time, why do we still need Lock? First, we need to agree that the purpose of a lock is to protect shared data, so that only one thread can modify it at a time. It follows that different data should be protected by different locks. GIL and Lock are therefore two locks that protect different data: the former works at the interpreter level (it protects interpreter-level data, such as the data used by garbage collection), while the latter protects data in the user's own application. The GIL is clearly not responsible for application data; that has to be handled by a user-defined lock, i.e. Lock. Process analysis: all threads compete for the GIL, i.e. for permission to execute. Thread 1 grabs the GIL, gets permission to execute, starts running and acquires a Lock. Before it has finished, i.e. before Thread 1 has released the Lock, Thread 2 may grab the GIL and start executing. During execution Thread 2 finds that the Lock has not been released by Thread 1, so Thread 2 blocks and gives up its permission to execute. Thread 1 may then get the GIL again and run normally until it releases the Lock. The result is serial execution. Since it is serial anyway, executing t1.start() t1.join() t2.start() t2.join() is also serial, so why add Lock? Note that join waits for all of t1's code to finish, which is equivalent to locking all of t1's code, whereas Lock only locks the part of the code that manipulates shared data.
Details:
Because the Python interpreter automatically reclaims memory for you at regular intervals, you can think of it as a separate thread inside the interpreter that wakes up every so often and does a global scan to see which memory can be freed. The threads of your own program and the interpreter's own thread run concurrently. Suppose one of your threads deletes a variable. At the moment the interpreter's garbage-collection thread is freeing this variable, another thread may have just assigned new data to that not-yet-freed memory, so the newly assigned data could be deleted. To solve problems like this, the Python interpreter simply and crudely takes a lock: while one thread is running, no other thread may run. This solves the problem above, and it is a legacy of early versions of Python.
Eleven Event s
Same as process
A key feature of threads is that each thread runs independently and its state is unpredictable. Thread synchronization becomes tricky when other threads in the program need to decide their next action based on the state of some thread. To solve this, we use Event objects from the threading library. An Event object contains a signal flag that a thread can set, which lets other threads wait for a certain event to occur. Initially the flag in the Event object is false. If a thread waits on an Event object whose flag is false, the thread blocks until the flag becomes true. If a thread sets the flag of an Event object to true, all threads waiting on that Event object are woken up. If a thread waits on an Event object whose flag is already true, the wait returns immediately and the thread continues executing.
event.is_set() (formerly isSet()): returns the state of the event flag; event.wait(): blocks the calling thread if event.is_set() == False; event.set(): sets the event flag to True, after which all threads blocked on the event become ready to be scheduled by the operating system; event.clear(): resets the event flag to False.
For example, suppose several worker threads try to connect to MySQL, and we want to make sure the MySQL service is running before they connect. If a connection attempt is unsuccessful, the worker tries to reconnect. We can use the threading.Event mechanism to coordinate the connection attempts of the individual worker threads. (The code for this example appears further below, after the Timer description.)
Twelve Condition s (Understanding)
A Condition makes threads wait and releases n of them only when a condition is met; see the usage below.
import time
from threading import Thread, RLock, Condition, current_thread


def func1(c):
    """Wait on condition ``c`` until notified, then sleep 3s and report.

    The condition's lock must be held when wait() is called. ``with c``
    acquires it blocking; a non-blocking acquire(False) can fail and then
    wait() raises RuntimeError.
    """
    with c:
        c.wait()  # releases the lock while waiting, re-acquires on wake-up
        # Notified threads run this part one after another, because each
        # one holds the condition's lock while it sleeps.
        time.sleep(3)
        # ``name`` replaces getName(), deprecated since Python 3.10.
        print('%s Executed' % (current_thread().name))


if __name__ == '__main__':
    c = Condition()
    for i in range(5):
        t = Thread(target=func1, args=(c,))
        t.start()

    while True:
        num = int(input('Please enter the number of threads you want to notify:'))
        with c:
            c.notify(num)  # wake up to ``num`` waiting threads

# Result analysis:
# Please enter the number of threads you want to notify:3
# Please enter the number of threads you want to notify:Thread-1 Executed
# (Sometimes the results are printed on the prompt line. That is only an
# output interleaving issue and does not affect the behaviour.)
# Thread-3 Executed
# Thread-2 Executed

# Sample Code
View Code
Thirteen Timers (Understanding)
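Timer runs an operation once after a specified number of seconds n, for example threading.Timer(n, func).start(); it can be used for scheduled tasks. (Note: the code that follows is the Event-based MySQL connection example described in the Event section, not a Timer example.)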
import random
import threading
import time
from threading import Thread, Event

# Created at module level so conn_mysql/check_mysql work when this module is
# imported, not only when it is run as a script.
event = Event()


def conn_mysql():
    """Wait for ``event`` to be set, polling up to three times 0.5s apart.

    Raises:
        TimeoutError: if the event is still unset after three attempts.
    """
    count = 1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('Link Timeout')
        # ``name`` replaces getName(), deprecated since Python 3.10.
        print('<%s>No.%s Second attempt to link' % (threading.current_thread().name, count))
        event.wait(0.5)
        count += 1
    print('<%s>Link succeeded' % threading.current_thread().name)


def check_mysql():
    """Simulate a 2-4 second service check, then set ``event``."""
    print('\033[45m[%s]Checking mysql\033[0m' % threading.current_thread().name)
    time.sleep(random.randint(2, 4))
    event.set()


if __name__ == '__main__':
    conn1 = Thread(target=conn_mysql)
    conn2 = Thread(target=conn_mysql)
    check = Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()
Fourteen Thread Queue
What's the difference between queues and lists?
Queue queue: use import queue in the same way as process queue
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
- class queue.Queue(maxsize=0) #FIFO
# queue is imported directly (it is not part of the threading module) and
# ships with Python. Usage is basically the same as the queue in multiprocessing.
import queue

q = queue.Queue()
for item in ('first', 'second', 'third'):
    q.put(item)
# q.put_nowait()  # raises when the queue is full; can be wrapped in try

for _ in range(3):
    print(q.get())
# q.get_nowait()  # raises when the queue is empty; can be wrapped in try

# Results (FIFO):
#   first
#   second
#   third

# FIFO sample code
FIFO example
class queue.LifoQueue(maxsize=0) #last in first out
import queue

# A LifoQueue behaves like a stack: the last item put in is the first out.
q = queue.LifoQueue()
for item in ('first', 'second', 'third'):
    q.put(item)
# q.put_nowait()

for _ in range(3):
    print(q.get())
# q.get_nowait()

# Results (LIFO):
#   third
#   second
#   first

# LIFO sample code
LIFO example
class queue.PriorityQueue(maxsize=0)#Queues that set priority when storing data
import queue

q = queue.PriorityQueue()

# put() takes a tuple whose first element is the priority (usually a number,
# but any mutually comparable values work). The smaller the number, the
# higher the priority. Negative numbers are allowed.
#
# Notes from the author on entries with equal priority:
# q.put((20, 'ws'))       # ties are ordered by comparing the second elements
# q.put((20, 'wd'))       # strings compare character by character
# q.put((20, {'a': 11}))  # TypeError: dicts cannot be ordered
# q.put((20, ('w', 1)))   # tied values must be of mutually comparable types;
#                         # tuples are fine
entries = [
    (-10, 'a'),
    (-5, 'a'),
    (20, 'b'),
    (20, 'a'),
    (0, 'b'),
    (30, 'c'),
]
for entry in entries:
    q.put(entry)

for _ in range(6):
    print(q.get())

# Result (the smaller the number, the earlier the entry leaves the queue):
#   (-10, 'a')
#   (-5, 'a')
#   (0, 'b')
#   (20, 'a')
#   (20, 'b')
#   (30, 'c')

# Priority Queue Sample Code
Priority Queue Example
All three queues are thread-safe, so multiple threads using them will not corrupt the shared data by racing for the same resource.
Fifteen Python Standard Modules--concurrent.futures
The thread pool deserves more attention, so let's introduce it with a new module. Early Python had no thread pool; now the standard library has a module that provides both a thread pool and a process pool. Previously we used the process pool from multiprocessing; in this new module, the thread pool and the process pool are used in the same way.
Why put process pools and thread pools in one module? So that they are used in the same way: ThreadPoolExecutor and ProcessPoolExecutor can both be imported directly from concurrent.futures.
The concurrent.futures module provides a highly encapsulated asynchronous call interface. ThreadPoolExecutor: thread pool, provides asynchronous calls. ProcessPoolExecutor: process pool, provides asynchronous calls. Both implement the same interface, which is defined by the abstract Executor class. #2 Basic methods #submit(fn, *args, **kwargs): submit a task asynchronously #map(func, *iterables, timeout=None, chunksize=1): replaces a for loop of submit calls #shutdown(wait=True): equivalent to pool.close() + pool.join() on a multiprocessing pool. wait=True: wait until all tasks in the pool have finished and their resources have been reclaimed before continuing. wait=False: return immediately without waiting for the tasks in the pool to finish. Whatever the value of wait, the program as a whole does not exit until all tasks have finished. submit and map must be called before shutdown. #result(timeout=None): get the result #add_done_callback(fn): register a callback function
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def func(n, delay=2):
    """Sleep for ``delay`` seconds, print the thread id and ``n``, return n*n.

    Args:
        n: number to square.
        delay: seconds to sleep before printing (default 2).

    Returns:
        ``n * n``.
    """
    time.sleep(delay)
    print('%s Printed:' % (threading.get_ident()), n)
    return n * n


# The main guard matters if ThreadPoolExecutor is swapped for
# ProcessPoolExecutor: worker processes may re-import this module, and
# without the guard they would re-run the submission code.
if __name__ == '__main__':
    # When max_workers is omitted, the default depends on the Python version
    # (cpu_count * 5 on 3.5-3.7; min(32, cpu_count + 4) since 3.8).
    # To use a process pool, replace ThreadPoolExecutor with
    # ProcessPoolExecutor on the next line; nothing else changes.
    with ThreadPoolExecutor(max_workers=5) as tpool:
        # submit(fn, *args, **kwargs) returns a Future immediately. Calling
        # result() here would block per task and make the run serial, so the
        # futures are only collected.
        t_lst = [tpool.submit(func, i) for i in range(5)]
    # Leaving the with-block calls shutdown(wait=True): no new tasks are
    # accepted and all submitted tasks have finished.

    print('Main Thread')
    for ti in t_lst:
        print('>>>>', ti.result())

    # Alternative to waiting for the whole pool: poll the futures every
    # couple of seconds and take whichever results are ready first, using
    # enumerate() to remember which futures have already been consumed.
    # This is useful when tasks take a long time and finish at different
    # moments.

# Result analysis: the "Printed" lines come out in no fixed order, because
# the threads switch while func sleeps and any of them may print first. The
# ">>>>" results are in order, because the futures were added to the list in
# submission order.
# 37220 Printed: 0
# 32292 Printed: 4
# 33444 Printed: 1
# 30068 Printed: 2
# 29884 Printed: 3
# Main Thread
# >>>> 0
# >>>> 1
# >>>> 4
# >>>> 9
# >>>> 16

# ThreadPoolExecutor: simple use
ThreadPoolExecutor: simple usage
Use of ProcessPoolExecutor:
You only need to change one line; nothing else changes: tpool = ThreadPoolExecutor(max_workers=5) becomes tpool = ProcessPoolExecutor(max_workers=5). This is why the thread pool and the process pool live in the same module: they are used in exactly the same way. (With ProcessPoolExecutor, create the pool and submit tasks under if __name__ == '__main__': so that worker processes can import the main module safely.)
import os
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def task(n, delay=None):
    """Print the running thread's id, sleep, and return ``n ** 2``.

    Args:
        n: number to square.
        delay: seconds to sleep; when None (the default) a random whole
            number of seconds between 1 and 3 is used.

    Returns:
        ``n ** 2``.
    """
    print('%s is runing' % threading.get_ident())
    time.sleep(random.randint(1, 3) if delay is None else delay)
    return n ** 2


if __name__ == '__main__':
    # The with-block shuts the pool down once the results are consumed.
    with ThreadPoolExecutor(max_workers=3) as executor:
        # for i in range(11):
        #     future = executor.submit(task, i)

        # map replaces the for + submit loop and yields results in input order.
        s = executor.map(task, range(1, 5))
        print(list(s))

# Use of map
Use of map
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def func(n, delay=2):
    """Sleep for ``delay`` seconds (default 2) and return ``n * n``."""
    time.sleep(delay)
    return n * n


def call_back(m):
    """Done-callback: print the result of the finished future ``m``."""
    print('The results are:%s' % (m.result()))


if __name__ == '__main__':
    t_lst = []
    # The with-block waits for all tasks and shuts the pool down.
    with ThreadPoolExecutor(max_workers=5) as tpool:
        for i in range(5):
            # add_done_callback() returns None, so keep the Future returned
            # by submit() and register the callback as a separate step.
            t = tpool.submit(func, i)
            t.add_done_callback(call_back)
            t_lst.append(t)

# Simple application of a callback function
Callback functions are simple to use
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os


def get_page(url):
    """Download ``url`` and return its text.

    Returns:
        ``{'url': url, 'text': <body>}`` when the status code is 200,
        otherwise None.
    """
    print('<process%s> get %s' % (os.getpid(), url))
    # Without a timeout, requests can wait indefinitely on a stalled server.
    respone = requests.get(url, timeout=10)
    if respone.status_code == 200:
        return {'url': url, 'text': respone.text}
    return None


def parse_page(res):
    """Done-callback: append the page's url and size to db.txt.

    ``res`` is the Future returned by submit(); its result() is the value
    returned by get_page. Failed or non-200 downloads are skipped.
    """
    try:
        res = res.result()
    except requests.RequestException as exc:
        # result() re-raises whatever get_page raised in the worker.
        print('<process%s> request failed: %s' % (os.getpid(), exc))
        return
    if res is None:
        # get_page returns None for non-200 responses; nothing to record.
        return
    print('<process%s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
    with open('db.txt', 'a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/',
    ]

    # Equivalent with multiprocessing.Pool:
    # p = Pool(3)
    # for url in urls:
    #     p.apply_async(get_page, args=(url,), callback=parse_page)
    # p.close()
    # p.join()

    # The with-block waits for all downloads and shuts the pool down.
    with ProcessPoolExecutor(3) as p:
        for url in urls:
            # parse_page receives a Future and calls result() on it.
            p.submit(get_page, url).add_done_callback(parse_page)

# Callback function application; practice this one yourself