Producer-consumer queue

The producer-consumer model comes up often. For example, there may be many tasks to process that cannot all be handled immediately, so they have to be buffered and worked through gradually. The most common form is the one-to-one model: one producer and one consumer. There are several points to consider:

  1. Production may be faster than consumption, so a queue is needed to buffer tasks until the consumer gets to them.
  2. Tasks should be consumed at a uniform pace, with the same interval between tasks. This is a common requirement. For example, a large number of drawing tasks cannot be handed to the screen without limit, because that may block the main thread; the main thread needs time left over to handle user interaction.
  3. If production is much faster than consumption, the queue needs a maximum length to protect the program.
  4. A task abandonment strategy decides which tasks to drop once the queue is full. (This issue is set aside for the time being.)
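Point 4 is left open above. As a rough illustration only, two common abandonment strategies could be sketched as follows. DropPolicy and BoundedPush are hypothetical names that do not appear in the ConsumeQueue class; the Publish method of that class behaves like the drop-oldest case.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

// Hypothetical names for illustration; not part of the ConsumeQueue class.
enum class DropPolicy
{
    DropOldest,  // evict the item at the front to make room
    DropNewest   // reject the incoming item and keep the backlog
};

// Pushes item into buffer without letting it grow past maxSize.
// Returns true if item was stored, false if it was dropped.
template <typename T>
bool BoundedPush(std::deque<T>& buffer, T item, std::size_t maxSize, DropPolicy policy)
{
    assert(maxSize > 0 && "a zero-capacity buffer cannot hold any task");
    if (maxSize == 0)
    {
        return false;  // release builds: drop rather than misbehave
    }
    if (buffer.size() >= maxSize)
    {
        if (policy == DropPolicy::DropNewest)
        {
            return false;  // buffer is full, the new item is abandoned
        }
        buffer.pop_front();  // buffer is full, the oldest item is abandoned
    }
    buffer.push_back(std::move(item));
    return true;
}
```

Drop-oldest suits data where only recent values matter, such as redraw requests. Drop-newest suits cases where earlier tasks must not be lost.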
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

template<typename T>
class ConsumeQueue
{
public:
	typedef std::function<void(T)> ConsumeFunction;

	//consumeRate: Consumption rate (number of messages / 1s)
	//publishQueueMaxSize: maximum number of buffered messages
	ConsumeQueue(int consumeRate, int publishQueueMaxSize = 1000);
	~ConsumeQueue() { Stop(); }
	void Publish(T data);
	void Consume(ConsumeFunction consumeFunction);
	void Stop();
	void SetConsumeRate(int rate);
	int  GetConsumeRate();
	int  GetPublishQueueLength();
private:
	void ConsumeThreadFunction(ConsumeFunction consumeFunction);
private:
	std::thread*   m_thread = nullptr;
	//Producer queue, guarded by m_mut
	std::queue<T>  m_publishQueue;
	//Atomic: written by SetConsumeRate, read by the consumer thread
	std::atomic<int>  m_consumeRate;
	int            m_publishQueueMaxSize;
	std::condition_variable m_wait;
	std::mutex              m_mut;
	//Atomic: set by Stop, read by the producer and consumer threads
	std::atomic<bool> m_isStop{false};
};

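template<typename T>
int ConsumeQueue<T>::GetPublishQueueLength()
{
	//The queue is shared with the consumer thread, so read it under the lock
	std::unique_lock<std::mutex> locker(m_mut);
	return static_cast<int>(m_publishQueue.size());
}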

template<typename T>
int ConsumeQueue<T>::GetConsumeRate()
{
	return m_consumeRate;
}

template<typename T>
void ConsumeQueue<T>::SetConsumeRate(int rate)
{
	//Ignore non-positive rates: the consumer thread computes 1000 / rate
	if (rate <= 0)
	{
		return;
	}
	m_consumeRate = rate;
}

template<typename T>
void ConsumeQueue<T>::Stop()
{
	{
		std::unique_lock<std::mutex> locker(m_mut);
		m_isStop = true;
		m_wait.notify_all();
	}
	if (m_thread)
	{
		m_thread->join();
		delete m_thread;
		m_thread = nullptr;
	}
}

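template<typename T>
void ConsumeQueue<T>::Publish(T data)
{
	std::unique_lock<std::mutex> locker(m_mut);
	//Check the stop flag under the lock so it cannot change mid-publish
	if (m_isStop)
	{
		return;
	}
	m_publishQueue.push(std::move(data));
	//Over the limit: discard the oldest message
	if (static_cast<int>(m_publishQueue.size()) > m_publishQueueMaxSize)
	{
		m_publishQueue.pop();
	}
	m_wait.notify_all();
}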

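template<typename T>
void ConsumeQueue<T>::Consume(ConsumeFunction consumeFunction)
{
	//Only one consumer thread is supported; ignore repeated calls
	if (m_thread)
	{
		return;
	}
	m_thread = new std::thread([this, consumeFunction]() {
		ConsumeThreadFunction(consumeFunction);
	});
}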

template<typename T>
ConsumeQueue<T>::ConsumeQueue(int consumeRate, int publishQueueMaxSize)
	: m_consumeRate(consumeRate > 0 ? consumeRate : 1)   //at least 1, the consumer divides by it
	, m_publishQueueMaxSize(publishQueueMaxSize)
{
}

template<typename T>
void ConsumeQueue<T>::ConsumeThreadFunction(ConsumeFunction consumeFunction)
{
	while (!m_isStop)
	{
		std::unique_lock<std::mutex> locker(m_mut);
		//The predicate guards against spurious wakeups
		m_wait.wait(locker, [this]() {
			return m_isStop || !m_publishQueue.empty();
		});
		if (m_isStop)
		{
			return;
		}
		//Take the whole backlog so producers are not blocked while it is consumed
		std::queue<T> consumeQueue;
		consumeQueue.swap(m_publishQueue);
		locker.unlock();
		while (!consumeQueue.empty() && !m_isStop)
		{
			consumeFunction(consumeQueue.front());
			consumeQueue.pop();
			//Pace consumption; guard against a zero or negative rate
			int rate = m_consumeRate;
			std::this_thread::sleep_for(std::chrono::milliseconds(1000 / (rate > 0 ? rate : 1)));
		}
	}
}

Publish is the producer interface. It adds a task to the queue; a task can be a function object, a string, an int, and so on.

Consume is the consumer interface. It accepts a lambda or a function, which is called with each item as it leaves the queue, so the data can be processed inside that function. Note, however, that this callback runs on a separate thread, so UI drawing operations must not be placed there.
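One way to respect this restriction, sketched here under assumptions, is to have the callback hand the UI work to a small inbox that the main thread drains. MainThreadInbox, Post and Drain are hypothetical names, not part of ConsumeQueue, and the application is assumed to have a main loop that can call Drain regularly.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical helper: collects closures posted from the consumer thread
// so that the main (UI) thread can run them.
class MainThreadInbox
{
public:
    // Safe to call from any thread, e.g. from inside the Consume callback.
    void Post(std::function<void()> job)
    {
        // An empty std::function would throw std::bad_function_call in Drain.
        assert(job != nullptr && "job must be callable");
        std::lock_guard<std::mutex> lock(m_mut);
        m_jobs.push(std::move(job));
    }

    // Call from the main thread only, e.g. once per frame or event-loop tick.
    // Returns the number of jobs that were run.
    int Drain()
    {
        std::queue<std::function<void()>> jobs;
        {
            // Hold the lock only long enough to take the pending jobs.
            std::lock_guard<std::mutex> lock(m_mut);
            jobs.swap(m_jobs);
        }
        int count = 0;
        while (!jobs.empty())
        {
            jobs.front()();  // runs on the caller's thread
            jobs.pop();
            ++count;
        }
        return count;
    }

private:
    std::mutex m_mut;
    std::queue<std::function<void()>> m_jobs;
};
```

With this in place, the Consume callback would only call Post with a closure that does the drawing, and the main loop would call Drain on each tick.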

The constructor takes two parameters: consumeRate is the number of tasks to consume per second, and publishQueueMaxSize is the maximum length of the buffer queue. Once that length is exceeded, new messages are still added and the oldest messages are discarded.
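How consumeRate turns into a delay can be checked in isolation. The sketch below mirrors the 1000 / m_consumeRate expression from ConsumeThreadFunction. IntervalMs and IntervalUs are hypothetical helpers, and the clamp to 1 is an assumption added to avoid dividing by zero. Because the division is integer division, rates above 1000 per second give a delay of 0 ms; the microsecond variant is one possible way around that.

```cpp
#include <chrono>

// Hypothetical helper: delay inserted after each task, in milliseconds.
// Rates below 1 are treated as 1 (an assumption, to avoid dividing by zero).
constexpr std::chrono::milliseconds IntervalMs(int consumeRate)
{
    return std::chrono::milliseconds(1000 / (consumeRate < 1 ? 1 : consumeRate));
}

// Same idea with microsecond resolution, so rates above 1000 per second
// still get a non-zero delay.
constexpr std::chrono::microseconds IntervalUs(int consumeRate)
{
    return std::chrono::microseconds(1000000 / (consumeRate < 1 ? 1 : consumeRate));
}

// Checked at compile time.
static_assert(IntervalMs(10).count() == 100, "10 per second is one task every 100 ms");
static_assert(IntervalMs(3).count() == 333, "integer division truncates 333.3 to 333");
static_assert(IntervalMs(2000).count() == 0, "above 1000 per second the delay is 0 ms");
static_assert(IntervalUs(2000).count() == 500, "microseconds keep the delay non-zero");
```

Note that the delay is added on top of the time the callback itself takes, so the effective rate is somewhat lower than consumeRate when the callback is slow.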

Keywords: Lambda

Added by dmb on Mon, 12 Aug 2019 15:49:35 +0300