Processes and threads
A process is an instance of a running program. A process can contain multiple threads, and all threads in the same process share its resources (memory, open files and so on); the process is the basic unit of resource allocation in the operating system. A thread is an execution flow inside a process that can be scheduled and run independently, and it is the basic unit of CPU scheduling. Because threads and processes have many similar characteristics, threads are also called lightweight processes. Threads run inside a process, and a thread cannot exist without the process that contains it.
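To make the shared-memory point concrete, here is a minimal sketch (not part of the original text): three threads append to the same list object, and every entry carries the same process ID because all threads live in one process. The names `record` and `seen` are illustrative only.

```python
import os
import threading

seen = []  # a single list object, visible to every thread in this process
seen_lock = threading.Lock()  # guards access to the shared list


def record():
    # Each thread writes its process ID and thread ID into the shared list
    with seen_lock:
        seen.append((os.getpid(), threading.get_ident()))


threads = [threading.Thread(target=record) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

pids = {pid for pid, _ in seen}
print(len(seen))              # 3: one entry per thread
print(pids == {os.getpid()})  # True: all threads belong to this process
```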
Appetizer
Create a simple thread example in Python 3
from threading import Thread
from time import sleep


class CookBook(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.message = "Hello Parallel Python CookBook!!\n"

    def print_message(self):
        print(self.message)

    def run(self):
        print("Thread Starting\n")
        x = 0
        while x < 10:
            self.print_message()
            sleep(2)
            x += 1
        print("Thread Ended!\n")


print("Process Started")
hello_python = CookBook()
hello_python.start()
# Wait for the thread to finish before reporting that the process has ended
hello_python.join()
print("Process Ended")
Note that a thread should not simply be left running unattended in the background: wait for it with join() (or otherwise make sure it finishes) so that it ends in an orderly way and its resources are released.
Thread-based parallelism
In multithreaded programming, threads usually communicate through shared memory, so managing access to that shared memory is the key problem. Python manages threads through the threading module of the standard library, which provides the following components:
- Thread object
- Lock object
- RLock object
- Semaphore object
- Condition object
- Event object
Define a thread
Basic syntax
The sample code is shown below.
import threading


def function(i):
    print("function called by thread: {0}".format(i))
    return


threads = []
for i in range(5):
    t = threading.Thread(target=function, args=(i,))
    threads.append(t)
    t.start()

# Wait for every thread to finish
for t in threads:
    t.join()
Note that a thread does not start running when it is created: you must call its start() method. t.join() blocks the calling thread until thread t has finished. PS: If you do not want the main thread to wait for a thread, set t.daemon = True before calling start() (t.setDaemon(True) also works but is deprecated since Python 3.10). A daemon (background) thread does not keep the program alive: when only daemon threads remain, the program exits and they are stopped abruptly.
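A short sketch of join() with a timeout and of the daemon flag passed through the Thread constructor's daemon parameter; the slow_worker function and the delays are illustrative assumptions.

```python
import threading
import time

results = []


def slow_worker(delay):
    # Simulate work, then record that the worker finished
    time.sleep(delay)
    results.append("done")


# daemon=True: the interpreter will not wait for this thread at exit
t = threading.Thread(target=slow_worker, args=(0.5,), daemon=True)
t.start()

t.join(timeout=0.1)  # returns after at most ~0.1 s, finished or not
print(t.is_alive())  # typically True here, since the worker sleeps 0.5 s

t.join()  # no timeout: block until the thread has finished
print(t.is_alive(), results)  # False ['done']
```

join(timeout=...) always returns None, so is_alive() is the way to find out whether the thread actually finished.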
Identifying the current thread
The sample code is shown below.
import threading
import time


def first_function():
    print("{0} is starting".format(threading.current_thread().name))
    time.sleep(2)
    print("{0} is Exiting".format(threading.current_thread().name))


def second_function():
    print("{0} is starting".format(threading.current_thread().name))
    time.sleep(2)
    print("{0} is Exiting".format(threading.current_thread().name))


def third_function():
    print("{0} is starting".format(threading.current_thread().name))
    time.sleep(2)
    print("{0} is Exiting".format(threading.current_thread().name))


if __name__ == "__main__":
    t1 = threading.Thread(target=first_function, name="first")
    t2 = threading.Thread(target=second_function, name="second")
    t3 = threading.Thread(target=third_function, name="third")
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
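The thread name is set with the name parameter of threading.Thread(), and the current thread's name is read with threading.current_thread().name (the older threading.currentThread().getName() spelling is deprecated since Python 3.10). If no name is given, a default name of the form Thread-N is used; since Python 3.10 the target function's name is appended, e.g. Thread-1 (first_function).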
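A small sketch comparing an explicitly named thread, a thread with a default name, and the main thread; the report function and the names dictionary are hypothetical helpers for illustration.

```python
import threading

names = {}


def report(key):
    # Store the name of whichever thread is running this function
    names[key] = threading.current_thread().name


named = threading.Thread(target=report, args=("named",), name="worker-A")
unnamed = threading.Thread(target=report, args=("unnamed",))
for t in (named, unnamed):
    t.start()
for t in (named, unnamed):
    t.join()

report("main")
print(names["named"])    # worker-A
print(names["main"])     # MainThread when run as a script
print(names["unnamed"])  # Thread-N, plus " (report)" on Python 3.10+
```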
Customize a thread object
The sample code is shown below.
import threading
import time

exitFlag = 0


class myThread(threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter

    def run(self):
        print("Starting:{0}".format(self.name))
        print_time(self.name, self.counter, 5)
        print("Exiting:{0}".format(self.name))


def print_time(threadName, delay, counter):
    while counter:
        if exitFlag:
            # Stop early if another part of the program sets exitFlag
            return
        time.sleep(delay)
        print("{0} {1}".format(threadName, time.ctime(time.time())))
        counter -= 1


t1 = myThread(1, "Thread-1", 1)
t2 = myThread(2, "Thread-2", 1)
t1.start()
t2.start()
t1.join()
t2.join()
print("Exiting Main Thread.")
To customize a thread object, define a subclass of threading.Thread, call threading.Thread.__init__() in its constructor, and override the run() method. Calling start() on an instance then executes run() in a new thread.
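One common reason to subclass Thread is to keep a result on the thread object. The sketch below assumes a hypothetical SumThread class; it stores its result in an attribute that is read only after join().

```python
import threading


class SumThread(threading.Thread):
    """Hypothetical example: a Thread subclass that keeps its result."""

    def __init__(self, numbers):
        threading.Thread.__init__(self)  # always initialise the base class
        self.numbers = numbers
        self.result = None

    def run(self):
        # run() executes in the new thread once start() is called
        self.result = sum(self.numbers)


workers = [SumThread(range(0, 50)), SumThread(range(50, 100))]
for w in workers:
    w.start()
for w in workers:
    w.join()  # after join(), each result attribute is safe to read

print(sum(w.result for w in workers))  # 4950
```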
Thread synchronization
Lock
The sample code is shown below.
import threading

shared_resource_with_lock = 0
shared_resource_with_no_lock = 0
COUNT = 100000
shared_resource_lock = threading.Lock()


def increment_with_lock():
    global shared_resource_with_lock
    for i in range(COUNT):
        shared_resource_lock.acquire()
        shared_resource_with_lock += 1
        shared_resource_lock.release()


def decrement_with_lock():
    global shared_resource_with_lock
    for i in range(COUNT):
        shared_resource_lock.acquire()
        shared_resource_with_lock -= 1
        shared_resource_lock.release()


def increment_without_lock():
    global shared_resource_with_no_lock
    for i in range(COUNT):
        shared_resource_with_no_lock += 1


def decrement_without_lock():
    global shared_resource_with_no_lock
    for i in range(COUNT):
        shared_resource_with_no_lock -= 1


if __name__ == "__main__":
    t1 = threading.Thread(target=increment_with_lock)
    t2 = threading.Thread(target=decrement_with_lock)
    t3 = threading.Thread(target=increment_without_lock)
    t4 = threading.Thread(target=decrement_without_lock)
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    print("the value of shared variable with lock management is :{0}".format(
        shared_resource_with_lock))
    print("the value of shared variable with race condition is :{0}".format(
        shared_resource_with_no_lock))
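A lock is created with threading.Lock(). It has two basic operations, acquire() and release(); the code between them runs while the lock is held, so only one thread at a time can execute it. Calling release() on a lock that is not locked raises RuntimeError. Without the lock, the increments and decrements of the second counter can interleave, so its final value may differ from 0 from one run to the next.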
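A minimal sketch of the Lock operations described above, including non-blocking and timed acquisition and the RuntimeError raised by releasing an unlocked lock; the variable names are illustrative.

```python
import threading

lock = threading.Lock()

first = lock.acquire()                 # True: the lock was free
second = lock.acquire(blocking=False)  # False: already held, do not wait
third = lock.acquire(timeout=0.1)      # False: gave up after ~0.1 s
lock.release()
released_ok = not lock.locked()        # True: the lock is free again

try:
    lock.release()  # releasing a lock that is not locked
    error_raised = False
except RuntimeError:
    error_raised = True

print(first, second, third, released_ok, error_raised)  # True False False True True
```

Because a plain Lock is not reentrant, the same thread calling acquire() a second time without blocking=False or a timeout would wait forever.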
RLock
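RLock is a reentrant (recursive) lock, created with threading.RLock(). Unlike Lock, an RLock can be acquired again by the thread that already holds it, and only that owning thread may release it, once for each acquire(). In the example below, add() acquires the lock and then calls execute(), which acquires it again; with a plain Lock this would deadlock.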
The sample code is shown below.
import threading
import time


class Box(object):
    # A single reentrant lock shared by all Box instances
    lock = threading.RLock()

    def __init__(self):
        self.total_items = 0

    def execute(self, n):
        Box.lock.acquire()
        self.total_items += n
        Box.lock.release()

    def add(self):
        # Holds the lock while calling execute(), which acquires it again
        Box.lock.acquire()
        self.execute(1)
        Box.lock.release()

    def remove(self):
        Box.lock.acquire()
        self.execute(-1)
        Box.lock.release()


def adder(box, items):
    while items > 0:
        print("adding 1 item in the box")
        box.add()
        time.sleep(1)
        items -= 1


def remover(box, items):
    while items > 0:
        print("removing 1 item in the box")
        box.remove()
        time.sleep(1)
        items -= 1


if __name__ == "__main__":
    items = 5
    print("putting {0} items in the box".format(items))
    box = Box()
    t1 = threading.Thread(target=adder, args=(box, items))
    t2 = threading.Thread(target=remover, args=(box, items))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("{0} items still remain in the box".format(box.total_items))
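The two RLock rules from the text (the owner may re-acquire it; only the owner may release it) can be checked directly. This is a small sketch; blocking=False is used so the plain Lock case fails instead of deadlocking.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

plain.acquire()
# A second blocking acquire by the same thread would deadlock on a plain
# Lock, so blocking=False shows that it fails instead of waiting forever
plain_again = plain.acquire(blocking=False)
plain.release()

reentrant.acquire()
reentrant_again = reentrant.acquire(blocking=False)  # same owner: succeeds
reentrant.release()  # one release() per acquire()
reentrant.release()

# Only the owning thread may release an RLock
errors = []
reentrant.acquire()


def release_from_other_thread():
    try:
        reentrant.release()
    except RuntimeError:
        errors.append("not owner")


t = threading.Thread(target=release_from_other_thread)
t.start()
t.join()
reentrant.release()  # the owner releases it

print(plain_again, reentrant_again, errors)  # False True ['not owner']
```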
Semaphore
The sample code is shown below.
import threading
import time
import random

semaphore = threading.Semaphore(0)


def consumer():
    print("Consumer is waiting.")
    semaphore.acquire()
    print("Consumer notify:consumed item number {0}".format(item))


def producer():
    global item
    time.sleep(10)
    item = random.randint(0, 10000)
    print("producer notify:produced item number {0}".format(item))
    semaphore.release()


if __name__ == "__main__":
    for i in range(0, 5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    print("program terminated.")
The semaphore is initialized to 0. In each iteration two threads are started: the consumer calls semaphore.acquire() and blocks until the producer calls semaphore.release(), which simulates the producer-consumer pattern. In general, if the semaphore's internal counter is 0, acquire() blocks until another thread calls release(); if the counter is greater than 0, acquire() decrements it by 1 and returns immediately, granting access to the resource.
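When the semaphore starts at a value greater than 0, the counter behaviour described above limits how many threads can be inside a section at the same time. A sketch, assuming an illustrative limit of 2 and a simulated task:

```python
import threading
import time

MAX_CONCURRENT = 2
slots = threading.Semaphore(MAX_CONCURRENT)  # counter starts at 2
state_lock = threading.Lock()
active = 0
peak = 0


def limited_task():
    global active, peak
    with slots:  # acquire(): decrements the counter, blocks while it is 0
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulated work while holding a slot
        with state_lock:
            active -= 1
    # leaving the with block calls release(), incrementing the counter


threads = [threading.Thread(target=limited_task) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(peak)  # never more than 2
```

threading.BoundedSemaphore works the same way but raises ValueError if release() is called more times than acquire(), which can catch counting bugs.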
Thread synchronization using conditions
The classic example of the condition mechanism is the producer-consumer problem. The producer keeps adding items to a buffer as long as the buffer is not full, and the consumer keeps taking items out (and discarding them) as long as it is not empty. When the buffer stops being empty, the producer notifies the consumer; when it stops being full, the consumer notifies the producer.
The sample code is shown below.
from threading import Thread, Condition
import time

items = []
condition = Condition()


class consumer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        global condition
        global items
        condition.acquire()
        # Re-check the predicate after every wake-up
        while len(items) == 0:
            print("Consumer notify:no item to consume")
            condition.wait()
        items.pop()
        print("Consumer notify: consumed 1 item")
        print("Consumer notify: items to consume are:{0}".format(len(items)))
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 20):
            time.sleep(2)
            self.consume()


class producer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        global condition
        global items
        condition.acquire()
        while len(items) == 10:
            print("Producer notify:items produced are:{0}".format(len(items)))
            print("Producer notify:stop the production!!")
            condition.wait()
        items.append(1)
        print("Producer notify:total items produced:{0}".format(len(items)))
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 20):
            time.sleep(1)
            self.produce()


if __name__ == "__main__":
    producer_thread = producer()
    consumer_thread = consumer()
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()
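condition.acquire() acquires the lock associated with the condition. condition.wait() releases that lock and blocks the current thread until another thread calls condition.notify() (or notify_all()); before returning, wait() re-acquires the lock. Because a woken thread cannot assume the predicate still holds, wait() is called inside a while loop that re-checks it. The notifying thread must hold the lock when it calls notify(), and it should call condition.release() promptly afterwards so that the waiting thread can proceed.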
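Condition.wait_for(predicate) wraps the "wait in a while loop" pattern, and the with statement handles acquire()/release(). A sketch of a bounded buffer built this way, assuming an illustrative capacity of 3 and 10 items:

```python
import threading
from collections import deque

buffer = deque()
MAX_ITEMS = 3
condition = threading.Condition()
consumed = []


def producer(n):
    for i in range(n):
        with condition:  # acquires the condition's underlying lock
            # wait_for() re-checks the predicate after every wake-up
            condition.wait_for(lambda: len(buffer) < MAX_ITEMS)
            buffer.append(i)
            condition.notify_all()


def consumer(n):
    for _ in range(n):
        with condition:
            condition.wait_for(lambda: len(buffer) > 0)
            consumed.append(buffer.popleft())
            condition.notify_all()


p = threading.Thread(target=producer, args=(10,))
c = threading.Thread(target=consumer, args=(10,))
p.start()
c.start()
p.join()
c.join()
print(consumed)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```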
Thread synchronization using events
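An event is a simple mechanism for communication between threads: some threads wait for a signal while other threads send it. An Event manages an internal flag: set() sets it to true and wakes all waiting threads, clear() resets it to false, and wait() blocks until the flag is true.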
The sample code is shown below.
import time
from threading import Thread, Event
import random

items = []
event = Event()


class consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(2)
            self.event.wait()
            item = self.items.pop()
            print('Consumer notify:{0} popped from list by {1}'.format(
                item, self.name))


class producer(Thread):
    def __init__(self, integers, event):
        Thread.__init__(self)
        self.items = integers
        self.event = event

    def run(self):
        for i in range(100):
            time.sleep(2)
            item = random.randint(0, 256)
            self.items.append(item)
            print('Producer notify: item N° %d appended to list by %s' %
                  (item, self.name))
            print('Producer notify: event set by %s' % self.name)
            self.event.set()
            print('Producer notify: event cleared by %s ' % self.name)
            self.event.clear()


if __name__ == "__main__":
    t1 = producer(items, event)
    t2 = consumer(items, event)
    # The consumer loops forever, so make it a daemon thread; the program
    # then exits once the producer has finished
    t2.daemon = True
    t1.start()
    t2.start()
    t1.join()
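A smaller sketch of the flag semantics: several threads block in wait() until one set() wakes all of them, and the flag stays set until clear() is called. In the example above, clear() follows set() immediately, so a consumer that is not already waiting at that moment can miss that signal. The waiter function and the 2-second timeout are illustrative.

```python
import threading

start_signal = threading.Event()
log = []


def waiter(name):
    # Block until another thread sets the event (or 2 s pass)
    if start_signal.wait(timeout=2):
        log.append(name)


threads = [threading.Thread(target=waiter, args=(f"w{i}",)) for i in range(3)]
for t in threads:
    t.start()

print(start_signal.is_set())  # False: nobody has signalled yet
start_signal.set()            # wakes every thread waiting on the event
for t in threads:
    t.join()

print(sorted(log))                   # ['w0', 'w1', 'w2']
print(start_signal.wait(timeout=0))  # True: the flag stays set until clear()
```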
Simplifying code with the with statement
import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s')


def threading_with(statement):
    # The with statement acquires on entry and releases on exit
    with statement:
        logging.debug("%s acquired via with" % statement)


def threading_not_with(statement):
    # Equivalent explicit form: acquire, then release in a finally block
    statement.acquire()
    try:
        logging.debug("%s acquired directly " % statement)
    finally:
        statement.release()


if __name__ == "__main__":
    lock = threading.Lock()
    rlock = threading.RLock()
    condition = threading.Condition()
    mutex = threading.Semaphore(1)
    threading_synchronization_list = [lock, rlock, condition, mutex]
    for statement in threading_synchronization_list:
        t1 = threading.Thread(target=threading_with, args=(statement,))
        t2 = threading.Thread(target=threading_not_with, args=(statement,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
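The practical benefit of with is that the release happens even when the protected block raises. A minimal sketch; fails_while_holding_lock is a hypothetical function used only to trigger an exception.

```python
import threading

lock = threading.Lock()


def fails_while_holding_lock():
    # __enter__ acquires the lock; __exit__ releases it even if we raise
    with lock:
        raise ValueError("something went wrong")


try:
    fails_while_holding_lock()
except ValueError:
    pass

print(lock.locked())  # False: the with statement released the lock
```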
Using queue for thread communication
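Queue (from the queue module) has four commonly used methods:
- put(): adds an item to the queue
- get(): removes an item from the queue and returns it, blocking until one is available by default
- task_done(): must be called once for each item retrieved with get(), after that item has been processed
- join(): blocks until every item put into the queue has been retrieved and processed (task_done() has been called for it)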
from threading import Thread
from queue import Queue
import time
import random


class producer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("Producer notify: item N° %d appended to queue by %s" %
                  (item, self.name))
            time.sleep(1)


class consumer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            print('Consumer notify : %d popped from queue by %s' %
                  (item, self.name))
            self.queue.task_done()


if __name__ == "__main__":
    queue = Queue()
    t1 = producer(queue)
    t2 = consumer(queue)
    t3 = consumer(queue)
    t4 = consumer(queue)
    # Consumers loop forever, so run them as daemon threads
    for t in (t2, t3, t4):
        t.daemon = True
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    # Block until task_done() has been called for every item put
    queue.join()
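An alternative to daemon consumers is to send each consumer a sentinel value that tells it to exit, so every thread can be joined cleanly. This is a sketch of that common pattern; using None as the sentinel is an assumption that works only because None is never a real work item here.

```python
import threading
from queue import Queue

SENTINEL = None  # hypothetical "no more work" marker
NUM_CONSUMERS = 3

work = Queue()
processed = []
processed_lock = threading.Lock()


def consumer():
    while True:
        item = work.get()
        try:
            if item is SENTINEL:
                return  # stop this consumer cleanly
            with processed_lock:
                processed.append(item * 2)
        finally:
            work.task_done()  # one task_done() per get(), sentinel included


consumers = [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for t in consumers:
    t.start()

for i in range(10):
    work.put(i)
for _ in consumers:
    work.put(SENTINEL)  # one sentinel per consumer

work.join()  # returns once every item (and sentinel) is marked done
for t in consumers:
    t.join()  # consumers have exited, so join() does not hang

print(sorted(processed))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```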
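Process-based parallelism
multiprocessing is a module in the Python standard library for running code in separate processes. Each process has its own memory space and its own interpreter, so processes are not limited by the global interpreter lock (GIL); data is exchanged by message passing (Queue, Pipe) or through explicitly shared memory (Value, Array).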
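A minimal sketch of distributing work to a pool of worker processes. It uses math.factorial and os.getpid as targets only because they are importable standard-library functions, so they can be sent to workers under every start method. The `if __name__ == "__main__":` guard matters: with the spawn and forkserver start methods, worker processes re-import the main module.

```python
import math
import multiprocessing
import os

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        # map() splits the inputs across the worker processes
        factorials = pool.map(math.factorial, range(6))
        # apply() runs a single call in one worker and returns its result
        worker_pid = pool.apply(os.getpid)

    print(factorials)                 # [1, 1, 2, 6, 24, 120]
    print(worker_pid != os.getpid())  # True: the call ran in another process
```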
Asynchronous programming
Using the concurrent.futures module
The module provides thread pools and process pools for managing parallel tasks, handling calls whose completion order is not known in advance, and synchronizing threads and processes. It consists of the following parts:
- concurrent.futures.Executor: an abstract base class that provides methods for executing calls asynchronously. Its main methods are:
  - submit(fn, *args, **kwargs): schedules the callable fn to be executed as fn(*args, **kwargs) and returns a Future object.
  - map(fn, *iterables): like the built-in map(), but the calls are executed asynchronously; results are returned in input order.
  - shutdown(wait=True): signals the executor to free its resources once the pending futures are done executing.
- concurrent.futures.Future: encapsulates the asynchronous execution of a callable. Future instances are created by Executor.submit().
The sample code is shown below.
import concurrent.futures
import time

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


def evaluate_item(x):
    result_item = count(x)
    return result_item


def count(number):
    # CPU-bound busy loop used as a stand-in workload
    for i in range(0, 1000000):
        i = i + 1
    return i * number


if __name__ == "__main__":
    # Sequential execution
    start_time = time.time()
    for item in number_list:
        print(evaluate_item(item))
    print("Sequential execution in " + str(time.time() - start_time), "seconds")
    # Thread pool execution
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in " + str(time.time() - start_time_1), "seconds")
    # Process pool execution
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Process pool execution in " + str(time.time() - start_time_2), "seconds")
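The example above uses submit() and as_completed(), which yield futures in completion order. For comparison, a short sketch of Executor.map(), which returns results in input order, and of reading a single Future; the square function is illustrative.

```python
import concurrent.futures


def square(x):
    return x * x


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map() submits every call up front and yields results in input order
    squares = list(executor.map(square, [1, 2, 3, 4, 5]))

    # submit() returns a Future; result() blocks until the call has finished
    future = executor.submit(pow, 2, 10)
    power = future.result()
# leaving the with block calls executor.shutdown(wait=True)

print(squares)               # [1, 4, 9, 16, 25]
print(power, future.done())  # 1024 True
```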
Managing event loops using asyncio
Python's asyncio module provides facilities for managing events, coroutines, tasks and threads, as well as synchronization primitives for writing concurrent code. Its main components and concepts are:
- Event loop: in asyncio, each thread has at most one running event loop, which schedules and runs callbacks and coroutines.
- Coroutines: a generalization of subroutines. A coroutine can be suspended during execution so that it can wait for external processing (such as I/O) to complete, and then resume from where it was suspended.
- Futures: the Future object which, like the one in concurrent.futures, represents a computation that has not completed yet.
- Tasks: a subclass of Future that wraps a coroutine and schedules it to run concurrently with other tasks.
asyncio provides the following ways to manage event loops:
- loop = asyncio.get_event_loop(): gets the event loop of the current context. In recent Python versions, calling it when no loop is running is deprecated; use asyncio.get_running_loop() inside coroutines, or asyncio.new_event_loop() to create a loop explicitly.
- loop.call_later(delay, callback, *args): schedules callback to be called with args after delay seconds.
- loop.call_soon(callback, *args): schedules callback to be called as soon as possible: call_soon() returns immediately, and the callback runs on the next iteration of the event loop.
- loop.time(): returns the current time, as a float, according to the event loop's internal monotonic clock.
- asyncio.set_event_loop(loop): sets loop as the event loop of the current OS thread.
- asyncio.new_event_loop(): creates and returns a new event loop object.
- loop.run_forever(): runs the event loop until stop() is called.
The sample code is shown below.
import asyncio


def function_1(end_time, loop):
    print("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()


def function_2(end_time, loop):
    print("function_2 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_3, end_time, loop)
    else:
        loop.stop()


def function_3(end_time, loop):
    print("function_3 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_1, end_time, loop)
    else:
        loop.stop()


def function_4(end_time, loop):
    print("function_4 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_4, end_time, loop)
    else:
        loop.stop()


# Create the loop explicitly; calling asyncio.get_event_loop() when no loop
# is running is deprecated in recent Python versions
loop = asyncio.new_event_loop()
end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)
loop.run_forever()
loop.close()
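The scheduling order of call_soon() and call_later() can be checked with a small sketch: the call_soon() callback runs on the next loop iteration, the delayed one about 0.1 s later, and a final call_later() stops run_forever(). The record helper and the delays are illustrative.

```python
import asyncio

order = []


def record(label, loop):
    # Note which callback ran and at what loop time
    order.append((label, loop.time()))


loop = asyncio.new_event_loop()
try:
    start = loop.time()
    loop.call_later(0.2, loop.stop)              # stop the loop after ~0.2 s
    loop.call_later(0.1, record, "later", loop)  # runs after ~0.1 s
    loop.call_soon(record, "soon", loop)         # runs on the next iteration
    loop.run_forever()                           # returns once stop() is called
finally:
    loop.close()

print([label for label, _ in order])  # ['soon', 'later']
```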
Using asyncio to manage coroutines
The sample code is shown below.
import asyncio
from random import randint

# Finite state machine: each state is a coroutine that randomly chooses the
# next state and awaits it; EndState stops the chain.
# Native coroutines (async def / await) are used because the
# @asyncio.coroutine decorator was removed in Python 3.11.


async def StartState():
    print("Start State called \n")
    input_val = randint(0, 1)
    # asyncio.sleep() suspends the coroutine without blocking the event loop
    await asyncio.sleep(1)
    if input_val == 0:
        result = await State2(input_val)
    else:
        result = await State1(input_val)
    print("Resume of the Transition:\nStart State calling " + result)


async def State1(transition_value):
    outputVal = "State 1 with transition value = %s \n" % transition_value
    input_val = randint(0, 1)
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = await State3(input_val)
    else:
        result = await State2(input_val)
    result = "State 1 calling " + result
    return outputVal + str(result)


async def State2(transition_value):
    outputVal = "State 2 with transition value = %s \n" % transition_value
    input_val = randint(0, 1)
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = await State1(input_val)
    else:
        result = await State3(input_val)
    result = "State 2 calling " + result
    return outputVal + str(result)


async def State3(transition_value):
    outputVal = "State 3 with transition value = %s \n" % transition_value
    input_val = randint(0, 1)
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = await State1(input_val)
    else:
        result = await EndState(input_val)
    result = "State 3 calling " + result
    return outputVal + str(result)


async def EndState(transition_value):
    outputVal = "End State with transition value = %s \n" % transition_value
    print("...Stop Computation...")
    return outputVal


if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    asyncio.run(StartState())
Using asyncio to control tasks
The sample code is shown below.
import asyncio


async def factorial(number):
    f = 1
    for i in range(2, number + 1):
        print("Asyncio.Task: Compute factorial(%s)" % (i))
        await asyncio.sleep(1)
        f *= i
    print("Asyncio.Task - factorial(%s) = %s" % (number, f))


async def fibonacci(number):
    a, b = 0, 1
    for i in range(number):
        print("Asyncio.Task: Compute fibonacci (%s)" % (i))
        await asyncio.sleep(1)
        a, b = b, a + b
    print("Asyncio.Task - fibonacci (%s) = %s" % (number, a))


async def binomialCoeff(n, k):
    result = 1
    for i in range(1, k + 1):
        # Integer division is exact here: after step i, result == C(n, i)
        result = result * (n - i + 1) // i
        print("Asyncio.Task: Compute binomialCoeff (%s)" % (i))
        await asyncio.sleep(1)
    print("Asyncio.Task - binomialCoeff (%s,%s) = %s" % (n, k, result))


async def main():
    # create_task() schedules each coroutine to run concurrently as a Task
    tasks = [asyncio.create_task(factorial(10)),
             asyncio.create_task(fibonacci(10)),
             asyncio.create_task(binomialCoeff(20, 10))]
    await asyncio.wait(tasks)


if __name__ == "__main__":
    asyncio.run(main())
Using asyncio and futures
The sample code is shown below.
import asyncio


async def first_coroutine(future, N):
    count = 0
    for i in range(1, N + 1):
        count = count + i
    await asyncio.sleep(4)
    future.set_result(
        "first coroutine (sum of N integers) result = " + str(count))


async def second_coroutine(future, N):
    count = 1
    for i in range(2, N + 1):
        count *= i
    await asyncio.sleep(3)
    future.set_result("second coroutine (factorial) result = " + str(count))


def got_result(future):
    # Done callback: called by the event loop once the future has a result
    print(future.result())


async def main(N1, N2):
    # Create the futures from the running event loop
    loop = asyncio.get_running_loop()
    future1 = loop.create_future()
    future2 = loop.create_future()
    future1.add_done_callback(got_result)
    future2.add_done_callback(got_result)
    await asyncio.gather(first_coroutine(future1, N1),
                         second_coroutine(future2, N2))


if __name__ == "__main__":
    N1 = 1
    N2 = 1
    asyncio.run(main(N1, N2))
Distributed programming
Omitted.
GPU programming
Omitted.