TensorFlow multithreaded input data processing framework -- queues and multithreading

Reference book

TensorFlow: Practical Google Deep Learning Framework (2nd edition)

For queues, the operations that modify the queue state are Enqueue, EnqueueMany, and Dequeue. The following program shows how to use these operations on a queue.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# coding=utf-8 

"""
@author: Li Tian
@contact: 694317828@qq.com
@software: pycharm
@file: queue_operate.py
@time: 2019/1/31 21:32
@desc: Operate on a queue
"""

import tensorflow as tf

# Create a first-in, first-out queue that holds at most two elements of type int32.
q = tf.FIFOQueue(2, "int32")
# Use enqueue_many to initialize the elements in the queue. As with variable initialization,
# this op must be run explicitly before the queue is used.
init = q.enqueue_many(([0, 10],))
# Use dequeue to take the first element out of the queue. Its value is stored in x.
x = q.dequeue()
# Add 1 to the dequeued value.
y = x + 1
# Put the incremented value back into the queue.
q_inc = q.enqueue([y])

with tf.Session() as sess:
    # Run the op that initializes the queue.
    init.run()
    for _ in range(5):
        # Running q_inc performs the whole cycle: dequeue an element, add 1, and enqueue it again.
        v, _ = sess.run([x, q_inc])
        # Print the value of the dequeued element.
        print(v)

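Run result: the queue starts as [0, 10], and each iteration dequeues the head and re-enqueues it plus one, so the program prints 0, 10, 1, 11, 2, one value per line.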

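The dequeue, add one, enqueue cycle above can be traced without TensorFlow. The following is a minimal sketch in plain Python, assuming a collections.deque as a stand-in for tf.FIFOQueue; the helper name trace_queue is hypothetical and is not part of TensorFlow. It only mirrors the ordering of the values, not the graph or session mechanics.

```python
from collections import deque


def trace_queue(initial, steps):
    """Mimic the dequeue / add one / enqueue cycle with a plain deque (illustrative stand-in)."""
    q = deque(initial)       # plays the role of q.enqueue_many(([0, 10],))
    seen = []
    for _ in range(steps):
        x = q.popleft()      # like q.dequeue(): take the oldest element
        q.append(x + 1)      # like q.enqueue([y]) with y = x + 1
        seen.append(x)
    return seen, list(q)


values, remaining = trace_queue([0, 10], 5)
print(values)     # [0, 10, 1, 11, 2]
print(remaining)  # [12, 3]
```

Because the queue is first in, first out, the two initial elements alternate: each one returns to the head of the queue every second step, one larger than before.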

tf.train.Coordinator is mainly used to coordinate stopping multiple threads together. The following program shows how to use tf.train.Coordinator.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# coding=utf-8 

"""
@author: Li Tian
@contact: 694317828@qq.com
@software: pycharm
@file: coordinator_test1.py
@time: 2019/2/2 21:35
@desc: tf.train.Coordinator is mainly used to coordinate stopping multiple threads together. This program shows how to use it.
"""

import tensorflow as tf
import numpy as np
import threading
import time


# The function run in each thread. Once per second it checks whether it should stop and prints its own ID.
def MyLoop(coord, worker_id):
    # Use the tf.train.Coordinator instance to check whether the current thread needs to stop.
    while not coord.should_stop():
        # Randomly decide to stop all threads.
        if np.random.rand() < 0.1:
            print("Stopping from id: %d\n" % worker_id)
            # Call coord.request_stop() to notify the other threads to stop.
            coord.request_stop()
        else:
            # Print the current thread's ID.
            print("Working on id: %d\n" % worker_id)
        # Pause for one second.
        time.sleep(1)


# Create a tf.train.Coordinator instance to coordinate the threads.
coord = tf.train.Coordinator()
# Create 5 threads.
threads = [threading.Thread(target=MyLoop, args=(coord, i)) for i in range(5)]
# Start all threads.
for t in threads:
    t.start()
# Wait for all threads to exit.
coord.join(threads)

Run results:

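The should_stop / request_stop / join pattern used above can be illustrated with the standard library alone. The following is a minimal sketch, assuming a threading.Event as the shared stop flag; MiniCoordinator and my_loop are hypothetical names, and this is not how tf.train.Coordinator is implemented (the real class also handles things such as exceptions raised in threads). To keep the run deterministic, worker 0 requests the stop after three iterations instead of stopping at random.

```python
import threading


class MiniCoordinator:
    """Hypothetical stand-in for the stop-flag part of tf.train.Coordinator."""

    def __init__(self):
        self._stop_event = threading.Event()

    def should_stop(self):
        return self._stop_event.is_set()

    def request_stop(self):
        self._stop_event.set()

    def wait_for_stop(self, timeout):
        # Sleep up to `timeout` seconds, waking early if a stop is requested.
        return self._stop_event.wait(timeout)

    def join(self, threads):
        for t in threads:
            t.join()


def my_loop(coord, worker_id, stop_after, counts):
    # Each worker keeps looping until some thread requests a stop.
    while not coord.should_stop():
        counts[worker_id] += 1
        # Assumption for this sketch: worker 0 asks everyone to stop after `stop_after` iterations.
        if worker_id == 0 and counts[worker_id] >= stop_after:
            coord.request_stop()
        coord.wait_for_stop(0.01)


coord = MiniCoordinator()
counts = [0] * 5
threads = [threading.Thread(target=my_loop, args=(coord, i, 3, counts)) for i in range(5)]
for t in threads:
    t.start()
coord.join(threads)
print(coord.should_stop(), any(t.is_alive() for t in threads))
```

Waiting on the event rather than calling time.sleep lets a worker react to a stop request immediately instead of finishing its full pause first.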

The following program shows how to use tf.train.QueueRunner and tf.train.Coordinator to manage multithreaded queue operations.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# coding=utf-8 

"""
@author: Li Tian
@contact: 694317828@qq.com
@software: pycharm
@file: queuerunner_test1.py
@time: 2019/2/3 12:31
@desc: How to use tf.train.QueueRunner and tf.train.Coordinator to manage multithreaded queue operations.
"""

import tensorflow as tf

# Declare a first-in, first-out queue that holds at most 100 elements of type float.
queue = tf.FIFOQueue(100, "float")
# Define the enqueue operation.
enqueue_op = queue.enqueue([tf.random_normal([1])])

# Use tf.train.QueueRunner to create multiple threads that run the enqueue operation.
# The first argument of tf.train.QueueRunner is the queue to operate on; [enqueue_op] * 5
# means that 5 threads are started, each of which runs enqueue_op.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)

# Add the QueueRunner defined above to a collection on the TensorFlow computation graph.
# If no collection is specified, tf.train.add_queue_runner adds it to the default
# collection, tf.GraphKeys.QUEUE_RUNNERS. The call below adds the newly defined qr
# to the default tf.GraphKeys.QUEUE_RUNNERS collection.
tf.train.add_queue_runner(qr)
# Define the dequeue operation.
out_tensor = queue.dequeue()

with tf.Session() as sess:
    # Use tf.train.Coordinator to coordinate the started threads.
    coord = tf.train.Coordinator()
    # When using tf.train.QueueRunner, tf.train.start_queue_runners must be called explicitly
    # to start all the threads. Otherwise no thread runs the enqueue operation, and the
    # program blocks forever on the dequeue operation, waiting for an enqueue to run.
    # By default, tf.train.start_queue_runners starts every QueueRunner in the
    # tf.GraphKeys.QUEUE_RUNNERS collection. Because this function only starts the
    # QueueRunners in the collection it is given, tf.train.add_queue_runner and
    # tf.train.start_queue_runners are normally given the same collection.
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    # Get values from the queue.
    for _ in range(3):
        print(sess.run(out_tensor)[0])

    # Use tf.train.Coordinator to stop all threads.
    coord.request_stop()
    coord.join(threads)

Run results:

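The producer and consumer structure of the program above can also be sketched with the standard library. The following is a rough analogy only, assuming queue.Queue in place of tf.FIFOQueue and a threading.Event in place of the coordinator; enqueue_loop is a hypothetical helper, and TensorFlow's own QueueRunner handles thread shutdown differently. Five threads keep filling a bounded queue with normally distributed values while the main thread takes three of them out.

```python
import queue
import random
import threading


def enqueue_loop(q, stop_event):
    # Plays the role of one enqueue_op thread: keep adding random values until told to stop.
    while not stop_event.is_set():
        try:
            # Use a timeout so a full queue cannot block this thread forever.
            q.put(random.gauss(0.0, 1.0), timeout=0.01)
        except queue.Full:
            pass  # Queue is full; loop around and re-check the stop flag.


q = queue.Queue(maxsize=100)   # bounded FIFO queue, at most 100 elements
stop_event = threading.Event()
threads = [threading.Thread(target=enqueue_loop, args=(q, stop_event)) for _ in range(5)]
for t in threads:
    t.start()

# Consumer side: blocks until a producer has put a value into the queue.
values = [q.get() for _ in range(3)]
for v in values:
    print(v)

# Ask the producers to stop, then wait for them to exit.
stop_event.set()
for t in threads:
    t.join()
```

If the producer threads were never started, the first q.get() would block indefinitely, which is the same situation the comments above describe for a dequeue without tf.train.start_queue_runners.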

Added by john_nyc on Thu, 05 Dec 2019 07:04:18 +0200