Tushare from Zero, Part 5: Multithreading and Rate Limiting

According to the official documentation (https://waditu.com/document/2?doc_id=27), there are two ways to fetch the daily quotes: iterate over stock codes, or iterate over trading dates. In theory, iterating over stock codes should be faster for a full download, since there are currently 4,493 stocks but 7,450 trading dates, so fewer iterations are needed. However, because my account's points are too low, each call returns at most 5,000 rows, and stocks that have been listed for a long time would need two queries. Besides, incremental updates are naturally organized by date anyway, so I finally decided to iterate by trade date.
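For reference, the two access patterns against tushare's pro API look roughly like this (a minimal sketch; the ts_code and dates are only examples, and the token is assumed to be set already):

import tushare as ts

pro = ts.pro_api()  # assumes ts.set_token(...) was called beforehand

# Option 1: iterate over stock codes -- one call per stock;
# at a low point level each call returns at most 5,000 rows
df_by_code = pro.daily(ts_code='000001.SZ', start_date='19700101', end_date='20220127')

# Option 2: iterate over trade dates -- one call per trading day,
# which also maps naturally onto incremental updates
df_by_date = pro.daily(trade_date='20220126')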

At my current point level, tushare allows 500 calls per minute. In my tests, a single thread could only issue about 400 requests per minute locally, which does not reach the optimal throughput, so a thread pool is introduced to fill out the quota.

    # Needs: import multiprocessing
    #        from concurrent.futures import ThreadPoolExecutor, as_completed
    def __get_stock_daily(self, start_date='19700101', max_worker=multiprocessing.cpu_count() * 2):
        # Fetch all open trading dates in [start_date, tomorrow) from the local calendar table
        df_cal_date = StockCalendar().query(
            fields='distinct cal_date',
            where=f'is_open=\'1\' and cal_date >=\'{start_date}\' and cal_date < \'{tomorrow()}\'',
            order_by='cal_date')

        # Submit one download task per trading date and wait for all of them
        with ThreadPoolExecutor(max_worker) as executor:
            future_to_date = \
                {executor.submit(self.__get_stock_daily_internal, ts_code='', trade_date=row['cal_date']): row
                 for _, row in df_cal_date.iterrows()}
            for future in as_completed(future_to_date):
                row = future_to_date[future]
                try:
                    future.result()  # re-raises any exception raised in the worker thread
                except Exception as ex:
                    self.logger.error(f"failed to retrieve {row['cal_date']}")
                    self.logger.exception(ex)
A single thread sends too few requests per minute; multiple threads send too many. Once the server-side threshold is exceeded, the client only gets failure responses back. Simply letting requests fail wastes network resources on the one hand, and on the other requires an extra local monitoring and compensation mechanism to avoid missing data.

To avoid this, a rate limiter is needed. Its job is to cap the client at no more than 500 requests per minute: once the limit is reached, all requesting threads are blocked temporarily, and they are released once the request count drops back below the limit.

The implementation uses the proxy pattern: before the original tushare pro_api actually sends a request, an __allow_request check is inserted.

import threading
import time
from functools import partial

import tushare as ts

import config  # the project module that defines THROTTLE_RATES (see below)


class ThrottleDataApi(object):

    class RequestRecord(object):
        def __init__(self, rate):
            self.request_times = []          # newest timestamp first, oldest last
            self.throttle_rate = rate        # allowed requests per minute
            self.event = threading.Event()   # cleared while the limit is reached
            self.reach_limit = False
            self.lock = threading.RLock()
            self.event.set()                 # the gate starts open

    # func name -> RequestRecord
    __request_records = {}

    def __init__(self, api=ts.pro_api()):
        self.api = api
        for key, rate in config.THROTTLE_RATES.items():
            ThrottleDataApi.__request_records[key] = ThrottleDataApi.RequestRecord(rate)

    def __getattr__(self, name):
        # Run the limiter check before handing out the real API method
        self.__allow_request(name)
        return partial(getattr(self.api, name))

    def __allow_request(self, name):
        record = ThrottleDataApi.__request_records.get(name)

        def timer_callback(request_record):
            # Reopen the gate once the oldest request has aged out of the window
            request_record.event.set()
            request_record.reach_limit = False

        if record:
            history = record.request_times
            rate = record.throttle_rate
            event = record.event

            # Drop timestamps that have left the 60-second sliding window
            while history and history[-1] <= time.time() - 60:
                history.pop()
            if len(history) >= rate:
                with record.lock:
                    if not record.reach_limit:
                        # Close the gate and schedule it to reopen when the
                        # oldest request leaves the window, plus 1s of margin
                        event.clear()
                        waiting_seconds = 60 + history[-1] - time.time() + 1
                        threading.Timer(waiting_seconds, timer_callback, [record]).start()
                        record.reach_limit = True

            event.wait()
            history.insert(0, time.time())
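With the proxy in place, call sites stay unchanged. A minimal usage sketch (assuming the module-level pro used by the download code below is constructed this way; the actual repo may wire it up differently):

pro = ThrottleDataApi()

# Looks like a plain pro_api call, but __getattr__ runs the limiter first,
# so at most 500 'daily' requests go out per minute
df = pro.daily(ts_code='', trade_date='20220126')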

The rate limits live in a configuration item, so if my points increase in the future, the configuration can simply be raised to allow more requests.

# func name: allowed requests per minute
THROTTLE_RATES = {
    'daily': 500,
    'adj_factor': 500
}

Given the large amount of data, fetching everything first and saving it only at the end is risky: an error in the middle means starting over, which is not cost-effective. Instead, each batch is written to the database as soon as it is fetched. The framework supports this pattern: just call the _save method inside the worker, and have _full and _delta return None.

    def __get_stock_daily_internal(self, ts_code, trade_date):
        # pro is the throttled API proxy; persist each day's quotes immediately
        df = pro.daily(ts_code=ts_code, trade_date=trade_date)
        self.logger.info(f'stock daily[{ts_code}, {trade_date}], length {len(df)}')
        self._save(df)

    def _full(self, **kwargs):
        self.__get_stock_daily()

        # data has already been persisted batch by batch
        return None

    def _delta(self, **kwargs):
        # resume from the day after the latest trade_date already stored
        df_origin = self.query(fields='max(trade_date)')
        if df_origin.empty:
            self.__get_stock_daily()
        else:
            self.__get_stock_daily(tomorrow(df_origin.iat[0, 0]))

        return None
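The tomorrow() helper is not shown in this post. A minimal sketch consistent with how it is used above (no argument yields the day after today; a 'YYYYMMDD' string yields the day after that date) might look like:

from datetime import datetime, timedelta

def tomorrow(date_str=None):
    # Day after today, or day after the given 'YYYYMMDD' date
    base = datetime.strptime(date_str, '%Y%m%d') if date_str else datetime.now()
    return (base + timedelta(days=1)).strftime('%Y%m%d')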

All the code has been uploaded to https://github.com/xiekeng/tushare-client; take whatever interests you.
