Parallel Programming of Python 3 Series

Processes and threads

A process is an instance of a running program. A process can contain multiple threads, and all threads in the same process share that process's resources (such as its memory); the process is the basic unit to which the operating system allocates resources. A thread is an execution unit inside a process that can be scheduled and run independently. Because threads and processes share many characteristics, threads are also called lightweight processes. Threads run inside a process, so a thread's existence depends on its process, not the other way around.

Appetizer

Create a simple thread example based on Python 3

from threading import Thread
from time import sleep


class CookBook(Thread):
    """Thread that prints a greeting ten times, pausing two seconds between prints."""

    def __init__(self):
        super().__init__()
        # Text printed on every iteration of run().
        self.message = "Hello Parallel Python CookBook!!\n"

    def print_message(self):
        """Print the stored greeting once."""
        print(self.message)

    def run(self):
        """Thread body: announce start, print the greeting 10 times, announce end."""
        print("Thread Starting\n")
        for _ in range(10):
            self.print_message()
            sleep(2)
        print("Thread Ended!\n")


print("Process Started")
hello_python = CookBook()

hello_python.start()
# Wait for the worker thread to finish so that "Process Ended" really is
# printed last and the thread is not left running unattended.
hello_python.join()
print("Process Ended")

Note that a started thread should not be left running unattended in the background: the main program should wait for it to finish (for example with join()) so that its resources are released in time.

Thread-based parallelism

Multi-threaded programs usually communicate between threads through shared memory, which makes managing that shared memory the key to multi-threaded programming. Python manages threads through the standard library threading module, which provides the following components:

  • Thread object
  • Lock object
  • RLock object
  • Semaphore object
  • Condition object
  • Event object

Define a thread

Basic syntax

The sample code is shown below.

import threading


def function(i):
    """Print which thread index invoked this function."""
    print(f"function called by thread: {i}")


threads = []
for i in range(5):
    t = threading.Thread(target=function, args=(i,))
    threads.append(t)
    t.start()

# Wait for every worker to finish. The former `lambda t, threads: t.join()`
# only built an unused function object and never joined anything.
for t in threads:
    t.join()

Note that a thread does not run automatically after it is created: you must call its start() method to start it. join() blocks the calling thread until the thread it is called on has finished. PS: setting t.daemon = True before start() (the older t.setDaemon(True) is deprecated) makes the thread a daemon thread, which does not keep the program alive; daemon threads are stopped abruptly when the main program exits.

Identifying the current thread

The sample code is shown below.

import threading
import time


def first_function(delay=2):
    """Print start/exit messages tagged with the current thread's name.

    Args:
        delay: seconds to sleep between the two messages (default 2).
    """
    # current_thread().name replaces the deprecated currentThread().getName().
    name = threading.current_thread().name
    print("{0} is starting".format(name))
    time.sleep(delay)
    print("{0} is Exiting".format(name))


def second_function(delay=2):
    """Print start/exit messages tagged with the current thread's name.

    Args:
        delay: seconds to sleep between the two messages (default 2).
    """
    # current_thread().name replaces the deprecated currentThread().getName().
    name = threading.current_thread().name
    print("{0} is starting".format(name))
    time.sleep(delay)
    print("{0} is Exiting".format(name))


def third_function(delay=2):
    """Print start/exit messages tagged with the current thread's name.

    Args:
        delay: seconds to sleep between the two messages (default 2).
    """
    # current_thread().name replaces the deprecated currentThread().getName().
    name = threading.current_thread().name
    print("{0} is starting".format(name))
    time.sleep(delay)
    print("{0} is Exiting".format(name))

if __name__ == "__main__":
    # One explicitly named thread per worker function.
    t1 = threading.Thread(target=first_function, name="first")
    t2 = threading.Thread(target=second_function, name="second")
    t3 = threading.Thread(target=third_function, name="third")

    for worker in (t1, t2, t3):
        worker.start()
    for worker in (t1, t2, t3):
        worker.join()

The thread name is set with the name parameter of threading.Thread(), and the current thread's name is obtained with threading.current_thread().name (the camelCase form threading.currentThread().getName() still works but is deprecated). If no name is given, the thread gets a default name of the form Thread-N.

Customize a thread object

The sample code is shown below.

import threading
import time

# Module-level flag checked by print_time() on every iteration; when it is
# truthy, print_time() bails out of its loop.
exitFlag = 0


class myThread(threading.Thread):
    """Thread that prints five timestamps via print_time()."""

    def __init__(self, threadID, name, counter):
        super().__init__()
        self.threadID = threadID
        self.name = name
        # Passed to print_time() as the delay (seconds) between timestamps.
        self.counter = counter

    def run(self):
        """Announce start, print five timestamps, announce exit."""
        print(f"Starting:{self.name}")
        print_time(self.name, self.counter, 5)
        print(f"Exiting:{self.name}")


def print_time(threadName, delay, counter):
    """Print `threadName` and the current time `counter` times, `delay` s apart.

    Returns early, without printing more, as soon as the module-level
    `exitFlag` is truthy.

    Args:
        threadName: label printed in front of each timestamp.
        delay: seconds to sleep before each print.
        counter: number of timestamps to print; non-positive prints nothing.
    """
    while counter > 0:  # `> 0` so a negative count cannot loop forever
        if exitFlag:
            # Formerly `thread.exit()`, but no `thread` name exists here
            # (NameError). Returning ends this worker's loop cleanly.
            return
        time.sleep(delay)
        print("{0} {1}".format(threadName, time.ctime(time.time())))
        counter -= 1


t1 = myThread(1, "Thread-1", 1)
t2 = myThread(2, "Thread-2", 1)

# Start both workers, then wait for both before announcing exit.
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()

print("Exiting Main Thread.")

To customize a thread object, define a subclass of threading.Thread, implement its constructor (calling the base-class constructor), and override the run() method.

Thread synchronization

Lock

The sample code is shown below.

import threading

# Counter updated by increment_with_lock()/decrement_with_lock() under the lock.
shared_resource_with_lock = 0
# Counter updated by the *_without_lock functions with no synchronization,
# so concurrent updates may be lost (the race condition being demonstrated).
shared_resource_with_no_lock = 0
# Number of increments/decrements each worker performs.
COUNT = 100000
# Lock guarding shared_resource_with_lock.
shared_resource_lock = threading.Lock()


def increment_with_lock():
    """Add 1 to shared_resource_with_lock COUNT times, holding the lock each step."""
    global shared_resource_with_lock
    for _ in range(COUNT):
        # `with` guarantees the lock is released even if the body raises,
        # which a bare acquire()/release() pair does not.
        with shared_resource_lock:
            shared_resource_with_lock += 1


def decrement_with_lock():
    """Subtract 1 from shared_resource_with_lock COUNT times, holding the lock each step."""
    global shared_resource_with_lock
    for _ in range(COUNT):
        # `with` guarantees the lock is released even if the body raises,
        # which a bare acquire()/release() pair does not.
        with shared_resource_lock:
            shared_resource_with_lock -= 1


def increment_without_lock():
    """Add 1 to shared_resource_with_no_lock COUNT times with no synchronization."""
    global shared_resource_with_no_lock
    for _ in range(COUNT):
        shared_resource_with_no_lock = shared_resource_with_no_lock + 1


def decrement_wthout_lock():
    """Subtract 1 from shared_resource_with_no_lock COUNT times with no synchronization."""
    global shared_resource_with_no_lock
    for _ in range(COUNT):
        shared_resource_with_no_lock = shared_resource_with_no_lock - 1


if __name__ == "__main__":
    # Two workers use the lock, two race on an unprotected counter.
    t1 = threading.Thread(target=increment_with_lock)
    t2 = threading.Thread(target=decrement_with_lock)
    t3 = threading.Thread(target=increment_without_lock)
    t4 = threading.Thread(target=decrement_wthout_lock)
    workers = (t1, t2, t3, t4)
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    locked_total = shared_resource_with_lock
    racy_total = shared_resource_with_no_lock
    print("the value of shared variable with lock management is :{0}".format(
        locked_total))
    print("the value of shared variable with race condition is :{0}".format(
        racy_total))

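A lock is created with threading.Lock(). The code between acquire() and release() is the critical section: only one thread at a time can hold the lock there. Calling release() on a lock that is not locked raises RuntimeError. Using the lock as a context manager (with lock:) guarantees that it is released even if the critical section raises an exception.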

RLock

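RLock, a reentrant (recursive) lock, is created with threading.RLock(). Unlike Lock, the thread that holds an RLock may acquire it again without blocking, and only the thread that acquired it may release it; it must be released as many times as it was acquired.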

The sample code is shown below.

import threading
import time


class Box(object):
    """Item counter guarded by a class-level reentrant lock."""

    # Shared by every Box instance. It must be an RLock because add() and
    # remove() hold it while calling execute(), which acquires it again.
    lock = threading.RLock()

    def __init__(self):
        self.total_items = 0

    def execute(self, n):
        """Add n (negative to remove) to total_items while holding the lock."""
        # `with` releases the lock even if the update raises.
        with Box.lock:
            self.total_items += n

    def add(self):
        """Add one item."""
        with Box.lock:
            self.execute(1)

    def remove(self):
        """Remove one item."""
        with Box.lock:
            self.execute(-1)


def adder(box, items):
    """Add one item to `box` per second until `items` additions have been made."""
    remaining = items
    while remaining > 0:
        print("adding 1 item in the box")
        box.add()
        time.sleep(1)
        remaining -= 1


def remover(box, items):
    """Remove one item from `box` per second until `items` removals have been made."""
    remaining = items
    while remaining > 0:
        print("removing 1 item in the box")
        box.remove()
        time.sleep(1)
        remaining -= 1


if __name__ == "__main__":
    items = 5
    print("putting {0} items in the box".format(items))
    box = Box()
    # One thread adds and one removes the same number of items concurrently.
    t1 = threading.Thread(target=adder, args=(box, items))
    t2 = threading.Thread(target=remover, args=(box, items))
    for worker in (t1, t2):
        worker.start()
    for worker in (t1, t2):
        worker.join()
    print(f"{box.total_items} items still remain in the box")

Semaphore

The sample code is shown below.

import threading
import time
import random

# Counter starts at 0, so consumer() blocks in acquire() until producer()
# calls release().
semaphore = threading.Semaphore(0)


def consumer():
    """Block on the semaphore, then report the item the producer stored."""
    print("Consumer is waiting.")
    semaphore.acquire()
    # `item` is the module-level global assigned by producer().
    print(f"Consumer notify:consumed item numbers {item}")


def producer():
    """After 10 s, store a random item in the global `item`, then signal the consumer."""
    global item
    time.sleep(10)
    item = random.randint(0, 10000)
    print(f"producer notify:produced item number {item}")
    semaphore.release()


if __name__ == "__main__":
    # Five rounds, each with a fresh producer/consumer pair.
    for i in range(5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        for worker in (t1, t2):
            worker.start()
        for worker in (t1, t2):
            worker.join()

    print("program terminated.")

The semaphore is initialized to 0, so in the two parallel threads the consumer blocks in semaphore.acquire() until the producer calls semaphore.release(); this simulates the producer-consumer pattern. In general, if the semaphore's counter is 0, acquire() blocks until another thread calls release(); if the counter is greater than 0, acquire() decrements it by 1 and returns immediately, granting access to the resource.

Thread synchronization using conditions

The classic example of the condition mechanism is the producer-consumer problem: as long as the buffer is not full, the producer keeps adding items to it, and as long as the buffer is not empty, the consumer keeps taking items out. After adding an item, the producer notifies the consumer that the buffer is not empty; after removing an item, the consumer notifies the producer that the buffer is not full.

The sample code is shown below.

from threading import Thread, Condition
import time

# Shared buffer: producer.produce() appends, consumer.consume() pops.
items = []
# Guards `items`; threads wait on it while the buffer is empty / full.
condition = Condition()


class consumer(Thread):
    """Thread that removes one item from `items` every 2 s, 20 times."""

    def __init__(self):
        super().__init__()

    def consume(self):
        """Pop one item, waiting on `condition` while the buffer is empty."""
        with condition:
            # `while`, not `if`: the buffer must be re-checked after every
            # wakeup, since a wakeup does not guarantee an item is available.
            while len(items) == 0:
                print("Consumer notify:no item to consum")
                condition.wait()
            items.pop()
            print("Consumer notify: consumed 1 item")
            print("Consumer notify: item to consume are:{0}".format(len(items)))
            # Wake a producer that may be waiting for free space.
            condition.notify()

    def run(self):
        for _ in range(20):
            time.sleep(2)
            self.consume()


class producer(Thread):
    """Thread that appends one item to `items` every second, 20 times."""

    def __init__(self):
        super().__init__()

    def produce(self):
        """Append one item, waiting on `condition` while the buffer is full."""
        with condition:
            # `while`, not `if`: the buffer must be re-checked after every
            # wakeup, since a wakeup does not guarantee free space.
            while len(items) >= 10:
                print("Producer notify:items producted are:{0}".format(len(items)))
                print("Producer notify:stop the production!!")
                condition.wait()
            items.append(1)
            print("Producer notify:total items producted:{0}".format(len(items)))
            # Wake a consumer that may be waiting for an item.
            condition.notify()

    def run(self):
        for _ in range(20):
            time.sleep(1)
            self.produce()


if __name__ == "__main__":
    # Use new names for the instances: `producer = producer()` rebinds the
    # class names, so the classes could no longer be instantiated afterwards.
    producer_thread = producer()
    consumer_thread = consumer()
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()

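condition.acquire() acquires the condition's underlying lock. condition.wait() releases that lock and blocks the current thread until another thread calls condition.notify(); it then re-acquires the lock before returning. Because a woken thread must re-check the state it was waiting for, wait() is normally called inside a while loop. The thread that calls notify() must also call condition.release() promptly so the waiting thread can proceed.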

Thread synchronization using events

An event is a simple object for communication between threads: some threads wait for the event to be signalled (wait()), while others signal it (set()) or reset it (clear()).

The sample code is shown below.

import time
from threading import Thread, Event
import random

# List shared by the producer and consumer threads.
items = []
# Set (then immediately cleared) by the producer after each append; the
# consumer waits on it.
event = Event()


class consumer(Thread):
    """Thread that, after each event signal, pops and reports one item, forever."""

    def __init__(self, items, event):
        super().__init__()
        self.items = items  # list shared with the producer
        self.event = event  # signalled by the producer after each append

    def run(self):
        # NOTE: this loop never terminates on its own.
        while True:
            time.sleep(2)
            self.event.wait()
            popped = self.items.pop()
            print('Consumer notify:{0} popped from list by {1}'.format(
                popped, self.name))


class producer(Thread):
    """Thread that appends 100 random integers to a list, signalling after each one."""

    def __init__(self, integers, event):
        super().__init__()
        # Store the list that was passed in. Previously this read the
        # module-level `items` and silently ignored `integers`.
        self.items = integers
        self.event = event

    def run(self):
        for _ in range(100):
            time.sleep(2)
            item = random.randint(0, 256)
            self.items.append(item)
            print('Producer notify: item  N° %d appended to list by %s' %
                  (item, self.name))
            print('Producer notify: event set by %s' % self.name)
            # Pulse the event: wake any thread currently waiting, then reset it.
            self.event.set()
            print('Produce notify: event cleared by %s ' % self.name)
            self.event.clear()


if __name__ == "__main__":
    t1 = producer(items, event)
    t2 = consumer(items, event)
    # consumer.run() loops forever, so joining it would hang the program.
    # As a daemon it is stopped automatically once the producer has finished.
    t2.daemon = True
    t1.start()
    t2.start()
    t1.join()

Simplify the code using with syntax

import threading
import logging

# Emit DEBUG and above, prefixing each record with the emitting thread's name
# (left-aligned, padded to 10 characters).
logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s')


def threading_with(statement):
    """Acquire `statement` through a `with` block and log that it was acquired.

    Args:
        statement: any primitive usable as a context manager
            (Lock, RLock, Condition, Semaphore).
    """
    with statement:
        # Pass the argument to logging instead of pre-formatting it with `%`,
        # so the message is only built if the record is actually emitted.
        logging.debug("%s acquired via with", statement)


def Threading_not_with(statement):
    """Acquire `statement` explicitly, log it, and always release it in `finally`.

    Args:
        statement: any primitive with acquire()/release()
            (Lock, RLock, Condition, Semaphore).
    """
    statement.acquire()
    try:
        # Lazy logging arguments: formatting happens only if the record is emitted.
        logging.debug("%s acquired directly ", statement)
    finally:
        statement.release()


if __name__ == "__main__":
    threading_synchronization_list = [
        threading.Lock(),
        threading.RLock(),
        threading.Condition(),
        threading.Semaphore(1),  # a semaphore of 1 acts as a mutex
    ]

    # Exercise each primitive both via `with` and via explicit acquire/release.
    for statement in threading_synchronization_list:
        t1 = threading.Thread(target=threading_with, args=(statement,))
        t2 = threading.Thread(target=Threading_not_with, args=(statement,))
        for worker in (t1, t2):
            worker.start()
        for worker in (t1, t2):
            worker.join()

Using queue for thread communication

Queue has four common methods:

  • put(): Add an element to the queue
  • get(): Remove an element from the queue and return it
  • task_done(): Must be called once for every element retrieved with get(), after that element has been processed
  • join(): Block until all elements in the queue have been retrieved and processed

from threading import Thread, Event
from queue import Queue
import time
import random


class producer(Thread):
    """Thread that puts 10 random integers on a queue, one per second."""

    def __init__(self, queue):
        super().__init__()
        self.queue = queue  # shared queue read by the consumer threads

    def run(self):
        for _ in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            # Message fixed: it previously read "item item N°".
            print("Producer notify: item N° %d appended to queue by %s" %
                  (item, self.name))
            time.sleep(1)


class consumer(Thread):
    """Thread that takes items off a queue forever, reporting each one."""

    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        # Never returns: get() blocks whenever the queue is empty.
        while True:
            got = self.queue.get()
            print('Consumer notify : %d popped from queue by %s' %
                  (got, self.name))
            # Mark the item processed so Queue.join() can account for it.
            self.queue.task_done()


if __name__ == "__main__":
    queue = Queue()
    t1 = producer(queue)
    t2 = consumer(queue)
    t3 = consumer(queue)
    t4 = consumer(queue)
    # Consumers loop forever, so joining them would hang. Run them as
    # daemons and instead wait until every queued item has been processed.
    for worker in (t2, t3, t4):
        worker.daemon = True
    for worker in (t1, t2, t3, t4):
        worker.start()
    t1.join()     # every item has been put on the queue
    queue.join()  # every item has been marked task_done()

Process-based Parallelism

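multiprocessing is a module in the Python standard library that implements process-based parallelism: it runs code in separate processes, each with its own memory space, and provides tools (queues, pipes, shared values and arrays, managers) for communicating and sharing data between them.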

Asynchronous programming

Using concurrent.futures module

The module provides thread pools and process pools for managing parallel tasks, handling nondeterministic execution order, and synchronizing processes/threads. It consists of the following parts:

  • concurrent.futures.Executor: An abstract base class that provides methods for executing calls asynchronously.
  • submit(function, argument): Schedules the callable function to be executed with argument as its parameter and returns a Future.
  • map(function, argument): Executes function asynchronously on each element of argument.
  • shutdown(wait=True): Signals the executor to free all its resources.
  • concurrent.futures.Future: Encapsulates the asynchronous execution of a callable. Future objects are created by submitting tasks (functions with parameters) to an executor.

The sample code is shown below.

import concurrent.futures
import time

# Inputs evaluated sequentially, then with a thread pool and a process pool.
number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


def evaluate_item(x):
    """Return count(x); this is the unit of work submitted to the executors."""
    return count(x)


def count(number):
    """Busy-loop one million times (to generate CPU load), then return 1000000 * number."""
    i = 0
    while i < 1000000:
        i += 1
    return i * number


if __name__ == "__main__":
    # time.perf_counter() is a monotonic, high-resolution clock meant for
    # measuring intervals; time.time() can jump if the system clock changes.

    # Sequential execution
    start_time = time.perf_counter()
    for item in number_list:
        print(evaluate_item(item))
    print("Sequential execution in " + str(time.perf_counter() - start_time), "seconds")
    # Thread pool execution
    start_time_1 = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item)
                   for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in " +
          str(time.perf_counter() - start_time_1), "seconds")
    # Process pool execution (this section was mislabeled "Thread pool")
    start_time_2 = time.perf_counter()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item)
                   for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Process pool execution in " +
          str(time.perf_counter() - start_time_2), "seconds")

Managing event loops using Asyncio

Python's asyncio module provides facilities for managing events, coroutines, tasks and threads, as well as synchronization primitives for writing concurrent code. The main components and concepts of this module include:

  • Event loop: The central scheduler in asyncio, which runs callbacks, tasks and I/O; each thread has at most one running event loop.
  • Coroutine: A generalization of a subroutine. A coroutine can be suspended during execution so that it can wait for external processing (such as I/O) to complete, and then resume from where it was suspended.
  • Futures: A Future object, like the one in the concurrent.futures module, represents a computation that has not completed yet.
  • Tasks: A subclass of Future that wraps a coroutine and schedules its execution on the event loop, so that several coroutines can run concurrently.

asyncio provides the following methods for managing event loops:

  • loop = get_event_loop(): Gets the event loop for the current context. (Calling it when no loop is running is deprecated in recent Python versions; prefer asyncio.run() or asyncio.new_event_loop().)
  • loop.call_later(time_delay, callback, argument): Calls callback(argument) after time_delay seconds.
  • loop.call_soon(callback, argument): Schedules callback(argument) to run as soon as possible: call_soon() returns immediately, and the callback runs when control returns to the event loop.
  • loop.time(): Returns the current time, as a float, according to the event loop's internal clock.
  • asyncio.set_event_loop(loop): Sets loop as the event loop for the current context.
  • asyncio.new_event_loop(): Creates and returns a new event loop according to the current policy.
  • loop.run_forever(): Runs the loop until stop() is called.

The sample code is shown below.

import asyncio
import datetime
import time


def fuction_1(end_time, loop):
    """Print a marker, then chain to fuction_2 after 1 s, or stop the loop.

    The chain continues only while at least one more second remains before
    `end_time`, measured on the loop's own clock.
    """
    print("function_1 called")
    if loop.time() + 1.0 < end_time:
        loop.call_later(1, fuction_2, end_time, loop)
        return
    loop.stop()


def fuction_2(end_time, loop):
    """Print a marker, then chain to function_3 after 1 s, or stop the loop.

    The chain continues only while at least one more second remains before
    `end_time`, measured on the loop's own clock.
    """
    print("function_2 called")
    if loop.time() + 1.0 < end_time:
        loop.call_later(1, function_3, end_time, loop)
        return
    loop.stop()


def function_3(end_time, loop):
    """Print a marker, then chain back to fuction_1 after 1 s, or stop the loop.

    The chain continues only while at least one more second remains before
    `end_time`, measured on the loop's own clock.
    """
    print("function_3 called")
    if loop.time() + 1.0 < end_time:
        loop.call_later(1, fuction_1, end_time, loop)
        return
    loop.stop()


def function_4(end_time, loop):
    """Print a marker, then reschedule itself after 1 s, or stop the loop.

    Not used by the script below. It keeps rescheduling only while at least
    one more second remains before `end_time` on the loop's clock.
    """
    print("function_4 called")
    if loop.time() + 1.0 < end_time:
        loop.call_later(1, function_4, end_time, loop)
        return
    loop.stop()


# Create and install a loop explicitly: asyncio.get_event_loop() with no
# running loop is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # Run the fuction_1 -> fuction_2 -> function_3 chain for about 9 seconds.
    end_loop = loop.time() + 9.0
    loop.call_soon(fuction_1, end_loop, loop)
    loop.run_forever()
finally:
    # Close the loop even if a callback raises.
    loop.close()

Using asyncio to manage coroutines

The sample code is shown below.

import asyncio
import time
from random import randint


async def StartState():
    """Initial FSM state: randomly move to State1 or State2, then print the path taken.

    Written as a native coroutine: `@asyncio.coroutine` / `yield from` were
    removed in Python 3.11.
    """
    print("Start State called \n")
    input_val = randint(0, 1)
    # Non-blocking pause; time.sleep() would stall the whole event loop.
    await asyncio.sleep(1)
    if input_val == 0:
        result = await State2(input_val)
    else:
        result = await State1(input_val)
    print("Resume of the Transition:\nStart State calling" + result)


async def State1(transition_value):
    """FSM state 1: randomly move to State3 or State2.

    Returns:
        A string describing this state followed by the path of later states.
    """
    outputVal = "State 1 with transition value=%s \n" % (transition_value,)
    input_val = randint(0, 1)
    # Non-blocking pause; time.sleep() would stall the whole event loop.
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = await State3(input_val)
    else:
        result = await State2(input_val)
    # Previously this fell off the end and returned None, so callers doing
    # `"..." + result` raised TypeError.
    result = "State 1 calling " + result
    return outputVal + str(result)


async def State2(transition_value):
    """FSM state 2: randomly move to State1 or State3.

    Returns:
        A string describing this state followed by the path of later states.
    """
    outputVal = "State 2 with transition value= %s \n" % (transition_value,)
    input_Val = randint(0, 1)
    # Non-blocking pause; time.sleep() would stall the whole event loop.
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_Val == 0:
        result = await State1(input_Val)
    else:
        result = await State3(input_Val)
    result = "State 2 calling " + result
    return outputVal + str(result)


async def State3(transition_value):
    """FSM state 3: randomly move back to State1 or finish in EndState.

    Returns:
        A string describing this state followed by the path of later states.
    """
    outputVal = "State 3 with transition value = %s \n" % (transition_value,)
    input_val = randint(0, 1)
    # Non-blocking pause; time.sleep() would stall the whole event loop.
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = await State1(input_val)
    else:
        # Previously this went to State2, which left EndState unreachable, so
        # the machine recursed until RecursionError instead of terminating.
        result = await EndState(input_val)
    result = "State 3 calling " + result
    return outputVal + str(result)


async def EndState(transition_value):
    """Terminal FSM state: print a stop marker and return this state's description."""
    outputVal = "End State With transition value = %s \n" % (transition_value,)
    print("...Stop Computation...")
    return outputVal


if __name__ == "__main__":
    print("Finites State Machine simulation with Asyncio Coroutine")
    # asyncio.run() creates, runs and closes a fresh event loop; calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    asyncio.run(StartState())

Using Asyncio to Control Tasks

The sample code is shown below.

import asyncio


async def factorial(number, delay=1):
    """Compute number!, yielding to the event loop between multiplications.

    Args:
        number: non-negative integer.
        delay: seconds to await between steps (default 1).

    Returns:
        number! (also printed). Previously None was returned; callers that
        ignore the result are unaffected.
    """
    f = 1
    for i in range(2, number + 1):
        print("Asyncio.Task:Compute factorial(%s)" % (i))
        await asyncio.sleep(delay)
        f *= i
    print("Asyncio.Task - factorial(%s) = %s" % (number, f))
    return f


async def fibonacci(number, delay=1):
    """Compute the `number`-th Fibonacci number, yielding to the loop between steps.

    Args:
        number: non-negative integer index (fibonacci(0) == 0).
        delay: seconds to await between steps (default 1).

    Returns:
        The Fibonacci number (also printed). Previously None was returned.
    """
    a, b = 0, 1
    for i in range(number):
        print("Asyncio.Task:Complete fibonacci (%s)" % (i))
        await asyncio.sleep(delay)
        a, b = b, a + b
    print("Asyncio.Task - fibonaci (%s)= %s" % (number, a))
    return a


async def binomialCoeff(n, k, delay=1):
    """Compute C(n, k) incrementally, yielding to the event loop between steps.

    Args:
        n, k: non-negative integers.
        delay: seconds to await between steps (default 1).

    Returns:
        C(n, k) as an exact int (also printed). Previously None was returned.
    """
    result = 1
    for i in range(1, k + 1):
        # After this step result == C(n, i); result * (n - i + 1) is always
        # divisible by i, so integer division is exact. The former `/`
        # produced a float, which prints as e.g. 184756.0 and loses precision
        # for large values.
        result = result * (n - i + 1) // i
        print("Asyncio.Task:Compute binomialCoeff (%s)" % (i))
        await asyncio.sleep(delay)
    print("Asyncio.Task - binomialCoeff (%s,%s) = %s" % (n, k, result))
    return result


if __name__ == "__main__":
    async def _main():
        # Tasks must be created while a loop is running. gather() wraps each
        # coroutine in a Task on the running loop and waits for all of them.
        await asyncio.gather(factorial(10), fibonacci(10), binomialCoeff(20, 10))

    asyncio.run(_main())

Using Asyncio and Futures

The sample code is shown below.

import asyncio
import sys


async def first_coroutine(future, N, delay=4):
    """Sum 1..N, wait `delay` seconds, then resolve `future` with a message.

    Args:
        future: asyncio Future that receives the result string.
        N: upper bound of the sum (N < 1 gives 0).
        delay: seconds to await before resolving (default 4).
    """
    count = sum(range(1, N + 1))
    await asyncio.sleep(delay)
    future.set_result(
        "first coroutine (sum of N integers) result = " + str(count))


async def second_coroutine(future, N, delay=3):
    """Compute N!, wait `delay` seconds, then resolve `future` with a message.

    Args:
        future: asyncio Future that receives the result string.
        N: factorial argument (N < 2 gives 1).
        delay: seconds to await before resolving (default 3).
    """
    count = 1
    for i in range(2, N + 1):
        count *= i
    await asyncio.sleep(delay)
    future.set_result("second coroutine (factorial) result = " + str(count))


def got_result(future):
    """Done-callback: print the value the future was resolved with."""
    value = future.result()
    print(value)


if __name__ == "__main__":
    N1 = 1
    N2 = 1

    async def _main():
        # Futures must belong to the running loop; creating them with no loop
        # is deprecated. asyncio.wait() also no longer accepts bare
        # coroutines (Python 3.11+), so gather() runs both coroutines instead.
        loop = asyncio.get_running_loop()
        future1 = loop.create_future()
        future2 = loop.create_future()
        future1.add_done_callback(got_result)
        future2.add_done_callback(got_result)
        await asyncio.gather(first_coroutine(future1, N1),
                             second_coroutine(future2, N2))

    asyncio.run(_main())

Distributed programming

Omitted.

GPU programming

Omitted.

Related reference

Keywords: Python Programming Lambda

Added by alexszilagyi on Fri, 13 Sep 2019 16:50:01 +0300