Day 7 thread safety and queuing

1. Processes and threads

Process: a regular running application is a process; Each process runs in its dedicated and protected memory space.
Thread: thread is the basic unit for executing tasks
Each process must have at least one thread (only one by default)
Single thread serial: if you want to execute multiple tasks in one thread, the tasks are executed one by one in sequence

Analogy: a process is like a workshop; a thread is like a worker inside that workshop

2. Multithreading

Multithreading: create multiple threads in a process; Tasks executed in multiple threads can be executed concurrently (simultaneously)
Multithreading principle: use cpu idle time to work

Using multithreading method 1: directly create the object of Thread class
Thread object = Thread(target = function, args = tuple)

Using multithreading method 2: create subclass objects of Thread
class Subclass(Thread):
    def __init__(self):
        super().__init__()
        # Add the attributes that hold any additional data the task function requires

    def run(self):
        # Tasks that need to be executed in the child thread

3. Multi process

Multiple processes: a program can have multiple processes (only one by default)

How to create multiple processes:
Process object = Process(target = function, args = tuple)

from multiprocessing import Process

4. Wait (block) - join

Thread object.join() / Process object.join()
Blocks the caller until the thread task / process task is completed, so the code written after join() runs only once that task has finished

Code with data security

from random import randint
import time
from threading import Thread

# Data security issues
"""
If one thread reads the data and has not yet written its modification back when another thread
reads and writes the same data, a data security problem (lost update) may occur.
"""
# Solution: lock


def save_money(money):
    """Deposit ``money`` into the global ``account`` without any locking.

    The balance is read first, the thread then sleeps for a random 2-5
    seconds, and only afterwards is the new balance written back from the
    value read earlier. Nothing here prevents another thread from changing
    ``account`` during the sleep.
    """
    global account
    snapshot = account
    delay = randint(2, 5)
    time.sleep(delay)
    # The write uses the value read before the sleep, not the current one.
    account = snapshot + money
    print(f'save money{money}Yuan, balance:{account}')


def draw_money(money):
    """Withdraw ``money`` from the global ``account`` without any locking.

    If the balance read at the start is smaller than ``money`` a message is
    printed and nothing is changed. Otherwise the thread sleeps for a random
    2-5 seconds and then writes back the balance computed from the value it
    read before sleeping.
    """
    global account
    snapshot = account
    if snapshot < money:
        print('Sorry, your credit is running low!')
        return

    time.sleep(randint(2, 5))
    # The write uses the value read before the sleep, not the current one.
    account = snapshot - money
    print(f'take{money}Yuan, balance:{account}')


if __name__ == '__main__':
    # Shared account balance that both threads read and write
    account = 2000

    # One thread deposits, the other withdraws; both run concurrently
    workers = [
        Thread(target=save_money, args=(20000,)),
        Thread(target=draw_money, args=(1000,)),
    ]
    for worker in workers:
        worker.start()
    # Wait for both tasks before reporting the final balance
    for worker in workers:
        worker.join()
    print(f'balance:{account}')

Lock to solve security problems

from random import randint
import time
from threading import Thread, Lock

# Data security issues
"""
If one thread reads the data and has not yet written its modification back when another thread
reads and writes the same data, a data security problem (lost update) may occur.
"""
# Solution: lock
"""
1. Create one lock object for each piece of shared data: Lock()
2. Acquire the lock before a thread reads the shared data: Lock object.acquire()
3. Release (unlock) the lock after the data has been written: Lock object.release()
"""


def save_money(money):
    """Deposit ``money`` into the global ``account`` under ``account_lock``.

    The read, the simulated 2-5 second delay and the write all happen while
    the lock is held, so no other thread that uses the same lock can touch
    the balance in between.

    :param money: amount to add to the balance.
    """
    print(f'Save:{money}element')
    global account
    # Lock before obtaining data
    account_lock.acquire()
    try:
        x = account
        time.sleep(randint(2, 5))
        account = x + money
    finally:
        # Unlock after data writing; ``finally`` guarantees the release even
        # if the critical section raises, so other threads cannot hang.
        account_lock.release()


def draw_money(money):
    """Withdraw ``money`` from the global ``account`` under ``account_lock``.

    If the balance is smaller than ``money`` a message is printed and the
    balance is left unchanged. Otherwise the thread sleeps for a random 2-5
    seconds (simulated work) and then writes the reduced balance.

    The lock is released on every path, including the insufficient-funds
    branch, so a refused withdrawal cannot block other threads forever.

    :param money: amount to subtract from the balance.
    """
    print(f'take:{money}element')
    global account

    # Lock before obtaining data
    account_lock.acquire()
    try:
        x = account
        if x >= money:
            time.sleep(randint(2, 5))
            account = x - money
        else:
            print('Sorry, your credit is running low!')
    finally:
        # Unlock whether or not the withdrawal happened (or raised)
        account_lock.release()


if __name__ == '__main__':
    # Account balance, plus the one lock that guards it
    # (one piece of shared data corresponds to one lock)
    account, account_lock = 2000, Lock()

    deposit = Thread(target=save_money, args=(20000,))
    withdrawal = Thread(target=draw_money, args=(1000,))

    # Start both tasks, then wait for both before printing the result
    for worker in (deposit, withdrawal):
        worker.start()
    for worker in (deposit, withdrawal):
        worker.join()
    print(f'balance:{account}')

RLOCK lock

Data security issues

If one thread reads the data and has not yet written its modification back when another thread reads and writes the same data,
a data security problem (lost update) may occur.

Solution: lock

  1. One data corresponds to one lock object: RLock()
  2. Place the data operation in the area specified by the lock
    with lock object:
    Operation data
from random import randint
import time
from threading import Thread, RLock


def save_money(money):
    """Deposit ``money`` into the global ``account`` inside ``account_lock``.

    The ``with`` block holds the lock for the read, the simulated 2-5 second
    delay and the write, and releases it when the block exits.
    """
    global account
    print(f'Save:{money}element')
    with account_lock:
        before = account
        pause = randint(2, 5)
        time.sleep(pause)
        account = before + money


def draw_money(money):
    """Withdraw ``money`` from the global ``account`` inside ``account_lock``.

    When the balance is smaller than ``money`` a message is printed and the
    balance stays untouched; otherwise the thread sleeps 2-5 seconds and
    writes the reduced balance. The ``with`` block releases the lock on
    both paths.
    """
    global account
    print(f'take:{money}element')
    with account_lock:
        balance = account
        if balance < money:
            print('Sorry, your credit is running low!')
            return
        time.sleep(randint(2, 5))
        account = balance - money


if __name__ == '__main__':
    # Account balance
    account = 2000
    # Re-entrant lock guarding the balance; used via ``with`` in the tasks
    account_lock = RLock()

    tasks = ((save_money, 20000), (draw_money, 1000))
    threads = [Thread(target=task, args=(amount,)) for task, amount in tasks]

    for thread in threads:
        thread.start()
    # Block until both tasks are done, then show the final balance
    for thread in threads:
        thread.join()
    print(f'balance:{account}')

1. Multithreaded data collection

If the tasks of multiple threads produce different data, the method to collect these data together:
Method 1: define a global common container (e.g. list, dictionary; a tuple is not suitable because it cannot be modified)
Method 2: thread queue

from datetime import datetime
import time
from random import randint
from threading import Thread


def download(name):
    """Simulate downloading ``name`` and record the result in ``all_data``.

    Prints a start line, sleeps for a random 2-7 seconds, prints a
    completion line, then appends ``'<name>data'`` to the global list
    ``all_data``.
    """
    global all_data
    print(f'{name}Start downloading:{datetime.now()}')
    duration = randint(2, 7)
    time.sleep(duration)
    print(f'{name}Download complete:{datetime.now()}')

    # Hand the produced data to the main thread via the shared list
    payload = f'{name}data'
    all_data.append(payload)


# Requirement 1: collect the data obtained by each sub thread in the main thread
# Method: define a global list and add the data generated in the child thread to the list as a list element
if __name__ == '__main__':
    # Global list that every download thread appends its result to
    all_data = []

    # One thread per film, film1 .. film10
    workers = [
        Thread(target=download, args=(f'film{number}',))
        for number in range(1, 11)
    ]
    for worker in workers:
        worker.start()

    # Wait for every download before reading the collected data
    for worker in workers:
        worker.join()
    print(all_data)

1. Thread queue

Queue: queue is a first in first out container data structure. Both thread queue and process queue in Python are data safe.
Usage: 1) import queue type: from queue import Queue
2) Create queue object: Queue() / Queue(maximum number of elements)
3) Operate on the queue: a. Enter: queue object.put(data)
b. Out: queue object.get() / queue object.get(timeout=timeout)

Note: the get operation of the queue will wait when the queue is empty (no error will be reported), and the data will be obtained immediately when there is data in the queue.
If a timeout is set and no data arrives before it expires, an error (queue.Empty) will be reported!

from datetime import datetime
import time
from random import randint
from threading import Thread
from queue import Queue 


def download(name):
    """Simulate downloading ``name`` and put the result on the queue ``q``.

    Prints a start line, sleeps for a random 2-7 seconds, prints a
    completion line, then puts ``'<name>data'`` on the global queue ``q``.
    """
    print(f'{name}Start downloading:{datetime.now()}')
    duration = randint(2, 7)
    time.sleep(duration)
    print(f'{name}Download complete:{datetime.now()}')

    # Publish the produced data so the main thread can pick it up
    payload = f'{name}data'
    q.put(payload)


if __name__ == '__main__':
    # Queue shared by all download threads and the main thread
    q = Queue()

    # Launch ten downloads, film1 .. film10
    for number in range(1, 11):
        Thread(target=download, args=(f'film{number}',)).start()

    # Requirement 2: process each result as soon as a sub thread delivers it.
    # Ten downloads were started, so exactly ten results are taken; each
    # get() waits until the next result is available.
    remaining = 10
    while remaining > 0:
        result = q.get()
        print(f'handle:{result}')
        remaining -= 1
    print('All data processing completed!')

Thread queue actual problem

from datetime import datetime
import time
from random import randint
from threading import Thread
from queue import Queue


def download(name):
    """Simulate downloading ``name`` and put the result on the queue ``q``.

    Prints a start line, sleeps for a random 2-7 seconds, prints a
    completion line, then puts ``'<name>data'`` on the global queue ``q``.
    """
    print(f'{name}Start downloading:{datetime.now()}')
    wait_seconds = randint(2, 7)
    time.sleep(wait_seconds)
    print(f'{name}Download complete:{datetime.now()}')
    produced = f'{name}data'
    q.put(produced)


def get_data():
    """Consume items from the global queue ``q`` until ``'end'`` arrives.

    Every item taken before the ``'end'`` marker is printed with a
    ``handle:`` prefix; the marker itself is not printed and stops the loop.
    """
    # iter(callable, sentinel) keeps calling q.get() until it returns 'end'
    for item in iter(q.get, 'end'):
        print(f'handle:{item}')


if __name__ == '__main__':
    # Queue shared by the download threads and the processing thread
    q = Queue()

    # The number of downloads is random, so the consumer cannot know in
    # advance how many results to expect.
    stop = randint(6, 11)
    ts = [
        Thread(target=download, args=(f'film{number}',))
        for number in range(1, stop)
    ]
    for worker in ts:
        worker.start()

    # Separate thread that processes results as they arrive
    deal_t = Thread(target=get_data)
    deal_t.start()

    # Once every download has finished, send the marker that tells the
    # processing thread to stop.
    for worker in ts:
        worker.join()
    q.put('end')

Process queue

# 1. Data sharing between processes
# Processes do not share ordinary variables, so a process queue is used here to pass data between them (multiprocessing also offers pipes, shared memory and managers)

from multiprocessing import Process, Queue as P_Queue
from random import randint
from queue import Queue


def func1(q2):
    """Put one random integer in the range 0-100 on the queue ``q2``.

    :param q2: queue received as an argument; only its ``put`` is used.
    """
    # 3. Save the data in the queue that was passed in as a parameter
    q2.put(randint(0, 100))
    print('Data 1 added successfully!')


def func2(q2):
    """Put one random integer in the range 1000-2000 on the queue ``q2``.

    :param q2: queue received as an argument; only its ``put`` is used.
    """
    # 3. Save the data in the queue that was passed in as a parameter
    q2.put(randint(1000, 2000))
    print('Data 2 added successfully!')


if __name__ == '__main__':
    # 1. Create a global process queue
    q = P_Queue()

    # 2. Pass the process queue as a parameter to the child process
    p1 = Process(target=func1, args=(q,))
    p2 = Process(target=func2, args=(q,))
    p1.start()
    p2.start()

    # 4. Directly obtain the data in the process queue in the main process (if the process queue is to be used in the child process, it must be passed through parameters and cannot be used directly)
    # The queue is drained BEFORE joining: a child that has put items on a
    # multiprocessing queue may not exit until those items have been flushed,
    # so joining first can deadlock. Each get() waits for one result.
    print(q.get())
    print(q.get())

    # Both results have been received; now wait for the children to exit
    p1.join()
    p2.join()

Keywords: Python Java

Added by nevesgodnroc on Thu, 20 Jan 2022 08:43:20 +0200