Concurrent programming multithreading

Multithreading for Concurrent Programming (Theory)

Introduction to a threading module

The multiprocessing module deliberately mirrors the threading module's interface, so the two are used in very similar ways; the shared parts are therefore not described again in detail here.

Official link: https://docs.python.org/3/library/threading.html?highlight=threading#

Two ways to open a thread

#Mode 1
from threading import Thread
import time
def sayhi(name):
    """Wait two seconds, then print a greeting for ``name``."""
    time.sleep(2)
    greeting = '%s say hello' % name
    print(greeting)

if __name__ == '__main__':
    # Mode 1: hand the target callable and its arguments straight to Thread.
    greeter = Thread(target=sayhi, args=('Too white',))
    greeter.start()
    # Printed right away; the main thread does not wait for the greeter.
    print('Main Thread')

Mode 1

#Mode 2
from threading import Thread
import time
class Sayhi(Thread):
    """Thread subclass whose ``run`` prints a greeting after a two-second pause."""

    def __init__(self, name):
        super().__init__()
        # Assigned after the base initialiser has run, so this replaces the
        # thread name chosen by Thread.__init__().
        self.name = name

    def run(self):
        """Sleep two seconds, then greet using this thread's name."""
        time.sleep(2)
        greeting = '%s say hello' % self.name
        print(greeting)


if __name__ == '__main__':
    # Mode 2: subclass Thread and override run(); start() invokes run() in
    # the new thread.
    greeter = Sayhi('egon')
    greeter.start()
    print('Main Thread')

Mode 2

Three: the difference between opening multiple threads in one process and opening multiple child processes in one process

from threading import Thread
from multiprocessing import Process
import os

def work():
    """Print a fixed greeting; used as the thread/process target."""
    message = 'hello'
    print(message)

if __name__ == '__main__':
    # Start a thread inside the main process.
    thread_worker = Thread(target=work)
    thread_worker.start()
    print('main thread/main process')
    # Typical output:
    #   hello
    #   main thread/main process

    # Start a child process from the main process.
    child = Process(target=work)
    child.start()
    print('main thread/main process')
    # Typical output:
    #   main thread/main process
    #   hello
Who has a fast start

Open Speed Contrast

from threading import Thread
from multiprocessing import Process
import os

def work():
    """Print ``hello`` followed by the PID of the process executing this call."""
    pid = os.getpid()
    print('hello', pid)

if __name__ == '__main__':
    # Part 1: threads started in the main process all report the main
    # process's PID.
    t1 = Thread(target=work)
    t2 = Thread(target=work)
    for thread in (t1, t2):
        thread.start()
    print('Main Thread/Main Process pid', os.getpid())

    # Part 2: child processes each report a PID of their own.
    p1 = Process(target=work)
    p2 = Process(target=work)
    for child in (p1, p2):
        child.start()
    print('Main Thread/Main Process pid', os.getpid())
//Take a look at pid

Contrast pid

from  threading import Thread
from multiprocessing import Process
import os
def work():
    """Rebind the module-level name ``n`` to zero."""
    global n
    n = 0

if __name__ == '__main__':
    # Process variant (disabled): the child process would zero only its own
    # copy of n, so the parent would still see its original value.
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('main',n)

    n = 1
    worker = Thread(target=work)
    worker.start()
    worker.join()
    # Prints 0: threads share the globals of the process they run in, so the
    # assignment made inside work() is visible here.
    print('main', n)
//Threads within the same process share data for that process?

Threads within the same process share data for that process

Small exercises

socket related exercises

#_*_coding:utf-8_*_
#!/usr/bin/env python
import multiprocessing
import threading

import socket
# TCP/IPv4 listening socket bound to the loopback interface, port 8080.
s = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
server_address = ('127.0.0.1', 8080)
s.bind(server_address)
s.listen(5)

def action(conn):
    """Echo every chunk received on ``conn`` back to the peer in upper case.

    Runs until the peer closes the connection (``recv`` returns ``b''``) or
    the connection fails, then closes ``conn``.

    Args:
        conn: A connected stream socket, e.g. as returned by ``accept()``.
    """
    try:
        while True:
            data = conn.recv(1024)
            if not data:
                # An empty read means the peer closed its end; without this
                # check the loop would spin forever on a dead connection.
                break
            print(data)
            # sendall() keeps writing until the whole reply is sent, whereas
            # send() may transmit only part of it.
            conn.sendall(data.upper())
    except ConnectionError:
        # The peer vanished mid-conversation; nothing more to do for it.
        pass
    finally:
        conn.close()

if __name__ == '__main__':
    # Accept clients forever, giving each connection a thread of its own.
    while True:
        conn, addr = s.accept()
        handler = threading.Thread(target=action, args=(conn,))
        handler.start()

//Multithreaded concurrent socket server

Server

#_*_coding:utf-8_*_
#!/usr/bin/env python


import socket

# Connect a TCP/IPv4 socket to the echo server on the loopback interface.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>: ').strip()
    if msg:
        # Blank input is ignored; anything else is sent and the reply shown.
        s.send(msg.encode('utf-8'))
        data = s.recv(1024)
        print(data)

//Client

Client

One receives user input, one formats user input in uppercase, and one saves formatted results in a file

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    """Read user input forever, appending each non-blank line to ``msg_l``."""
    while True:
        msg = input('>>: ').strip()
        if msg:
            msg_l.append(msg)

def format_msg():
    """Move messages from ``msg_l`` to ``format_l``, upper-casing each one.

    Loops forever.  Messages are taken from the front of ``msg_l`` so they
    are forwarded in the order in which they were appended.
    """
    # NOTE: this loop polls continuously and never sleeps while idle.
    while True:
        if msg_l:
            # pop(0), not pop(): pop() removes the newest entry, which would
            # reverse the order whenever more than one message is waiting.
            res = msg_l.pop(0)
            format_l.append(res.upper())

def save():
    """Append each entry of ``format_l`` to ``db.txt``, one per line.

    Loops forever.  Entries are taken from the front of ``format_l`` so the
    file keeps them in the order in which they were appended.
    """
    # NOTE: this loop polls continuously and never sleeps while idle.
    while True:
        if format_l:
            with open('db.txt', 'a', encoding='utf-8') as f:
                # pop(0), not pop(): pop() removes the newest entry, which
                # would reverse the order when several entries are waiting.
                res = format_l.pop(0)
                f.write('%s\n' % res)

if __name__ == '__main__':
    # One thread per pipeline stage: read input, format it, persist it.
    stage_threads = [Thread(target=stage) for stage in (talk, format_msg, save)]
    for stage_thread in stage_threads:
        stage_thread.start()

Other four-thread related methods

Method of Thread Instance Object
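  # is_alive(): Returns whether the thread is active (the older isAlive() spelling was removed in Python 3.9).
  # getName(): Returns the thread name (deprecated since Python 3.10; read the name attribute instead).
  # setName(): Sets the thread name (deprecated since Python 3.10; assign to the name attribute instead).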

Some of the methods provided by the threading module:
  # threading.current_thread(): Returns the current Thread object (currentThread() is the older, deprecated spelling).
  # threading.enumerate(): Returns a list of all running threads. "Running" means after a thread has started and before it has ended; threads that have not started yet or have already terminated are excluded.
  # threading.active_count(): Returns the number of running threads; the result equals len(threading.enumerate()) (activeCount() is the older, deprecated spelling).
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    """Sleep for three seconds, then print the name of the executing thread."""
    import time
    time.sleep(3)
    # The ``name`` attribute replaces getName(), which is deprecated since
    # Python 3.10 and emits a DeprecationWarning.
    print(threading.current_thread().name)


if __name__ == '__main__':
    # Open thread under main process
    t = Thread(target=work)
    t.start()

    # The ``name`` attribute replaces getName(), deprecated since Python 3.10.
    print(threading.current_thread().name)
    print(threading.current_thread())  # Main Thread
    print(threading.enumerate())  # Two running threads, counting the main thread
    print(threading.active_count())
    print('Main Thread/Main Process')

    # Print results (thread identifiers vary between runs; newer Python
    # versions name the child thread "Thread-1 (work)"):
    #   MainThread
    #   <_MainThread(MainThread, started 140735268892672)>
    #   [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    #   2
    #   Main Thread/Main Process
    #   Thread-1

Main thread waits for child thread to end

from threading import Thread
import time
def sayhi(name):
    """Print ``<name> say hello`` after a two-second delay."""
    time.sleep(2)
    line = '%s say hello' % name
    print(line)

if __name__ == '__main__':
    t = Thread(target=sayhi, args=('egon',))
    t.start()
    # join() blocks the main thread until the child thread has finished.
    t.join()
    print('Main Thread')
    still_running = t.is_alive()
    print(still_running)
    # Output:
    #   egon say hello
    #   Main Thread
    #   False

View Code

Five daemon threads

Both processes and threads follow the same rule: a daemon xxx is destroyed as soon as the main xxx has finished running.

It should be emphasized that "finished running" does not mean "terminated":

#1. For the main process, run-through means that the main process code is run-through

#2. For the main thread, run complete means that all non-daemon threads in the process in which the main thread resides are run before the main thread is finished

Explain in detail:

#1 The main process runs out after its code has finished (the daemon is recycled at this point), and then the main process waits until all non-daemon sub-processes have finished recycling their resources (otherwise a zombie process will occur) before it ends.

#2 The main thread is only considered finished once all other non-daemon threads have finished running (the daemon threads are reclaimed at that point). Because the end of the main thread means the end of the process, and the whole process's resources are then reclaimed, the process must make sure every non-daemon thread has finished before it can end.
from threading import Thread
import time
def sayhi(name):
    """Sleep two seconds and then print a hello line for ``name``."""
    time.sleep(2)
    hello_line = '%s say hello' % name
    print(hello_line)

if __name__ == '__main__':
    t = Thread(target=sayhi, args=('egon',))
    # Must be set before t.start().  The ``daemon`` attribute replaces
    # setDaemon(), which is deprecated since Python 3.10.
    t.daemon = True
    t.start()

    print('Main Thread')
    print(t.is_alive())
    # Output (the daemon thread is abandoned when the main thread finishes,
    # so its greeting is never printed):
    #   Main Thread
    #   True
from threading import Thread
import time
def foo():
    """Print ``123``, pause one second, then print ``end123``."""
    start_marker, end_marker = 123, "end123"
    print(start_marker)
    time.sleep(1)
    print(end_marker)

def bar():
    """Print ``456``, pause three seconds, then print ``end456``."""
    start_marker, end_marker = 456, "end456"
    print(start_marker)
    time.sleep(3)
    print(end_marker)


# t1 is a daemon thread and t2 is not.  The process stays alive until the
# non-daemon t2 (3 s) is done, which is longer than t1 needs (1 s), so t1
# still gets to print "end123".
t1 = Thread(target=foo, daemon=True)
t2 = Thread(target=bar)

for worker in (t1, t2):
    worker.start()
print("main-------")

//Confusing examples

Six mutexes (synchronous lock)

Multithreaded synchronization locks and multiprocess synchronization locks make sense, that is, when multiple threads grab the same data (resource), we want to ensure the data is safe and in a reasonable order.

from threading import Thread
import time

x = 100


def task():
    """Decrement the module-level ``x`` with an unguarded read-sleep-write.

    The value is read, the thread sleeps for 0.1 s, and only then is the
    decremented value stored, so concurrent callers can all start from the
    same reading.
    """
    global x
    snapshot = x
    time.sleep(0.1)
    x = snapshot - 1



if __name__ == '__main__':
    workers = []
    for _ in range(100):
        worker = Thread(target=task)
        worker.start()
        workers.append(worker)

    # Wait for every worker before reading the shared counter.
    for worker in workers:
        worker.join()
    print(f'main{x}')

Problem of seizing the same resource unlocked

from threading import Thread
from threading import Lock
import time

x = 100
lock = Lock()


def task():
    """Decrement the module-level ``x`` by one while holding ``lock``.

    The read, the 0.1 s pause and the write all happen inside the lock, so
    concurrent callers perform their decrements one at a time.
    """
    global x
    # The with-statement releases the lock even if the body raises, which a
    # bare acquire()/release() pair does not guarantee.
    with lock:
        temp = x
        time.sleep(0.1)
        x = temp - 1


if __name__ == '__main__':
    workers = []
    for _ in range(100):
        worker = Thread(target=task)
        workers.append(worker)
        worker.start()

    # Wait for all decrements to finish before reporting the counter.
    for worker in workers:
        worker.join()
    print(f'main{x}')

Synchronization lock ensures data security

Seven Deadlocks and Recursive Locks

Processes also have deadlocks and recursive locks, which are the same as thread deadlocks and recursive locks.

Deadlock refers to the phenomenon of two or more processes or threads waiting for each other due to competing for resources during execution. Without external forces, they will not be able to proceed.In this case, the system is in a deadlock state or a deadlock is created. These processes that are always waiting for each other are called deadlock processes, such as deadlocks

from threading import Thread
from threading import Lock
import time

lock_A = Lock()
lock_B = Lock()


class MyThread(Thread):
    """Thread that takes the two module-level locks in opposite orders.

    ``f1`` acquires lock_A and then lock_B; ``f2`` acquires lock_B and then
    lock_A.  When several instances run at once, one thread can hold lock_B
    in ``f2`` while another holds lock_A in ``f1``, and each then waits for
    the lock the other holds: a deadlock.
    """

    def run(self):
        """Run ``f1`` and then ``f2`` in this thread."""
        self.f1()
        self.f2()

    def f1(self):
        """Acquire lock_A, then lock_B, printing after each acquisition."""
        lock_A.acquire()
        print(f'{self.name}Get it A lock')

        # Blocks here if another thread is inside f2 holding lock_B.
        lock_B.acquire()
        print(f'{self.name}Get it B lock')
        lock_B.release()

        lock_A.release()

    def f2(self):
        """Acquire lock_B, pause 0.1 s, then acquire lock_A."""
        lock_B.acquire()
        print(f'{self.name}Get it B lock')
        # The pause gives another thread time to take lock_A in f1.
        time.sleep(0.1)

        # Blocks here if another thread is inside f1 holding lock_A.
        lock_A.acquire()
        print(f'{self.name}Get it A lock')
        lock_A.release()

        lock_B.release()

if __name__ == '__main__':
    # Launch three competing threads; the main thread does not join them.
    for _ in range(3):
        MyThread().start()

    print('main....')

Solution, recursive locking, Python provides a re-lockable RLock in Python to support multiple requests for the same resource in the same thread.

This RLock maintains a Lock and a counter variable internally, and counter records the number of acquisitions so that resources can be requested multiple times.Until all acquire s for one thread are release d, no other thread can obtain resources.In the example above, if RLock is used instead of Lock, no deadlock will occur:

from threading import Thread
from threading import RLock
import time

lock_A = lock_B = RLock()


class MyThread(Thread):
    """Same acquisition pattern as the deadlock example, using one RLock.

    lock_A and lock_B are bound to the same RLock object, so the nested
    acquire calls below are re-entrant acquisitions by the owning thread
    rather than requests for a second, independent lock.
    """

    def run(self):
        """Run ``f1`` and then ``f2`` in this thread."""
        self.f1()
        self.f2()

    def f1(self):
        """Acquire the shared RLock twice (as A, then as B) and release both."""
        lock_A.acquire()
        print(f'{self.name}Get it A lock')

        # Re-entrant acquire of the same RLock by the thread that owns it.
        lock_B.acquire()
        print(f'{self.name}Get it B lock')
        lock_B.release()

        lock_A.release()

    def f2(self):
        """Acquire the shared RLock as B, pause 0.1 s, then re-acquire as A."""
        lock_B.acquire()
        print(f'{self.name}Get it B lock')
        time.sleep(0.1)

        # Re-entrant acquire of the same RLock by the thread that owns it.
        lock_A.acquire()
        print(f'{self.name}Get it A lock')
        lock_A.release()

        lock_B.release()


if __name__ == '__main__':
    # Launch ten threads that all contend for the shared RLock.
    for _ in range(10):
        MyThread().start()

    print('main....')
    

Eight semaphore Semaphore

Same as process

Semaphore manages a built-in counter,
Built-in counter-1 whenever acquire() is called;
Built-in counter + 1 when release() is called;
The counter cannot be less than 0; when the counter is 0, acquire() blocks the thread until release() is called by another thread.

Instance: (semaphore is available to only five threads at the same time, which limits the maximum number of connections to 5):

from threading import Thread
from threading import Semaphore
from threading import current_thread
import time
import random

sem = Semaphore(5)


def go_public_wc():
    """Hold one of the five semaphore slots for a random 1-3 seconds.

    Prints the executing thread's name once a slot has been obtained.
    """
    # The with-statement gives the slot back even if the body raises, which a
    # bare acquire()/release() pair does not guarantee.
    with sem:
        # The ``name`` attribute replaces getName(), deprecated since 3.10.
        print(f'{current_thread().name} Use the toilet ing')
        time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    # Twenty threads compete for the semaphore's slots.
    for _ in range(20):
        Thread(target=go_public_wc).start()

Nine Python GIL (Global interpreter Lock)

First, some languages (java, c++, c) support multiple threads in the same process to be able to apply multicore CPUs, which is what we'll hear now about multicore CPU technology like 4-core 8-core.So we have said before that if there is any data insecurity in the application of multi-process, that is, multiple processes grab this data in one file at the same time. Everyone has changed this data, but before it is time to update to the original file, it is also calculated by other processes.Cause data insecurity problem, so we can solve it by locking, multithreaded people think about whether the same, concurrent execution is the problem.But Python used to lock multithreads in its earliest days, but Python added a GIL global interpreter lock at the extreme (when the computer CPU did have only one core), at the interpreter level. It locks the entire thread, not some data operations inside it. Only one thread can use the CPU at a time, that is to sayMulti-threading does not use multi-core, but it is not a problem of Python language, it is a feature of CPython interpreter. If there is no problem with Jpython interpreter, Cpython is the default, because of its speed, Jpython was developed in java, there is no way to use multi-core in Cpython. This is a drawback of python, a historical problem.Although many of the gods of the python team are working to change this, there is no solution.(Is this related to interpretive languages (python, php) and compiled languages???Pending!Compiled languages are usually assigned to you during compilation. Explanatory languages are executed while interpreting, so this lock is added to prevent data insecurity. Is this a disadvantage of all interpretive languages?)

* But can't we concurrently have this lock?When our programs are partially computed, that is, programs with a high CPU usage (CPUs are always computed), that's not good, but if your programs are I/O (which is usually your program) (input, network latency to access web addresses, file read/write to open/close), under what circumstances do you use high concurrency (Financial computing uses artificial intelligence (Alpha Dog), but general business scenarios do not use, crawling web pages, multiuser websites, chat software, processing files. I/O operations rarely use CPU, so multi-threading can be concurrent, because the CPU is only a fast dispatch thread, and there is nothing inside the threadSo computing, like a bunch of network requests, my CPU is very fast one by one to schedule your multithreads, and your threads are going to perform I/O operations.

Detailed introduction to GIL locks: https://www.cnblogs.com/jin-xin/articles/11232225.html

The relationship between ten GIL locks and Lock

GIL VS Lock

    A savvy classmate might ask this question, because, as you said before, Python already has a GIL that guarantees that only one thread can execute at a time, why do we need a lock here? 

First, we need to agree that the purpose of a lock is to protect shared data and that only one thread can modify shared data at a time

    Then we can conclude that different locks should be placed to protect different data.

Finally, it is clear that GIL and Lock are two locks that protect different data, the former at the interpreter level (of course, interpreter-level data is protected, such as garbage collected data), and the latter at the user's own application. It is obvious that GIL is not responsible for this, it can only protect data at the interpreter level.User-defined lock handling, Lock

Process analysis: All threads grab GIL locks or execute permissions

Thread 1 grabs the GIL lock, gets execute privilege, starts execution, and adds a Lock, which has not yet been executed, that is, Thread 1 has not released the Lock yet. It is possible that Thread 2 grabs the GIL lock and begins execution. During execution, it finds that Lock has not yet been released by Thread 1. Thread 2 enters the blockage, gets execute privilege and has the right to execute.Thread 1 gets the GIL and executes normally until Lock is released.This results in the effect of serial operation

Since it's serial, let's execute

  t1.start()

  t1.join()

  t2.start()

  t2.join()

This is also serial execution. Why add Lock? Know that join waits for all t1 code to execute, which is equivalent to locking all t1 code, while Lock only locks part of the code that manipulates shared data.

Details:

Because the Python interpreter helps you automatically recycle memory on a regular basis, you can understand that there is a separate thread in the Python interpreter that starts wake up to do a global poll every once in a while to see which memory data can be emptied, when the threads in your own program and the py interpreter's own threads are concurrentRunning, suppose your thread deletes a variable. At the clearing moment when the py interpreter's garbage collection thread empties this variable, it is possible that another thread has just reassigned this memory space that has not yet been emptied. As a result, it is possible that the newly assigned data has been deleted in order to solve the problemSimilarly, the Python interpreter simply and roughly locks, meaning that when a thread is running, no one else can move, which solves the above problem, which is a legacy of earlier versions of Python.

Eleven Event s

Same as process

A key feature of threads is that each thread runs independently and its state is unpredictable.Thread synchronization issues can become tricky when other threads in the program need to determine their next action by determining the state of a thread.To solve these problems, we need to use Event objects from the threading library.The object contains a semaphore that can be set by a thread, allowing the thread to wait for certain events to occur.Initially, the signal flag in the Event object is set to false.If a thread waits for an Event object whose flag is false, the thread will be blocked until the flag is true.If a thread sets the flag of an Event object to true, it will wake up all threads waiting for the Event object.If a thread waits for an Event object that has been set to true, it will ignore the event and continue executing

event.is_set(): Returns the status value of the event (isSet() is the older spelling of the same method);

event.wait(): Blocks the calling thread if event.is_set() == False;

event.set(): Sets the state value of event to True, and all thread activations in the blocking pool are ready to be scheduled by the operating system;

event.clear(): The status value of the recovery event is False.

For example, there are multiple worker threads trying to link to MySQL, and we want to make sure that the MySQL service is working before linking so that those worker threads can connect to the MySQL server. If the connection is unsuccessful, they will try to reconnect.Then we can use threading.Event mechanism to coordinate the connection operations of individual worker threads

Twelve Condition s (Understanding)

Make threads wait, release n threads only if a condition is met, and see how to use it ~~

import time
from threading import Thread,RLock,Condition,current_thread

def func1(c):
    """Wait on condition ``c`` until notified, then report after a 3 s pause.

    Args:
        c: The ``threading.Condition`` shared with the notifying thread.
    """
    # Condition.wait() requires the underlying lock to be held.  The earlier
    # non-blocking acquire(False) could fail if another thread held the lock
    # at that instant, after which wait() raised RuntimeError.
    with c:
        c.wait()  # Releases the lock while blocked; re-acquires it on wake-up.
        # The pause happens with the lock held, so notified threads execute
        # this section one after another.
        time.sleep(3)
        # The ``name`` attribute replaces getName(), deprecated since 3.10.
        print('%s Executed' % current_thread().name)

if __name__ == '__main__':
    c = Condition()
    # Five waiters, all sharing the same condition object.
    for _ in range(5):
        Thread(target=func1, args=(c,)).start()

    while True:
        num = int(input('Please enter the number of threads you want to notify:'))
        # notify() must be called with the condition's lock held.
        c.acquire()
        c.notify(num)  # Wake up to num waiting threads.
        c.release()

#Result analysis: 
# Please enter the number of threads you want to notify:3
# Please enter the number of threads you want to notify:Thread-1 Executed #Sometimes you will find that your results are printed where you want to enter them. This is a printing problem. It doesn't matter, it doesn't affect
# Thread-3 executed
# Thread-2 executed

//Sample Code

View Code

Thirteen Timers (Understanding)

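Timer: a timer that runs an operation after a specified delay of n seconds; it can be useful for timed tasks. (Note: the sample code below actually demonstrates the Event mechanism described in section Eleven, using the MySQL connection scenario, rather than threading.Timer.)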

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    """Poll the module-level ``event`` until it is set, then report success.

    Makes up to three attempts, waiting up to 0.5 s on the event after each.

    Raises:
        TimeoutError: If the event is still unset after three attempts.
    """
    # The ``name`` attribute replaces getName(), deprecated since Python 3.10.
    thread_name = threading.current_thread().name
    count = 1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('Link Timeout')
        print('<%s>No.%s Second attempt to link' % (thread_name, count))
        event.wait(0.5)
        count += 1
    print('<%s>Link succeeded' % thread_name)


def check_mysql():
    """Announce the check, pause a random 2-4 seconds, then set ``event``."""
    # The ``name`` attribute replaces getName(), deprecated since Python 3.10.
    print('\033[45m[%s]Checking mysql\033[0m' % threading.current_thread().name)
    time.sleep(random.randint(2, 4))
    # Setting the event releases every thread currently waiting on it.
    event.set()
if __name__ == '__main__':
    # Module-level event shared by the connecting and checking threads.
    event = Event()
    conn1 = Thread(target=conn_mysql)
    conn2 = Thread(target=conn_mysql)
    check = Thread(target=check_mysql)

    for worker in (conn1, conn2, check):
        worker.start()

Fourteen Thread Queue

What's the difference between queues and lists?

Queue queue: use import queue in the same way as process queue

  queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

  • class queue.Queue(maxsize=0) #FIFO
import queue #Instead of importing through the threading module, import queue directly, which comes with python
#Usage is basically the same as queue in our process multiprocess
q = queue.Queue()
for item in ('first', 'second', 'third'):
    q.put(item)
# q.put_nowait(item) raises queue.Full instead of blocking when the queue is
# full; wrap it in try/except to handle that case.
for _ in range(3):
    print(q.get())
# q.get_nowait() raises queue.Empty instead of blocking when the queue is
# empty; wrap it in try/except to handle that case.
'''
//Results (FIFO):
first
second
third
'''

//FIFO sample code

FIFO example

  class queue.LifoQueue(maxsize=0) #last in first out

import queue

# A LifoQueue behaves like a stack: the most recently added item leaves first.
q = queue.LifoQueue()
for item in ('first', 'second', 'third'):
    q.put(item)
# q.put_nowait(item) is the non-blocking form of put().

for _ in range(3):
    print(q.get())
# q.get_nowait() is the non-blocking form of get().
'''
//Results (LIFO):
third
second
first
'''

//LIFO sample code

LIFO example

class queue.PriorityQueue(maxsize=0)#Queues that set priority when storing data

import queue

q = queue.PriorityQueue()
# Each entry is a tuple whose first element is the priority; the smaller the
# value, the sooner the entry leaves the queue.  Negative priorities work too.
#
# Entries with equal priority are ordered by comparing their second elements:
# q.put((20,'ws'))      # strings compare character by character
# q.put((20,'wd'))
# q.put((20,{'a':11}))  # TypeError once compared: dicts cannot be ordered
# q.put((20,('w',1)))   # same-priority payloads must be mutually comparable
entries = [
    (-10, 'a'),
    (-5, 'a'),
    (20, 'b'),
    (20, 'a'),
    (0, 'b'),
    (30, 'c'),
]
for entry in entries:
    q.put(entry)

# Drain the queue: one get() per entry that was put.
for _ in entries:
    print(q.get())
'''
//Result (The smaller the number, the higher the priority will leave the queue):
'''

//Priority Queue Sample Code

Priority Queue Example

These three queues are thread-safe and do not have multiple threads grabbing the same resource or data.

Fifteen Python Standard Modules--concurrent.futures

That's not enough for our thread pool. Let's tell you with a new module. We didn't have a thread pool in the early days. Now python has a new standard or built-in module that provides a new thread pool and process pool. Previously, we talked about the process pool as multiprocessing, and now in this new module, they both use the same thing.

Why put process pools and thread pools together? So that they can be used in the same way: ThreadPoolExecutor and ProcessPoolExecutor share one interface, and both can be used directly once imported from concurrent.futures.

The concurrent.futures module provides a highly encapsulated asynchronous call interface
 ThreadPoolExecutor: Thread pool, providing asynchronous calls
 ProcessPoolExecutor: Process pool, providing asynchronous calls
Both implement the same interface, which is defined by the abstract Executor class.

#2 Basic Methods
#submit(fn, *args, **kwargs)
Asynchronous submission of tasks

#map(func, *iterables, timeout=None, chunksize=1) 
Replace for loop submit operation

#shutdown(wait=True) 
pool.close()+pool.join() operation equivalent to process pool
 wait=True, wait until all tasks in the pool have finished recycling resources before continuing
 wait=False, returns immediately, does not wait for the task in the pool to finish executing
 Whatever the value of the wait parameter, the whole program will not exit until all pending tasks have finished executing
 submit and map must be before shutdown

#result(timeout=None)
Achieving results

#add_done_callback(fn)
callback
import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n, delay=2):
    """Pause, print the current thread identifier with ``n``, return ``n*n``.

    Args:
        n: The number to square.
        delay: Seconds to sleep before printing.  Defaults to 2, the pause
            that used to be hard-coded.

    Returns:
        ``n * n``.
    """
    time.sleep(delay)
    print('%s Printed:' % (threading.get_ident()), n)
    return n * n
# Pool of at most five worker threads.
tpool = ThreadPoolExecutor(max_workers=5)
# tpool = ProcessPoolExecutor(max_workers=5)  # drop-in replacement for a process pool

# submit() returns a Future at once.  Calling result() straight after each
# submit would wait for that task and make the run serial, so the futures are
# only collected here and read once everything has been queued.
t_lst = [tpool.submit(func, i) for i in range(5)]

# Refuse new tasks and wait for the submitted ones to finish.
tpool.shutdown()
print('Main Thread')
for future in t_lst:
    print('>>>>', future.result())

# Instead of shutdown(), we can use the following
# while 1:
#     for n,ti in enumerate(t_lst):
#         print('>>>>', ti.result(),n)
#     time.sleep(2) #Every two seconds to go to the results, which one has the results, you can take out which one. To express this, you can poll to get the results without waiting for all the results to come out. Because your task takes a long time to execute, you need to wait a long time to get the results in this way.You can take the quick results out first.If there are no execution results in the result object, then you can not get anything. It is important to note that it is not empty, what is not, how to judge which result I have taken. You can do this by enumerating enumerate s to record where the result object of your result has been takenYes, I won't take it anymore.

#Result analysis: Printed results are out of order, because when the sleep in func function is reached, the threads switch. It is impossible for anyone to print first. But at the end, we get the results through the result object in order, because when our main thread does the for loop, we order the resultsObject is added to the list.
# 37220 Printed:0
# 32292 Printed: 4
# 33444 Printed: 1
# 30068 Printed: 2
# 29884 Printed: 3
# Main Thread
# >>>> 0
# >>>> 1
# >>>> 4
# >>>> 9
# >>>> 16

ThreadPoolExecutor Simple use

ThreadPoolExecutor is simple to use

Use of ProcessPoolExecutor:

You just need to change this line of code to the one below, and nothing else will change
 tpool = ThreadPoolExecutor(max_workers=5) #The default starting thread data does not exceed the number of CPU s*5
# tpool = ProcessPoolExecutor(max_workers=5)

You'll find out why you put both the thread pool and the process pool in this module, just as you would use it
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import threading
import os,time,random
def task(n, delay=None):
    """Print the current thread identifier, pause, and return ``n`` squared.

    Args:
        n: The number to square.
        delay: Seconds to sleep.  When ``None`` (the default) a random whole
            number of seconds from 1 to 3 is used, as before.

    Returns:
        ``n ** 2``.
    """
    print('%s is runing' % threading.get_ident())
    pause = random.randint(1, 3) if delay is None else delay
    time.sleep(pause)
    return n ** 2

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)

    # Equivalent loop form (disabled):
    # for i in range(11):
    #     future=executor.submit(task,i)

    # map() stands in for a loop of submit() calls.
    s = executor.map(task, range(1, 5))
    print(list(s))

map Use

Use of map

import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n, delay=2):
    """Pause and then return the square of ``n``.

    Args:
        n: The number to square.
        delay: Seconds to sleep first.  Defaults to 2, the pause that used to
            be hard-coded.

    Returns:
        ``n * n``.
    """
    time.sleep(delay)
    return n * n

def call_back(m):
    """Done-callback: print the result held by the finished future ``m``."""
    outcome = m.result()
    print('The results are:%s' % outcome)

tpool = ThreadPoolExecutor(max_workers=5)
t_lst = []
for i in range(5):
    future = tpool.submit(func, i)
    # call_back runs once the future has completed.  add_done_callback()
    # itself returns None, so t is None after this line.
    t = future.add_done_callback(call_back)

//Simple application of callback function

Callback functions are simple to use

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    """Fetch ``url`` and return its address together with the body text.

    Returns:
        ``{'url': url, 'text': <body text>}`` when the response status is
        200, otherwise ``None``.
    """
    print('<process%s> get %s' % (os.getpid(), url))
    response = requests.get(url)
    if response.status_code != 200:
        return None
    return {'url': url, 'text': response.text}

def parse_page(res):
    """Done-callback: append a one-line summary of a fetched page to db.txt.

    Args:
        res: A finished future whose result is either a dict with ``'url'``
            and ``'text'`` keys, or ``None`` when there is no page to record.
    """
    res = res.result()
    if res is None:
        # Nothing was fetched; indexing None below would raise TypeError
        # inside the callback.
        return
    print('<process%s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
    # Explicit encoding so the log does not depend on the platform default.
    with open('db.txt', 'a', encoding='utf-8') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # multiprocessing.Pool version (disabled):
    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p = ProcessPoolExecutor(3)
    for url in urls:
        future = p.submit(get_page, url)
        # The callback receives the future itself and must call .result()
        # on it to obtain get_page's return value.
        future.add_done_callback(parse_page)

//Callback function application, you need to practice yourself

Keywords: Python socket MySQL Programming

Added by alevsky on Wed, 21 Aug 2019 03:41:35 +0300