Learn more about Dio's CancelToken from the source code

The previous article covered how to use Dio's CancelToken. This article walks through the source code to see how CancelToken actually cancels network requests. It covers:

  • Implementation of CancelToken class
  • How CancelToken cancels a network request

CancelToken class

The CancelToken class contains very little code, so we can reproduce it in full.

import 'dart:async';
import 'dio_error.dart';
import 'options.dart';

/// You can cancel a request by using a cancel token.
/// One token can be shared with different requests.
/// when a token's [cancel] method invoked, all requests
/// with this token will be cancelled.
class CancelToken {
  CancelToken() {
    _completer = Completer<DioError>();
  }

  /// Whether is throw by [cancel]
  static bool isCancel(DioError e) {
    return e.type == DioErrorType.cancel;
  }

  /// If request have been canceled, save the cancel Error.
  DioError? _cancelError;

  /// If request have been canceled, save the cancel Error.
  DioError? get cancelError => _cancelError;

  late Completer<DioError> _completer;

  RequestOptions? requestOptions;

  /// whether cancelled
  bool get isCancelled => _cancelError != null;

  /// When cancelled, this future will be resolved.
  Future<DioError> get whenCancel => _completer.future;

  /// Cancel the request
  void cancel([dynamic reason]) {
    _cancelError = DioError(
      type: DioErrorType.cancel,
      error: reason,
      requestOptions: requestOptions ?? RequestOptions(path: ''),
    );
    _cancelError!.stackTrace = StackTrace.current;
    _completer.complete(_cancelError);
  }
}

First, look at the comments. They point out a very useful feature of CancelToken: one CancelToken can be associated with multiple requests, and cancelling it cancels all of the associated requests at once. This is handy when a single page issues several requests. Most of the class consists of properties:

  • _cancelError: the cancellation error stored once the token is cancelled. It can be read externally through the cancelError getter.
  • _completer: a Completer<DioError> object. Completer is an abstract class for managing an asynchronous result: it creates a Future when constructed, exposes it through its future property, and lets us finish that Future by calling complete (normal completion) or completeError (completion with an error). This property is private and cannot be accessed externally.
  • requestOptions: a RequestOptions object holding the request's settings (headers, request parameters, request method, and so on). It may be null. It is a public property, so it can be modified externally.
  • isCancelled: a boolean indicating whether the token has been cancelled. It is determined by whether _cancelError is null: if it is not null, the token has been cancelled.
  • whenCancel: the future of _completer. It can be used to react to the cancellation, and in effect wraps _completer so that only its future is exposed.
  • cancel: the core method. It constructs a DioError object to store the cancellation error. If a reason is passed in, it becomes the error parameter; the requestOptions parameter follows, and if requestOptions is null an empty RequestOptions object is constructed. The current stack trace is stored in the stackTrace of _cancelError to make tracing easier. Finally, it calls _completer.complete. This is the key call, so let's see what it does.
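To make the roles of these members concrete, here is a minimal sketch that uses only dart:async. SimpleCancelToken and CancelError are hypothetical stand-ins written for this illustration; they are not Dio's classes, and the guard against a second cancel call is an addition of this sketch that the listing above does not have.

```dart
import 'dart:async';

/// Hypothetical stand-in for DioError that carries only the cancel reason.
class CancelError implements Exception {
  CancelError(this.reason);
  final Object? reason;

  @override
  String toString() => 'CancelError: $reason';
}

/// Simplified token modelled on the CancelToken listing (not Dio's class).
class SimpleCancelToken {
  final Completer<CancelError> _completer = Completer<CancelError>();
  CancelError? _cancelError;

  /// True once [cancel] has been called.
  bool get isCancelled => _cancelError != null;

  /// The stored error, or null if the token has not been cancelled.
  CancelError? get cancelError => _cancelError;

  /// Completes with the cancel error when [cancel] is called.
  Future<CancelError> get whenCancel => _completer.future;

  void cancel([Object? reason]) {
    // Assumption of this sketch: ignore repeated calls, because completing
    // a Completer twice throws a StateError.
    if (_completer.isCompleted) return;
    _cancelError = CancelError(reason);
    _completer.complete(_cancelError);
  }
}

/// Two pretend requests share one token; one cancel call reaches both.
Future<List<String>> demoSharedToken() async {
  final token = SimpleCancelToken();
  final log = <String>[];
  final a = token.whenCancel.then((e) => log.add('request A saw ${e.reason}'));
  final b = token.whenCancel.then((e) => log.add('request B saw ${e.reason}'));
  token.cancel('page closed');
  await Future.wait([a, b]);
  return log;
}
```

Calling demoSharedToken() from your own main should yield one log entry per listener, which mirrors how a single token can cancel several requests.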

Completer class

Let's go to the Completer class to see what the complete method does:

/// All listeners on the future are informed about the value.
void complete([FutureOr<T>? value]);

You can see that this is an abstract method, so it is implemented by a concrete implementation class of Completer. The comment tells us that calling complete informs all listeners on the future about the value. It can be understood as notifying observers so that they can process the value. From this we can guess that when a request carries a cancelToken, a listener is attached to the cancelToken. Let's continue with Dio's request code.
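The listener behaviour described in that comment can be tried out in isolation. The following is a small sketch using only dart:async; demoCompleter is a name made up for this illustration. It registers two listeners on the same future and then completes it once.

```dart
import 'dart:async';

/// Registers two listeners on one Completer's future, completes it once,
/// and records what happens in order.
Future<List<String>> demoCompleter() async {
  final events = <String>[];
  final completer = Completer<String>();

  // Listeners are attached to the future, not to the completer itself.
  final first = completer.future.then((v) => events.add('first got $v'));
  final second = completer.future.then((v) => events.add('second got $v'));

  events.add('isCompleted before: ${completer.isCompleted}');
  completer.complete('cancelled');
  // With a default (non-sync) Completer, listeners are notified
  // asynchronously, so this line is recorded before either listener runs.
  events.add('isCompleted after: ${completer.isCompleted}');

  await Future.wait([first, second]);
  return events;
}
```

The point to take away is that complete itself returns immediately; the listeners are informed afterwards, which is why a cancel call does not interrupt code that is already running synchronously.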

Dio's request code

Looking at the source code in dio.dart, we find that all requests are in fact wrappers around fetch<T>(requestOptions); that is, every request is ultimately completed by this method. Let's look at its source. The code is long, and you can read it closely if you are interested; here we only pick out the parts related to cancelToken.

@override
Future<Response<T>> fetch<T>(RequestOptions requestOptions) async {
  if (requestOptions.cancelToken != null) {
    requestOptions.cancelToken!.requestOptions = requestOptions;
  }

  if (T != dynamic &&
      !(requestOptions.responseType == ResponseType.bytes ||
          requestOptions.responseType == ResponseType.stream)) {
    if (T == String) {
      requestOptions.responseType = ResponseType.plain;
    } else {
      requestOptions.responseType = ResponseType.json;
    }
  }

  // Convert the request interceptor to a functional callback in which
  // we can handle the return value of interceptor callback.
  FutureOr Function(dynamic) _requestInterceptorWrapper(
    void Function(
      RequestOptions options,
      RequestInterceptorHandler handler,
    )
        interceptor,
  ) {
    return (dynamic _state) async {
      var state = _state as InterceptorState;
      if (state.type == InterceptorResultType.next) {
        return listenCancelForAsyncTask(
          requestOptions.cancelToken,
          Future(() {
            return checkIfNeedEnqueue(interceptors.requestLock, () {
              var requestHandler = RequestInterceptorHandler();
              interceptor(state.data, requestHandler);
              return requestHandler.future;
            });
          }),
        );
      } else {
        return state;
      }
    };
  }

  // Convert the response interceptor to a functional callback in which
  // we can handle the return value of interceptor callback.
  FutureOr<dynamic> Function(dynamic) _responseInterceptorWrapper(
      interceptor) {
    return (_state) async {
      var state = _state as InterceptorState;
      if (state.type == InterceptorResultType.next ||
          state.type == InterceptorResultType.resolveCallFollowing) {
        return listenCancelForAsyncTask(
          requestOptions.cancelToken,
          Future(() {
            return checkIfNeedEnqueue(interceptors.responseLock, () {
              var responseHandler = ResponseInterceptorHandler();
              interceptor(state.data, responseHandler);
              return responseHandler.future;
            });
          }),
        );
      } else {
        return state;
      }
    };
  }

  // Convert the error interceptor to a functional callback in which
  // we can handle the return value of interceptor callback.
  FutureOr<dynamic> Function(dynamic, StackTrace stackTrace)
      _errorInterceptorWrapper(interceptor) {
    return (err, stackTrace) {
      if (err is! InterceptorState) {
        err = InterceptorState(assureDioError(
          err,
          requestOptions,
          stackTrace,
        ));
      }

      if (err.type == InterceptorResultType.next ||
          err.type == InterceptorResultType.rejectCallFollowing) {
        return listenCancelForAsyncTask(
          requestOptions.cancelToken,
          Future(() {
            return checkIfNeedEnqueue(interceptors.errorLock, () {
              var errorHandler = ErrorInterceptorHandler();
              interceptor(err.data, errorHandler);
              return errorHandler.future;
            });
          }),
        );
      } else {
        throw err;
      }
    };
  }

  // Build a request flow in which the processors(interceptors)
  // execute in FIFO order.

  // Start the request flow
  var future = Future<dynamic>(() => InterceptorState(requestOptions));

  // Add request interceptors to request flow
  interceptors.forEach((Interceptor interceptor) {
    future = future.then(_requestInterceptorWrapper(interceptor.onRequest));
  });

  // Add dispatching callback to request flow
  future = future.then(_requestInterceptorWrapper((
    RequestOptions reqOpt,
    RequestInterceptorHandler handler,
  ) {
    requestOptions = reqOpt;
    _dispatchRequest(reqOpt).then(
      (value) => handler.resolve(value, true),
      onError: (e) {
        handler.reject(e, true);
      },
    );
  }));

  // Add response interceptors to request flow
  interceptors.forEach((Interceptor interceptor) {
    future = future.then(_responseInterceptorWrapper(interceptor.onResponse));
  });

  // Add error handlers to request flow
  interceptors.forEach((Interceptor interceptor) {
    future = future.catchError(_errorInterceptorWrapper(interceptor.onError));
  });

  // Normalize errors, we convert error to the DioError
  return future.then<Response<T>>((data) {
    return assureResponse<T>(
      data is InterceptorState ? data.data : data,
      requestOptions,
    );
  }).catchError((err, stackTrace) {
    var isState = err is InterceptorState;

    if (isState) {
      if ((err as InterceptorState).type == InterceptorResultType.resolve) {
        return assureResponse<T>(err.data, requestOptions);
      }
    }

    throw assureDioError(
      isState ? err.data : err,
      requestOptions,
      stackTrace,
    );
  });
}

First, fetch checks whether the cancelToken of the current requestOptions is null. If it is not, it sets the cancelToken's requestOptions to the current requestOptions, which in effect caches the request parameters in the cancelToken.

Next comes the handling of interceptors: request interceptors, response interceptors and error interceptors. For each kind, a local wrapper function is defined that turns the interceptor into a functional callback so that all of them can be handled uniformly. We will skip the details. The key point is that every wrapper calls listenCancelForAsyncTask: when the interceptor state allows the chain to continue (for example, next), the method is called and its return value is returned. Its first parameter is cancelToken, and judging by the name, it listens for cancellation while an asynchronous task runs. Let's see what it does.

Asynchronous task cancel event listening

The listenCancelForAsyncTask method is very simple: it returns the result of Future.any. If the cancelToken is not null, the list passed to Future.any also contains a future derived from cancelToken.whenCancel that throws the cancel error. Future.any completes with the result of whichever future in the list completes first, whether with a value (onValue) or with an error (onError), and discards the results of all the others.

So if the cancelToken is not null, the token's cancellation future races against each interceptor step, and when cancellation wins, the cancel error is thrown in place of that step's result. This works as a kind of pre-interception: if the request has not yet been dispatched, it is stopped at the interceptor stage. If the request has already been dispatched, cancellation has to be handled during request scheduling.

static Future<T> listenCancelForAsyncTask<T>(
      CancelToken? cancelToken, Future<T> future) {
  return Future.any([
    if (cancelToken != null) cancelToken.whenCancel.then((e) => throw e),
    future,
  ]);
}

For reference, the Future.any method in dart:async is implemented as follows:

/// Returns the result of the first future in [futures] to complete.
///
/// The returned future is completed with the result of the first
/// future in [futures] to report that it is complete,
/// whether it's with a value or an error.
/// The results of all the other futures are discarded.
///
/// If [futures] is empty, or if none of its futures complete,
/// the returned future never completes.
static Future<T> any<T>(Iterable<Future<T>> futures) {
  var completer = new Completer<T>.sync();
  void onValue(T value) {
    if (!completer.isCompleted) completer.complete(value);
  }

  void onError(Object error, StackTrace stack) {
    if (!completer.isCompleted) completer.completeError(error, stack);
  }

  for (var future in futures) {
    future.then(onValue, onError: onError);
  }
  return completer.future;
}
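
The race set up by listenCancelForAsyncTask can be reproduced without Dio. In this sketch, raceWithCancel and demoRace are hypothetical names, and a plain Completer<Object> stands in for the token's whenCancel future; only Future.any and the collection-if pattern come from the listings above.

```dart
import 'dart:async';

/// Same shape as listenCancelForAsyncTask, but taking the cancel future
/// directly instead of a CancelToken (an assumption of this sketch).
Future<T> raceWithCancel<T>(Future<Object>? whenCancel, Future<T> task) {
  return Future.any([
    // When the cancel future completes, turn its value into a thrown error.
    if (whenCancel != null) whenCancel.then<T>((e) => throw e),
    task,
  ]);
}

/// Runs a 50 ms task and optionally cancels it after 5 ms.
Future<String> demoRace({required bool cancelFirst}) async {
  final cancel = Completer<Object>();
  final task = Future<String>.delayed(
    const Duration(milliseconds: 50),
    () => 'response',
  );
  if (cancelFirst) {
    Timer(
      const Duration(milliseconds: 5),
      () => cancel.complete('cancelled by user'),
    );
  }
  try {
    return await raceWithCancel<String>(cancel.future, task);
  } catch (e) {
    return 'error: $e';
  }
}
```

With cancelFirst set to true the cancel branch wins the race and the caller sees an error; with false the cancel future never completes and the task's value is returned unchanged.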

Request scheduling

The actual request scheduling is done by the _dispatchRequest method in dio_mixin.dart, which is invoked from the fetch method above. This method uses cancelToken in two places. First, the whenCancel future of cancelToken is passed in when calling the fetch method of httpClientAdapter, so that the adapter holds a reference to it and can be notified once the request is cancelled. Second, a checkCancelled method is called to check whether the request should be stopped.

responseBody = await httpClientAdapter.fetch(
  reqOpt,
  stream,
  cancelToken?.whenCancel,
);

// If the request has been cancelled, stop request and throw error.
static void checkCancelled(CancelToken? cancelToken) {
  if (cancelToken != null && cancelToken.cancelError != null) {
    throw cancelToken.cancelError!;
  }
}

From here we can roughly understand the basic mechanism. When we call the cancel method of cancelToken, it records the cancellation error in cancelError and completes the whenCancel future, so that _dispatchRequest can detect the cancellation while the request is being scheduled. If _dispatchRequest finds that cancelError is not null, it throws cancelError, which aborts the current request and the processing that would follow it.
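
The synchronous check can be sketched as follows. TokenState and dispatch are hypothetical, and the three step names are invented for the illustration; they do not describe where Dio actually places its checkCancelled calls. Only the check itself follows the listing above.

```dart
/// Hypothetical minimal token: only the stored error matters here.
class TokenState {
  Object? cancelError;
  void cancel(Object reason) => cancelError = reason;
}

/// Mirrors the checkCancelled listing: throw the stored error, if any.
void checkCancelled(TokenState? token) {
  final error = token?.cancelError;
  if (error != null) throw error;
}

/// Stand-in for a dispatch routine that checks the token between steps.
/// [cancelAtStep] simulates a cancel call arriving just before that step.
List<String> dispatch(TokenState? token, {int cancelAtStep = -1}) {
  const steps = ['encode body', 'send request', 'decode response'];
  final done = <String>[];
  try {
    for (var i = 0; i < steps.length; i++) {
      if (i == cancelAtStep) token?.cancel('cancelled at step $i');
      checkCancelled(token); // abort before doing any further work
      done.add(steps[i]);
    }
  } catch (e) {
    done.add('aborted: $e');
  }
  return done;
}
```

A check like this only takes effect at the points where it is called, which is why the source also needs the whenCancel race for work that is already in flight.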

Summary

The source code is full of Futures and wrapper functions, which makes it hard to read. It also shows how much Dio does for us: it takes care of network request details that most of us would otherwise rack our brains over. From the source code and a debugging trace, the mechanism of CancelToken is:

  • Cancel in advance: if the request's cancelToken is not null, the token's cancellation future is attached to each interceptor step. After cancellation, the request is stopped at the interceptor stage and never reaches the scheduling stage.
  • Cancel in process: if the request has already entered the scheduling stage, an exception is thrown to abort sending the request.
  • Cancel after the event: the request has already been sent. When the server returns its result, the response is intercepted during response processing (including error handling) and the remaining processing stops. Even though the server returns data, it is discarded. This has limited practical value: it does not reduce the load on the server, it only avoids the subsequent data processing.
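
The last point follows from how Future.any treats the futures that lose the race: their results are discarded, but the work behind them is not stopped. The sketch below illustrates only that Future.any behaviour with dart:async; demoCancelAfterSend and the delayed future standing in for an in-flight request are assumptions of this example, and it says nothing about what a particular HTTP adapter does with the cancel future.

```dart
import 'dart:async';

/// Cancels after 5 ms while a pretend in-flight request takes 30 ms,
/// then reports whether the in-flight work still ran to completion.
Future<Map<String, Object>> demoCancelAfterSend() async {
  var workFinished = false;
  final cancel = Completer<Object>();

  // Stand-in for a request that is already on the wire.
  final inFlight = Future<String>.delayed(
    const Duration(milliseconds: 30),
    () {
      workFinished = true;
      return 'payload';
    },
  );

  Timer(const Duration(milliseconds: 5), () => cancel.complete('cancelled'));

  String outcome;
  try {
    outcome = await Future.any<String>([
      cancel.future.then<String>((e) => throw e),
      inFlight,
    ]);
  } catch (e) {
    outcome = 'error: $e';
  }
  final finishedAtCancel = workFinished;

  await inFlight; // the discarded future still completes on its own
  return {
    'outcome': outcome,
    'finishedAtCancel': finishedAtCancel,
    'finishedLater': workFinished,
  };
}
```

The caller is released as soon as the cancel branch wins, yet the stand-in request still finishes later, which matches the observation that late cancellation saves client-side processing rather than server-side work.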

Keywords: iOS Android Flutter

Added by char skd1 on Sun, 19 Dec 2021 21:43:45 +0200