Controlling internal queues with Java

There are many good reasons to use internal queues in a program. Most of the common patterns share the same principle: split the processing into two separate parts, each of which can then work autonomously. Queues are a natural way to move objects from one thread to another while guaranteeing correct visibility of all fields of the transferred object. This well-known pattern is called the producer-consumer pattern.
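
To make the pattern concrete, here is a minimal producer-consumer sketch using the JDK's LinkedBlockingQueue (the task names, queue capacity and timing values are illustrative only):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerExample {

    public static void main(String[] args) throws InterruptedException {
        // The queue hands tasks from the producer thread to the consumer thread
        // and guarantees visibility of the transferred object's fields.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);

        Thread consumer = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String task = queue.take();          // blocks until a task is available
                    System.out.println("Processing " + task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();      // stop consuming on interrupt
            }
        });
        consumer.start();

        // Producer side: enqueue tasks and continue immediately.
        for (int i = 0; i < 5; i++) {
            queue.put("task-" + i);
        }

        Thread.sleep(500);                               // give the consumer time to drain the queue
        consumer.interrupt();
    }
}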


Today, however, we will focus on potential failures, on monitoring, and on how to avoid losing messages in the program.


What can go wrong?

Suppose you run a microservice application in a cloud environment on five instances and deploy new versions every few days. The application has a REST endpoint that puts new tasks into an internal queue for processing and immediately returns OK; the tasks are then processed asynchronously on another thread.


One day you have to release a new version of the microservice. That is easy: in the cloud you just press a button and the new version is rolled out instance by instance without downtime.


The moment you press that button, you may have made a serious mistake: all the tasks still sitting in the internal queue are lost, and you may receive a lot of complaints from customers.

In fact, there are two ways to solve this problem:

  • Replace the internal queue with an external one (for example, RabbitMQ) and do not acknowledge a message until its processing has fully finished. If processing is cut off during a deployment, the task is redelivered and can be reprocessed once the new version of the application is up and running.

  • Disconnect all callers from the application so the internal queue stops filling up, wait until all queued tasks have been processed, and only then trigger the deployment.


But how can you tell that all tasks have been processed? How many tasks are in the internal queue right now? Dozens, hundreds, thousands? You probably don't know; it is hard to guess the ratio of processing time between your publishers and your consumers.

In general, bounded queues tend to be either completely full or completely empty, depending on whether the ratio of publishing time to processing time is stable. If your queue fills up with tasks during a peak (for example, between 8 and 11 p.m.) and you have enough time to process them overnight, that is absolutely fine - provided, of course, that you are willing to sacrifice the latency of an individual task.

It is even worse with unbounded queues that hold the unprocessed tasks: if the publisher is even a little faster than the consumer, you can end up with a huge backlog while the application is running.

This is the case when you run your own code and can decide which queue to use. You may run into the same situation when the internal queues are managed by a framework used in your application. However, let's focus on the case where everything is in your hands and you can influence which internal queue you end up using and how.


How to get better insight

Let's agree that we need more information about our internal queues, and that we cannot simply assume the queues are empty when we push a new version of the application to production. Unfortunately, the JDK queues do not expose this kind of information out of the box, so let's take a closer look and try to expose it ourselves.

Starting from the foundation

As a first step, we will expose some basic information that is already available from the JDK's ThreadPoolExecutor and the Queue backing it.

import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Stream;

public interface QueueMonitor {
    /**
     * The executor whose internal queue is being monitored;
     * implementations just hand over their executor here.
     */
    ThreadPoolExecutor executor();
    /**
     * Returns {@code true} if there is any thread executing any task.
     *
     * @return {@code true} if there is any active task.
     */
    default boolean isRunning() {
        return executor().getActiveCount() > 0;
    }
    /**
     * Returns the approximate number of threads that are actively
     * executing tasks.
     *
     * @return the number of threads
     */
    default int activeCount() {
        return executor().getActiveCount();
    }
    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    default long completedTasksTotal() {
        return executor().getCompletedTaskCount();
    }
    /**
     * Returns the approximate total number of tasks that have ever been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation.
     *
     * @return the number of tasks
     */
    default long enqueuedTasksTotal() {
        return executor().getTaskCount();
    }
    /**
     * Returns the approximate number of tasks that are currently enqueued
     * and waiting to be scheduled for execution.
     *
     * @return number of enqueued tasks.
     */
    default long enqueuedTasksCurrent() {
        return executor().getQueue().size();
    }
    /**
     * Returns the {@link Stream stream} of currently enqueued tasks
     * in an internal queue.
     *
     * @return stream of currently enqueued tasks.
     */
    default Stream<Runnable> enqueuedTasks() {
        return executor().getQueue().stream();
    }
}


If every component that keeps a ThreadPoolExecutor implements this interface and provides the executor via the executor() method, you automatically get some basic information about the queue, which can then be exposed through a custom REST monitoring API or via JMX. Which one to choose depends on whether your service is internal (not exposed to the outside world) and whether you already have HTTP access to the application; if not, JMX may be the better way, depending on the circumstances and nature of the application.
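
For illustration, a component could implement the interface simply by handing over its executor (a minimal sketch; the FoodPreparationService name and the single-threaded pool are made up for this example):

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FoodPreparationService implements QueueMonitor {

    // The executor whose internal queue we want to observe.
    // newFixedThreadPool(1) returns a ThreadPoolExecutor, so the cast is safe.
    private final ThreadPoolExecutor executor =
            (ThreadPoolExecutor) Executors.newFixedThreadPool(1);

    @Override
    public ThreadPoolExecutor executor() {
        return executor;
    }

    public void submit(Runnable task) {
        executor.execute(task);
    }
}

Anything that can reach this component (a REST controller, an MBean) can now call isRunning(), enqueuedTasksCurrent() and the other default methods.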


We want to know more

Let's dig deeper for more information. So far we are able to list all enqueued (not yet processed) tasks and see a few numbers describing how many tasks pass through the queue. What we are missing is information about the task currently being executed. We can track it in some way and get useful information about the exact object being processed.

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * A custom trackable thread pool which can keep and provide a currently running
 * task and is able to execute {@link TrackableRunnable} which keeps useful
 * information about the current execution.
 * <p>
 * This implementation follows the configuration of
 * {@link Executors#newSingleThreadExecutor()}; the tracking stops working
 * with multiple workers, and some additional changes would be needed
 * to support them.
 */
public class TrackableSingleThreadPoolExecutor extends ThreadPoolExecutor {
    /*
     * Task must be held as a volatile variable even in SingleThreadedExecutor.
     * - A thread is destroyed and new one is recreated when an exception is thrown and caught.
     */
    private volatile TrackableRunnable activeTask;
    public TrackableSingleThreadPoolExecutor(ThreadFactory threadFactory) {
        super(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), threadFactory);
    }
    @Override
    protected void beforeExecute(Thread thread, Runnable runnable) {
        if (!(runnable instanceof TrackableRunnable)) {
            throw new IllegalArgumentException("Executed task must be an instance of "
                    + TrackableRunnable.class.getSimpleName());
        }
        this.activeTask = (TrackableRunnable) runnable;
    }
    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        this.activeTask = null;
    }
    public TrackableRunnable getActiveTask() {
        return activeTask;
    }
    /**
     * Keeps a context with an executed runnable. We can track information
     * about currently executed task.
     */
    public static class TrackableRunnable implements Runnable {
        private final Contextual context;
        public TrackableRunnable(Contextual context) {
            this.context = context;
        }
        @Override
        public void run() {
            // Some interesting computation.
        }
        public Contextual getContext() {
            return context;
        }
    }
}


As mentioned in the Javadoc, this implementation supports only a single worker. It should not be hard, though, to change the implementation to return a list of active tasks, each keeping some context information; one way it could look is sketched below.
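
A possible multi-worker variant (my own sketch, not the article's code) replaces the single volatile field with a concurrent map holding the task currently executed by each worker thread:

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TrackableThreadPoolExecutor extends ThreadPoolExecutor {

    // One entry per worker thread that is currently executing a task.
    private final ConcurrentMap<Thread, Runnable> activeTasks = new ConcurrentHashMap<>();

    public TrackableThreadPoolExecutor(int workers) {
        super(workers, workers, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread thread, Runnable runnable) {
        activeTasks.put(thread, runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        // afterExecute runs in the worker thread that executed the task.
        activeTasks.remove(Thread.currentThread());
    }

    /** Snapshot of the tasks currently being executed by the workers. */
    public Collection<Runnable> getActiveTasks() {
        return Collections.unmodifiableCollection(activeTasks.values());
    }
}

The sketch keeps plain Runnable to stay short; to carry context you would still submit TrackableRunnable instances and cast where needed.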


How to expose the information?

You can expose it in two simple ways:

JMX (Java Management Extensions)

  • Implement an MBean and expose the values you are interested in

  • Start an MBean server so that tools such as JVisualVM can connect to it (a minimal sketch follows below)
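
A minimal sketch of what that might look like (the QueueMonitorMXBean and QueueMonitorJmx names and the ObjectName are made up for this example):

// QueueMonitorMXBean.java
public interface QueueMonitorMXBean {
    int getActiveCount();
    long getEnqueuedTasksCurrent();
    long getCompletedTasksTotal();
}

// QueueMonitorJmx.java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class QueueMonitorJmx implements QueueMonitorMXBean {

    private final QueueMonitor monitor;

    public QueueMonitorJmx(QueueMonitor monitor) {
        this.monitor = monitor;
    }

    @Override public int getActiveCount() { return monitor.activeCount(); }
    @Override public long getEnqueuedTasksCurrent() { return monitor.enqueuedTasksCurrent(); }
    @Override public long getCompletedTasksTotal() { return monitor.completedTasksTotal(); }

    /** Registers this MXBean with the platform MBean server so JVisualVM can see it. */
    public void register(String name) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        server.registerMBean(this, new ObjectName("queues:type=QueueMonitor,name=" + name));
    }
}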

REST Monitor API

  • Use this only for internal applications, or protect the endpoint in some way; the output might look like the example below (a sketch of the controller follows the example):

[
  {
    "executor": "food-preparation",
    "active": "spaghetti",
    "enqueued-tasks-current": 0,
    "enqueued-tasks-total": 6,
    "completed-tasks-total": 6,
    "enqueued-tasks": [
      "pizza",
      "caesar salad",
      "cheerios"
    ]
  },
  {
    "executor": "drink-preparation",
    "active": "cuba libre",
    "enqueued-tasks-current": 0,
    "enqueued-tasks-total": 6,
    "completed-tasks-total": 6,
    "enqueued-tasks": [
      "mojito",
      "beer"
    ]
  }
]
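
A sketch of how such a response could be assembled, assuming Spring MVC is available (the mapping, the injected map and the field names just mirror the JSON above; none of this is prescribed):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class QueueMonitorController {

    // Monitors keyed by a human-readable executor name, e.g. "food-preparation".
    private final Map<String, QueueMonitor> monitors;

    public QueueMonitorController(Map<String, QueueMonitor> monitors) {
        this.monitors = monitors;
    }

    @GetMapping("/monitor/queues")
    public List<Map<String, Object>> queues() {
        return monitors.entrySet().stream()
                .map(e -> Map.<String, Object>of(
                        "executor", e.getKey(),
                        "enqueued-tasks-current", e.getValue().enqueuedTasksCurrent(),
                        "enqueued-tasks-total", e.getValue().enqueuedTasksTotal(),
                        "completed-tasks-total", e.getValue().completedTasksTotal()))
                .collect(Collectors.toList());
    }
}

The "active" and "enqueued-tasks" fields from the sample output would come from the trackable executor and the enqueuedTasks() stream, respectively.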



How to shut down the executor gracefully?

This is another way to help you drain the queues before your application is restarted in a cloud environment. In general, Kubernetes waits (up to a configured grace period) for the JVM to terminate, which gives registered shutdown hooks a chance to run.

You only need to arrange for ThreadPoolExecutor.shutdown() to be called from a shutdown hook:

Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdown));


However, you will encounter several problems:

  • Termination can take a long time, especially if an unbounded queue is full of tasks (a shutdown-hook sketch that bounds the wait follows after this list).

  • You need to make sure that you no longer accept any new tasks, because after shutdown() every newly submitted task is rejected by the executor; you should define that behavior with an appropriate RejectedExecutionHandler implementation.

  • It is better to protect the tasks once more (especially the important ones). By that I mean a mechanism that leaves a rejected message unacknowledged so that it returns, for example, to an external queue, where it can wait for a new healthy instance and be processed there. This is harder when our application is called directly through the REST API, where the call would simply be rejected and the transaction/task lost.
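
A minimal sketch of such a shutdown hook, bounding the wait for termination (the 30-second budget and the error logging are illustrative choices only):

import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class GracefulShutdown {

    /** Registers a hook that tries to drain the executor before the JVM exits. */
    public static void register(ThreadPoolExecutor executor) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executor.shutdown();                      // stop accepting new tasks
            try {
                // Wait a bounded amount of time for the enqueued tasks to finish.
                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                    // Whatever never ran comes back from shutdownNow(); hand it to
                    // something durable (an external queue, a file) instead of losing it.
                    List<Runnable> unprocessed = executor.shutdownNow();
                    System.err.println(unprocessed.size() + " tasks were not processed");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
    }
}

Registering it is then a one-liner: GracefulShutdown.register(executor);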



Thank you for reading! If you would like to discuss any of these things in more detail, feel free to leave a comment or send me a private message!
