Daily notes - CompletableFuture asynchronous ride-hailing: orders that completed after the timeout were not closed

Project scenario:

The KC project works with several online ride-hailing suppliers and requests rides through the APIs they provide. Because several suppliers have to be called for each request, calling them synchronously one after another would make the wait too long, so the requests are sent asynchronously on multiple threads. When the KC H5 front end calls the ride-hailing interface it enters a Loading state, and the back end sets a timeout. The call returns once all suppliers have responded or the timeout expires. If at least one supplier created an order successfully, the call succeeds; otherwise it returns a failure and the user has to request a ride again.

After a successful call, a scheduled task polls the supplier interfaces to update the order status, and the relevant strategies decide when to cancel and call again. When only one driver accepts, the orders at the other suppliers are cancelled. If several drivers accept, orders are closed according to the supplier, cost and passenger choice defined in the strategy, until only one order remains.
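The "leave only one order" step can be sketched as follows. This is only an illustration under stated assumptions: the AcceptedOrder and Selection types and the cost-only comparison are hypothetical, and the real strategy also weighs the supplier and the passenger's choice.

```java
import java.math.BigDecimal;
import java.util.Comparator;
import java.util.List;

class OrderSelectionSketch {

    /** Hypothetical view of an order that a driver has accepted. */
    record AcceptedOrder(String supplierCode, String orderId, BigDecimal estimatedCost) {}

    /** The order to keep and the orders that should be cancelled. */
    record Selection(AcceptedOrder keep, List<AcceptedOrder> cancel) {}

    /** Assumed rule: keep the cheapest order, break ties by supplier code. */
    static Selection select(List<AcceptedOrder> accepted) {
        AcceptedOrder keep = accepted.stream()
                .min(Comparator.comparing(AcceptedOrder::estimatedCost)
                        .thenComparing(AcceptedOrder::supplierCode))
                .orElseThrow(() -> new IllegalArgumentException("no accepted orders"));
        // Every other accepted order must be cancelled at its supplier
        List<AcceptedOrder> cancel = accepted.stream()
                .filter(order -> order != keep)
                .toList();
        return new Selection(keep, cancel);
    }

    static String demo() {
        Selection selection = select(List.of(
                new AcceptedOrder("DD", "dd-1", new BigDecimal("25.00")),
                new AcceptedOrder("CC", "cc-1", new BigDecimal("18.50")),
                new AcceptedOrder("SQ", "sq-1", new BigDecimal("30.00"))));
        List<String> cancelled = selection.cancel().stream()
                .map(AcceptedOrder::supplierCode)
                .toList();
        return "keep=" + selection.keep().supplierCode() + ", cancel=" + cancelled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point that matters for the incident described below is that this selection only sees orders the system knows about. An order that is created at a supplier but never recorded can never end up in the cancel list.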

The above is the ride-hailing scenario of the project.

Problem Description:

One night in November 2021, after a release, the team leader opened ride-hailing for one city as required by the product team. As a result, when a user placed an order on the H5 page, several drivers of supplier CC accepted it. After the extra orders were cancelled, drivers phoned in with abuse, and a few even threatened people's personal safety. The ride-hailing service was shut down urgently that same night, which shows how serious the matter was.

The next day I got to my desk and chatted with the product manager about the previous night. The product manager said, "KC still hasn't found the cause. We may be saying goodbye to the team leader soon." Hearing that, and having nothing urgent to do at the time, I started looking into the KC project code that had been changed.

Cause analysis:

I had previously integrated the ride-hailing supplier RQ into the KC project. The code structure of the project is reasonable: to add a supplier I only had to implement the corresponding ride-hailing interface. Even so, I had a general idea of the overall ride-hailing flow.

At first I suspected the strategy, because the problem appeared right after the release, when the back-office admin had changed the vehicle strategy. But the strategy code turned out to be fine. In the end I located the problem with the help of the logs.

The ride-hailing code, simplified, is as follows:

package xianzhan.j17;

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/**
 * JDK 17
 *
 * @author xianzhan
 */
public class Main {

    private static final int CORE_NUM       = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = (CORE_NUM << 4) + (CORE_NUM << 2);
    private static final int MAX_POOL_SIZE  = CORE_POOL_SIZE << 2;

    /**
     * The specific configuration is set according to the actual needs of the project
     */
    private static final Executor EXECUTOR = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            60,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(MAX_POOL_SIZE),
            Thread::new,
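            // Run the task on the submitting thread when the pool is saturated;
            // re-submitting it to the same executor would be rejected again and recurse
            new ThreadPoolExecutor.CallerRunsPolicy()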
    );

    private static final ScheduledExecutorService SCHEDULED_FUTURE = new ScheduledThreadPoolExecutor(
            1,
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("Timeout");
                return t;
            },
            // Fail fast instead of re-submitting a rejected task to the same executor
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static CompletableFuture<OrderCreateOut> orderCreate(OrderCreateIn in) {
        return CompletableFuture.supplyAsync(() -> {
            int spendTime = (int) (Math.random() * 10);
            System.out.printf("order - Start creating. threadId: %s, sleep: %ds, supplierCode: %s%n", Thread.currentThread().getId(), spendTime, in.supplierCode());
            try {
                // Simulate business execution
                TimeUnit.SECONDS.sleep(spendTime);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();
            }
            System.out.printf("order - End creation. threadId: %s, sleep: %ds, supplierCode: %s%n", Thread.currentThread().getId(), spendTime, in.supplierCode());
            return new OrderCreateOut(in.supplierCode(), LocalDateTime.now().toString());
        }, EXECUTOR);
    }

    public static <T> CompletableFuture<T> timeout(long timeout, TimeUnit unit, String message) {
        CompletableFuture<T> ret = new CompletableFuture<>();
        SCHEDULED_FUTURE.schedule(() -> ret.completeExceptionally(new TimeoutException(message)), timeout, unit);
        return ret;
    }

    public static void main(String[] args) {
        System.out.println("main start: " + LocalDateTime.now());

        CallingStatus status = new CallingStatus();
        List<OrderCreateIn> orderCreateIns = List.of(
                new OrderCreateIn("DD"),
                new OrderCreateIn("CC"),
                new OrderCreateIn("SQ")
        );
        @SuppressWarnings("unchecked")
        CompletableFuture<OrderCreateOut>[] futureList = orderCreateIns.stream()
                .map(Main::orderCreate)
                .peek(future -> future.thenAccept(out -> status.setSuccess(true)))
                .toArray(CompletableFuture[]::new);
        CompletableFuture<Void> all = CompletableFuture.allOf(futureList);
        CompletableFuture<Void> timeout = timeout(15, TimeUnit.SECONDS, "Call timeout");
        CompletableFuture<Void> done = all.applyToEither(timeout, Function.identity());
        try {
            done.get();
        } catch (Exception e) {
            // The logging tool records exception information
            e.printStackTrace();
        }

        System.out.printf("Whether the user created the order successfully:%s%n", status.isSuccess());
        System.out.println("main end: " + LocalDateTime.now());
    }

    /**
     * @param supplierCode Supplier code
     */
    private static record OrderCreateIn(String supplierCode) {

    }

    /**
     * @param supplierCode Supplier code
     * @param orderId      Order id returned by successfully created supplier
     */
    private static record OrderCreateOut(String supplierCode, String orderId) {

    }

    private static class CallingStatus {
        /**
         * As long as one supplier is successful, it will be successful
         */
        private volatile boolean success;

        public boolean isSuccess() {
            return success;
        }

        public void setSuccess(boolean success) {
            this.success = success;
        }
    }
}

At first glance the code above looks fine: it requests the supplier interfaces asynchronously and sets a timeout, and if no supplier succeeds within the time limit it returns a failure. The problem is that orders which succeed after the timeout are never handled!

When we use multithreading, we cannot write code with a synchronous mindset. Thinking synchronously, once the timeout returns nothing else is executed. With multiple threads, however, the supplier requests keep running in the background, so we have to deal with the orders that complete after the timeout.
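The effect can be reproduced in isolation. The following is a minimal sketch, not the project's code: the class name, the order id and the 100 ms timeout are made up for illustration, and orTimeout() (available since JDK 9) stands in for the hand-written timeout() helper. A latch forces the supplier to answer only after the caller has given up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LateOrderDemo {

    /** Returns true if the caller timed out and the supplier still created its order afterwards. */
    static boolean orderCreatedAfterTimeout() {
        CountDownLatch callerGaveUp = new CountDownLatch(1);

        // Hypothetical slow supplier: it only answers after the caller has stopped waiting
        CompletableFuture<String> supplier = CompletableFuture.supplyAsync(() -> {
            try {
                callerGaveUp.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "order-1"; // the order now exists on the supplier side
        });

        // Completes exceptionally with a TimeoutException after 100 ms
        CompletableFuture<String> timeout =
                new CompletableFuture<String>().orTimeout(100, TimeUnit.MILLISECONDS);

        boolean timedOut = false;
        try {
            supplier.applyToEither(timeout, orderId -> orderId).join();
        } catch (CompletionException e) {
            timedOut = e.getCause() instanceof TimeoutException;
        }

        // The caller has already returned a failure, but the supplier task was never stopped
        callerGaveUp.countDown();
        String lateOrderId = supplier.join();
        return timedOut && "order-1".equals(lateOrderId);
    }

    public static void main(String[] args) {
        System.out.println("order created after timeout: " + orderCreatedAfterTimeout());
    }
}
```

The timeout only completes the combined future. It does not cancel or interrupt the supplier request, which goes on to create an order that nobody is tracking.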

Solution:

package xianzhan.j17;

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/**
 * JDK 17
 *
 * @author xianzhan
 */
public class Main {

    private static final int CORE_NUM       = Runtime.getRuntime().availableProcessors();
    private static final int CORE_POOL_SIZE = (CORE_NUM << 4) + (CORE_NUM << 2);
    private static final int MAX_POOL_SIZE  = CORE_POOL_SIZE << 2;

    /**
     * The specific configuration is set according to the actual needs of the project
     */
    private static final Executor EXECUTOR = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            60,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(MAX_POOL_SIZE),
            Thread::new,
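            // Run the task on the submitting thread when the pool is saturated;
            // re-submitting it to the same executor would be rejected again and recurse
            new ThreadPoolExecutor.CallerRunsPolicy()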
    );

    private static final ScheduledExecutorService SCHEDULED_FUTURE = new ScheduledThreadPoolExecutor(
            1,
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("Timeout");
                return t;
            },
            // Fail fast instead of re-submitting a rejected task to the same executor
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static CompletableFuture<OrderCreateOut> orderCreate(OrderCreateIn in) {
        return CompletableFuture.supplyAsync(() -> {
            int spendTime = (int) (Math.random() * 10);
            System.out.printf("order - Start creating. threadId: %s, sleep: %ds, supplierCode: %s%n", Thread.currentThread().getId(), spendTime, in.supplierCode());
            try {
                // Simulate business execution
                TimeUnit.SECONDS.sleep(spendTime);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption
                Thread.currentThread().interrupt();
            }
            System.out.printf("order - End creation. threadId: %s, sleep: %ds, supplierCode: %s%n", Thread.currentThread().getId(), spendTime, in.supplierCode());
            return new OrderCreateOut(in.supplierCode(), LocalDateTime.now().toString());
        }, EXECUTOR);
    }

    public static <T> CompletableFuture<T> timeout(long timeout, TimeUnit unit, String message) {
        CompletableFuture<T> ret = new CompletableFuture<>();
        SCHEDULED_FUTURE.schedule(() -> ret.completeExceptionally(new TimeoutException(message)), timeout, unit);
        return ret;
    }

    public static void main(String[] args) {
        System.out.println("main start: " + LocalDateTime.now());

        CallingStatus status = new CallingStatus();
        List<OrderCreateIn> orderCreateIns = List.of(
                new OrderCreateIn("DD"),
                new OrderCreateIn("CC"),
                new OrderCreateIn("SQ")
        );
        @SuppressWarnings("unchecked")
        CompletableFuture<OrderCreateOut>[] futureList = orderCreateIns.stream()
                // Skip suppliers whose previous request timed out, so that they are not requested again
                // and do not end up with several orders for the same user
                // .filter(in -> !timeoutSupplierCode.contains(in.supplierCode()))
                .map(Main::orderCreate)
                .peek(future -> future.thenAccept(out -> {
                    status.setSuccess(true);
                    if (status.isTimeout()) {
                        /*
                         * Record the supplier whose response arrived after the timeout and handle its late order:
                         * 1. Until the late order has been handled, do not send requests to this supplier again
                         * 2. Cancel the late order, because it did not take part in strategy selection
                         */
                        System.out.printf("Record timed-out supplier (%s) and its order (%s) for handling%n", out.supplierCode(), out.orderId());
                    }
                }))
                .toArray(CompletableFuture[]::new);
        CompletableFuture<Void> all = CompletableFuture.allOf(futureList);
        CompletableFuture<Void> timeout = timeout(6, TimeUnit.SECONDS, "Call timeout");
        CompletableFuture<Void> done = all.applyToEither(timeout, Function.identity());
        try {
            done.get();
        } catch (Exception e) {
            // Set the flag first so that late supplier callbacks see it as early as possible
            status.setTimeout(true);
            // The logging tool records exception information
            e.printStackTrace();
        }

        System.out.printf("Whether the user created the order successfully:%s%n", status.isSuccess());
        System.out.println("main end: " + LocalDateTime.now());
    }

    /**
     * @param supplierCode Supplier code
     */
    private static record OrderCreateIn(String supplierCode) {

    }

    /**
     * @param supplierCode Supplier code
     * @param orderId      Order id returned by successfully created supplier
     */
    private static record OrderCreateOut(String supplierCode, String orderId) {

    }

    private static class CallingStatus {
        /**
         * As long as one supplier is successful, it will be successful
         */
        private volatile boolean success;
        /**
         * Whether the overall call has timed out
         */
        private volatile boolean timeout;

        public boolean isSuccess() {
            return success;
        }

        public void setSuccess(boolean success) {
            this.success = success;
        }

        public boolean isTimeout() {
            return timeout;
        }

        public void setTimeout(boolean timeout) {
            this.timeout = timeout;
        }
    }
}

Three major changes have been made:

  1. CallingStatus gains a volatile boolean timeout field that records whether the call has timed out
  2. The CompletableFuture<OrderCreateOut> returned by order creation uses thenAccept() to handle orders that complete after the timeout
  3. filter() skips the suppliers that timed out last time, until their late order has been cancelled by the strategy

Finally, with the supplier request and response times in the logs as evidence, I pinpointed the problem and took the solution to the team leader.
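One caveat about change 1: the timeout flag is only set after done.get() has thrown, so a supplier that responds in the short gap between the timeout firing and the flag being set could still be treated as on time. A possible way to close that gap is to make "accept the order" and "close the call" mutually exclusive. The sketch below is an assumption-laden illustration, not the project's implementation: CallGate and its methods are hypothetical names, and the suppliers are simulated with manually completed futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CallGateSketch {

    /** Hypothetical helper: decides atomically whether an order arrived before or after the call closed. */
    static class CallGate {
        private final List<String> inTime = new ArrayList<>();
        private final List<String> late = new ArrayList<>();
        private boolean closed;

        /** Called from each supplier callback; returns true if the order arrived in time. */
        synchronized boolean offer(String orderId) {
            if (closed) {
                late.add(orderId); // must be cancelled and its supplier skipped until then
                return false;
            }
            inTime.add(orderId);
            return true;
        }

        /** Called once by the waiting thread, on completion or timeout. */
        synchronized List<String> close() {
            closed = true;
            return List.copyOf(inTime);
        }

        synchronized List<String> lateOrders() {
            return List.copyOf(late);
        }
    }

    static String demo() {
        CallGate gate = new CallGate();
        CompletableFuture<String> fastSupplier = CompletableFuture.completedFuture("DD-1");
        CompletableFuture<String> slowSupplier = new CompletableFuture<>();
        fastSupplier.thenAccept(gate::offer);
        slowSupplier.thenAccept(gate::offer);

        List<String> inTime = gate.close(); // the timeout fires here
        slowSupplier.complete("CC-1");      // the supplier answers afterwards

        return "inTime=" + inTime + ", late=" + gate.lateOrders();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "inTime=[DD-1], late=[CC-1]"
    }
}
```

Because offer() and close() share one lock, every order ends up in exactly one of the two lists, whichever thread wins the race.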

Summary:

Take extra care when using multithreading and think in terms of parallel execution; otherwise an unhandled case can cost the company serious money!

Keywords: Java Multithreading

Added by The_Walrus on Fri, 19 Nov 2021 02:53:45 +0200