Reference books
TensorFlow: a practical Google deep learning framework (version 2)
For queues, the operations to modify the queue status are Enqueue, EnqueueMany, and Dequeue. The following program shows how to use these functions to operate a queue.
#!/usr/bin/env python # -*- coding: UTF-8 -*- # coding=utf-8 """ @author: Li Tian @contact: 694317828@qq.com @software: pycharm @file: queue_operate.py @time: 2019/1/31 21:32 @desc: Operate on a queue """ import tensorflow as tf # Create a first in, first out queue, specify that at most two elements can be saved in the queue, and specify the type as integer q = tf.FIFOQueue(2, "int32") # Use enqueue_many Function to initialize the elements in the queue. Similar to variable initialization, this initialization process needs to be explicitly called before using queues. init = q.enqueue_many(([0, 10],)) # Use Dequeue Function to dequeue the first element in a queue. The value of this element will be a variable x in x = q.dequeue() # Value to be obtained+1 y = x + 1 # take+1 The value after is rejoined. q_inc = q.enqueue([y]) with tf.Session() as sess: # Run the operation to initialize the queue init.run() for _ in range(5): # Function q_inc Elements that will execute data out of the queue and out of the queue+1,The entire process of rejoining a queue. v, _ = sess.run([x, q_inc]) # Print out the value of the team element print(v)
Operation result:
tf.Coordinator is mainly used to stop multiple threads together. The following program shows how to use tf.Coordinator.
#!/usr/bin/env python # -*- coding: UTF-8 -*- # coding=utf-8 """ @author: Li Tian @contact: 694317828@qq.com @software: pycharm @file: coordinator_test1.py @time: 2019/2/2 21:35 @desc: tf.Coordinator It is mainly used to cooperate with multiple threads to stop together. The following program shows how to use tf.Coordinator """ import tensorflow as tf import numpy as np import threading import time # A program running in a thread. This program determines whether to stop and print its own program every 1 second ID. def MyLoop(coord, worker_id): # Use tf.Coordinator Class to determine whether the current thread needs to stop while not coord.should_stop(): # Stop all threads at random. if np.random.rand() < 0.1: print("Stoping from id: %d\n" % worker_id) # call coord.request_stop()Function to notify other threads to stop. coord.request_stop() else: # Print the current thread's Id. print("Working on id: %d\n" % worker_id) # Pause for one second time.sleep(1) # Declare a tf.train.Coordinator Class to work with multiple threads. coord = tf.train.Coordinator() # Declaration to create 5 threads. threads = [threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5)] # Start all threads for t in threads: t.start() # Wait for all threads to exit coord.join(threads)
Run results:
How to use tf.QueueRunner and tf.Coordinator to manage multithreaded queue operations.
#!/usr/bin/env python # -*- coding: UTF-8 -*- # coding=utf-8 """ @author: Li Tian @contact: 694317828@qq.com @software: pycharm @file: queuerunner_test1.py @time: 2019/2/3 12:31 @desc: How to use tf.QueueRunner and tf.Coordinator To manage multithreaded queue operations. """ import tensorflow as tf # Declare a first in, first out queue with a maximum of 100 elements in the queue. The type is real queue = tf.FIFOQueue(100, "float") # Define queue entry enqueue_op = queue.enqueue([tf.random_normal([1])]) # Use tf.train.QueueRunner To create multiple threads to run the queued operations. # tf.train.QueueRunner The first parameter of gives the queue to be operated,[enqueue_op] * 5 # Indicates that 5 threads need to be started, and each thread runs enqueue_op operation qr = tf.train.QueueRunner(queue, [enqueue_op] * 5) # Will be defined QueueRunner join Tensorflow Calculates the set specified on the graph. # tf.train.add_queue_runner Function does not specify a collection # Join the default collection tf.GraphKeys.QUEUE_RUNNERS. The following functions are just defined # qr Add default tf.GraphKeys.QUEUE_RUNNER Set. tf.train.add_queue_runner(qr) # Define outbound operations out_tensor = queue.dequeue() with tf.Session() as sess: # Use tf.train.Coordinator To co start the thread. coord = tf.train.Coordinator() # Use tf.train.QueueRunner You need to explicitly call tf.train.start_queue_runners # To start all threads. Otherwise, because there is no thread running the queued operation, when the queued operation is called, the program will always # Wait for the join operation to be run. tf.train.start_queue_runners Function will start by default # tf.GraphKeys.QUEUE_RUNNERS All of the QueueRunner. Because this function value supports startup # Specify the QueueRunner,So in general tf.train.add_queue_runner Function sum # tf.trian.start_queue_runners The function specifies the same collection. threads = tf.train.start_queue_runners(sess=sess, coord=coord) # Gets the value in the queue. for _ in range(3): print(sess.run(out_tensor)[0]) # Use tf.train.Coordinator To stop all threads coord.request_stop() coord.join(threads)
Run results: