Quantitative stock selection in Python

What is stock selection?

Stock selection is an active investment strategy: first analyze the prospects of individual stocks according to a set of rules or algorithms, then build a portfolio to hold for the long term. The stocks in the portfolio are generally required to have low correlation with one another, which helps hedge systemic risk. Otherwise, when the market weakens, the whole portfolio faces large downside risk.

What model is used?

Academia has put forward many different models for selecting stocks, the most classic being Markowitz's portfolio theory. Here we use the MM trend template, a technical stock selection method proposed by the well-known US trader Mark Minervini. The core idea is to measure a stock's momentum through technical indicators, select the stocks with the most potential, and buy and hold them.

MM trend model

  1. The stock price is above both the 150-day and the 200-day moving averages.
  2. The 150-day moving average is above the 200-day moving average.
  3. The 200-day moving average has been rising for at least one month.
  4. The 50-day moving average is above both the 150-day and the 200-day moving averages.
  5. The stock price is above the 50-day moving average.
  6. The stock price is at least 30% above the 52-week low.
  7. The stock price is within 25% of the 52-week high.
  8. The relative strength (RS) is greater than or equal to 70. Here, relative strength compares the stock with the market: RS = 1-year return of the stock / 1-year return of the benchmark index.
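
To make rule 8 concrete, here is a minimal sketch of the relative strength calculation as defined above. The prices are made-up numbers and `relative_strength` is a hypothetical helper. The factor of 100, which puts the result on the same scale as the threshold of 70, follows the screening code later in this article.

```python
def relative_strength(stock_start, stock_end, bench_start, bench_end):
    """RS = 1-year stock return / 1-year benchmark return, scaled by 100."""
    stock_ret = stock_end / stock_start - 1
    bench_ret = bench_end / bench_start - 1
    return stock_ret / bench_ret * 100

# Hypothetical prices one year apart: the stock gains 30%, the index gains 10%
rs = relative_strength(10.0, 13.0, 3000.0, 3300.0)
print(round(rs, 2))
```

Note that this ratio is only easy to interpret when the benchmark return is positive. If the benchmark return is negative the sign of RS flips, and if it is close to zero the ratio becomes very large.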

About Mark Minervini

Mark Minervini is one of the best-known traders in the United States. He reportedly once earned a return of 30,000% and was said to be a billionaire before the age of 34. See the book "Financial Geek" for details.

Technical problems in stock selection

  1. Where can we get a large amount of historical stock data?
  2. How can we improve computing performance when there are a large number of stocks?

This article uses Python to implement quantitative stock selection with the MM model and addresses the two technical problems above.

import os
import datetime as dt
import time
from typing import Any, Dict, Optional, List

import requests
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import talib
import multiprocessing as mp
from requests.exceptions import ConnectionError, Timeout

%matplotlib inline
plt.style.use("fivethirtyeight")

1. Obtain historical data from Hummingbird Data

Hummingbird Data is an emerging financial data provider. It offers real-time quotes and historical data for stocks, foreign exchange, commodity futures and digital currencies through an API, which makes it a convenient channel for financial practitioners to obtain free data.

## Write custom functions and obtain data through API

def fetch_trochil(url: str,
                  params: Dict[str, str],
                  attempt: int = 3,
                  timeout: int = 3) -> Optional[Any]:
    """Wrap requests.get with retries; return the "data" payload, or None if every attempt fails"""
    for i in range(attempt):
        try:
            resp = requests.get(url, params, timeout=timeout)
            data = resp.json()["data"]
            if not data:
                raise Exception("empty dataset")
            return data
        except (ConnectionError, Timeout):
            # Wait a little longer after each failed attempt
            time.sleep((i + 1) * 0.5)
    return None

def fetch_cnstocks(apikey: str) -> pd.DataFrame:
    """Fetch the A-share product list from Hummingbird Data"""
    url = ""
    params = {"apikey": apikey}

    res = fetch_trochil(url, params)

    return pd.DataFrame.from_records(res)

def fetch_daily_ohlc(symbol: str,
                     date_from: dt.datetime,
                     date_to: dt.datetime,
                     apikey: str) -> pd.DataFrame:
    """Fetch the daily OHLC (K-line) history of an A-share stock from Hummingbird Data"""
    url = ""
    params = {
        "symbol": symbol,
        "start_date": date_from.strftime("%Y-%m-%d"),
        "end_date": date_to.strftime("%Y-%m-%d"),
        "freq": "daily",
        "apikey": apikey
    }

    res = fetch_trochil(url, params)

    return pd.DataFrame.from_records(res)

def fetch_index_ohlc(symbol: str,
                     date_from: dt.datetime,
                     date_to: dt.datetime,
                     apikey: str) -> pd.DataFrame:
    """Obtain the daily chart historical data of the stock index"""
    url = ""
    params = {
        "symbol": symbol,
        "start_date": date_from.strftime("%Y-%m-%d"),
        "end_date": date_to.strftime("%Y-%m-%d"),
        "apikey": apikey
    }

    res = fetch_trochil(url, params)

    return pd.DataFrame.from_records(res)
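
The retry idea behind fetch_trochil can be tried out without network access. The following is a minimal sketch under stated assumptions: `retry` and `flaky` are hypothetical names, Python's built-in `ConnectionError` and `TimeoutError` stand in for the `requests` exceptions, and, unlike the function above, the last failure is re-raised instead of being swallowed.

```python
import time

def retry(func, attempts=3, base_delay=0.5, retry_on=(ConnectionError, TimeoutError)):
    """Call func() up to `attempts` times, sleeping a little longer after each failure."""
    for i in range(1, attempts + 1):
        try:
            return func()
        except retry_on:
            if i == attempts:
                raise  # out of attempts: let the caller see the error
            time.sleep(i * base_delay)

# A stand-in for an unreliable API call: fails twice, then succeeds
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return {"data": [1, 2, 3]}

result = retry(flaky, base_delay=0.01)
print(result, "after", calls["n"], "calls")
```

Re-raising on the final attempt makes a permanently failing request visible to the caller, whereas returning None silently pushes the error to whatever code consumes the result.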

1.1 Product list

First, obtain the symbols of all A-share companies listed in Shanghai and Shenzhen.

apikey = os.getenv("TROCHIL_API")  # use your apikey
cnstocks = fetch_cnstocks(apikey)

The product information for 3,789 Shanghai and Shenzhen A-share stocks was obtained successfully. The prefix 'SH' denotes stocks listed on the Shanghai Stock Exchange and 'SZ' denotes stocks listed on the Shenzhen Stock Exchange. Only Shanghai Stock Exchange stocks are used in the modeling.

# Filter stocks prefixed with 'SH'
cnstocks_shsz = cnstocks.query("symbol.str.startswith('SH')", engine="python")
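
The same prefix filter can also be written with a boolean mask, which avoids string methods inside a query expression. This is a small sketch on a made-up product list; the symbols are hypothetical and only illustrate the 'SH' and 'SZ' prefixes.

```python
import pandas as pd

# Hypothetical product list; the real one comes from fetch_cnstocks
products = pd.DataFrame({"symbol": ["SH600000", "SZ000001", "SH601318"]})

# Boolean mask: True for rows whose symbol starts with "SH"
is_sh = products["symbol"].str.startswith("SH")
sh_only = products[is_sh]
print(sh_only["symbol"].tolist())
```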

1.2 Historical data of individual stocks

Obtain the daily price history of Shanghai Stock Exchange stocks from Hummingbird Data. The MM trend model needs at least the past 260 trading days of data (52 weeks of 5 trading days). Some newly listed or delisted stocks do not meet this requirement, so we exclude stocks with fewer than 260 daily bars (K-lines).


# Download historical data from 2019 to the present
# Stocks with fewer than 260 daily bars are excluded during downloading
date_from = dt.datetime(2019, 1, 1)
date_to = dt.datetime.today()
symbols = cnstocks_shsz.symbol.to_list()
min_klines = 260

# Download one by one. The Hummingbird Data API has no per-minute request limit
# Store the data in a list first, then merge and clean it after downloading
ohlc_list = []
for symbol in symbols:
    try:
        ohlc = fetch_daily_ohlc(symbol, date_from, date_to, apikey)
        if ohlc is not None and len(ohlc) >= min_klines:
            ohlc.set_index("datetime", inplace=True)
            ohlc_list.append(ohlc)
    except Exception as e:
        # Skip symbols that fail to download and report the reason
        print(f"{symbol}: {e}")

CPU times: user 21.7 s, sys: 349 ms, total: 22 s
Wall time: 49.3 s

Downloading the historical data of more than 1,500 stocks (about 400 trading days each) takes less than a minute. Next, we merge and clean the data, then store it locally for later analysis.

ohlc_joined = pd.concat(ohlc_list)
ohlc_joined.info()

<class 'pandas.core.frame.DataFrame'>
Index: 532756 entries, 2019-01-02 to 2020-07-29
Data columns (total 6 columns):
 #   Column  Non-Null Count   Dtype  
---  ------  --------------   -----  
 0   open    532756 non-null  float64
 1   high    532756 non-null  float64
 2   low     532756 non-null  float64
 3   close   532756 non-null  float64
 4   volume  532756 non-null  float64
 5   symbol  532756 non-null  object 
dtypes: float64(5), object(1)
memory usage: 28.5+ MB

Check for missing values.

ohlc_joined.isnull().sum()

open      0
high      0
low       0
close     0
volume    0
symbol    0
dtype: int64

Save the data locally in CSV format. Later runs can read it directly from disk instead of spending time on API requests.

ohlc_joined.to_csv("cnstock_daily_ohlc.csv", index=True)
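
Reading the file back is a one-liner. The sketch below round-trips a toy frame through a temporary CSV file. The values are made up, and the index name `datetime` is assumed from the `set_index("datetime")` call in the download loop.

```python
import os
import tempfile
import pandas as pd

# Toy long-format frame shaped like ohlc_joined (values are made up)
toy = pd.DataFrame(
    {"close": [10.0, 10.5, 20.0], "symbol": ["SH600000", "SH600000", "SH601318"]},
    index=pd.Index(["2019-01-02", "2019-01-03", "2019-01-02"], name="datetime"),
)

path = os.path.join(tempfile.mkdtemp(), "toy_ohlc.csv")
toy.to_csv(path, index=True)

# index_col restores the datetime index; parse_dates converts it to timestamps
restored = pd.read_csv(path, index_col="datetime", parse_dates=True)
print(restored)
```

Parsing the index as dates on load keeps later time-based operations, such as rolling windows and sorting, working on real timestamps rather than strings.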

1.3 Shanghai Stock Index

Obtain the price history of the Shanghai stock index and calculate its cumulative return over the past year, which is used to compute the relative strength of individual stocks.

benchmark = fetch_index_ohlc("shci", date_from, date_to, apikey)

# Calculate the 1-year cumulative return, assuming 252 trading days per year
benchmark_ann_ret = benchmark.close.pct_change(252).iloc[-1]
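
What `pct_change(252).iloc[-1]` computes is easiest to see on a tiny series. In this sketch the prices are made up and a lag of 3 stands in for the 252 trading days used above.

```python
import pandas as pd

# Made-up closing prices; n = 3 stands in for 252 trading days
close = pd.Series([100.0, 102.0, 101.0, 110.0, 121.0])
n = 3

# Return between the latest close and the close n rows earlier
via_pct_change = close.pct_change(n).iloc[-1]
by_hand = close.iloc[-1] / close.iloc[-1 - n] - 1

print(round(via_pct_change, 4), round(by_hand, 4))
```

If the series has fewer than n + 1 rows, the result is NaN, which is one reason stocks with short histories are excluded before screening.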


2. Stock selection

def screen(close: pd.Series, benchmark_ann_ret: float) -> pd.Series:
    """Implement the MM stock selection logic and evaluate whether a single stock meets the screening conditions

    Args:
        close(pd.Series): Stock closing prices, indexed by time
        benchmark_ann_ret(float): 1-year return of the benchmark index, used to calculate relative strength
    """
    # Calculate the 50-, 150- and 200-day exponential moving averages
    ema_50 = talib.EMA(close, 50).iloc[-1]
    ema_150 = talib.EMA(close, 150).iloc[-1]
    ema_200 = talib.EMA(close, 200).iloc[-1]

    # 20-day smoothing of the 200-day moving average, used to judge whether the 200-day moving average is rising
    ema_200_smooth = talib.EMA(talib.EMA(close, 200), 20).iloc[-1]

    # 52-week high and 52-week low of the closing price
    high_52week = close.rolling(52 * 5).max().iloc[-1]
    low_52week = close.rolling(52 * 5).min().iloc[-1]

    # Latest closing price
    cl = close.iloc[-1]

    # Screening condition 1: the closing price is above the 150-day and 200-day moving averages
    condition_1 = cl > ema_150 and cl > ema_200

    # Screening condition 2: the 150-day moving average is above the 200-day moving average
    condition_2 = ema_150 > ema_200

    # Screening condition 3: the 200-day moving average has been rising for 1 month
    condition_3 = ema_200 > ema_200_smooth

    # Screening condition 4: the 50-day moving average is above the 150-day and 200-day moving averages
    condition_4 = ema_50 > ema_150 and ema_50 > ema_200

    # Screening condition 5: the closing price is above the 50-day moving average
    condition_5 = cl > ema_50

    # Screening condition 6: the closing price is at least 30% above the 52-week low
    condition_6 = cl >= low_52week * 1.3

    # Screening condition 7: the closing price is within 25% of the 52-week high
    condition_7 = high_52week * 0.75 <= cl <= high_52week * 1.25

    # Screening condition 8: the relative strength is greater than or equal to 70
    rs = close.pct_change(252).iloc[-1] / benchmark_ann_ret * 100
    condition_8 = rs >= 70

    # Judge whether the stock meets all eight conditions
    meet_criterion = all([condition_1, condition_2, condition_3, condition_4,
                          condition_5, condition_6, condition_7, condition_8])

    out = {
        "rs": round(rs, 2),
        "close": cl,
        "ema_50": ema_50,
        "ema_150": ema_150,
        "ema_200": ema_200,
        "high_52week": high_52week,
        "low_52week": low_52week,
        "meet_criterion": meet_criterion
    }

    return pd.Series(out)
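
If TA-Lib is hard to install, the moving averages can be approximated with pandas alone. This is a sketch of an alternative, not the method used in this article: `ema_last` is a hypothetical helper, the price series is random, and its values are not guaranteed to match `talib.EMA` exactly because the two libraries may initialize the average differently.

```python
import numpy as np
import pandas as pd

def ema_last(close: pd.Series, span: int) -> float:
    """Latest value of an exponential moving average with smoothing factor 2 / (span + 1)."""
    return float(close.ewm(span=span, adjust=False).mean().iloc[-1])

# Random-walk prices, for illustration only
rng = np.random.default_rng(0)
close = pd.Series(100 + rng.normal(0, 1, 300).cumsum())

ema_50, ema_150, ema_200 = (ema_last(close, n) for n in (50, 150, 200))
print(ema_50, ema_150, ema_200)
```

Any difference in initialization fades as the history grows, so it matters most for stocks that barely meet the minimum number of bars.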

2.1 Synchronous screening

First, we screen synchronously, applying the same screening function to about 1,400 stocks one after another.

# Only select stocks with sufficient historical data
symbols_to_screen = list(ohlc_joined.symbol.unique())

# Convert the data frame from long format to wide format (one column per symbol)
ohlc_joined_wide = ohlc_joined.pivot(columns="symbol", values="close").ffill()
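
The long-to-wide conversion is easier to follow on a toy frame. In this sketch the symbols, dates and prices are made up. Symbol B has no row for d2, so the pivot leaves a gap there that the forward fill closes with the previous close.

```python
import pandas as pd

# Long format: one row per (date, symbol) pair
long_df = pd.DataFrame(
    {"symbol": ["A", "B", "A", "A", "B"], "close": [1.0, 5.0, 1.1, 1.2, 5.2]},
    index=pd.Index(["d1", "d1", "d2", "d3", "d3"], name="datetime"),
)

# Wide format: one row per date, one column per symbol
wide = long_df.pivot(columns="symbol", values="close").ffill()
print(wide)
```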



results = ohlc_joined_wide.apply(screen, benchmark_ann_ret=benchmark_ann_ret)
results = results.T

CPU times: user 2.97 s, sys: 6.47 ms, total: 2.98 s
Wall time: 2.97 s

Synchronous calculation takes about 3 seconds, which is acceptable in the research stage but not in production. Imagine turning the stock screener into a product: after the user chooses the conditions and clicks filter, waiting at least 3 seconds for the results makes for a poor user experience. Next, we try to solve this problem with multiple processes.

First, let's look at the stocks that meet the conditions.

results.query("meet_criterion == True").sort_values("rs", ascending=False)

389 stocks meet the conditions. From a quantitative trading perspective, the screen does not appear to have singled out high-potential stocks. Of course, this depends on the choice of parameters.

Whether the model is effective is not the subject of this article (we will explore it in other articles), so we will not dwell on it here.

2.2 Multiprocessing

Next, we try to speed up the screening with multiple processes and see whether the time can be brought below 1 second. The core idea of multiprocess computing is divide and conquer: distribute similar computing tasks across different CPU cores, then combine the results. Here we use Python's multiprocessing module.


# Define worker function
def screen_stocks(df: pd.DataFrame, benchmark_ann_ret: float) -> pd.DataFrame:
    results = df.apply(screen, benchmark_ann_ret=benchmark_ann_ret)
    return results.T

# Split the data frame by columns into four parts, one per process
df_chunks = np.array_split(ohlc_joined_wide, 4, axis=1)

# Use a multiprocessing.Pool object to manage the process pool
with mp.Pool(processes=4) as p:
    future_results = [p.apply_async(
        screen_stocks, kwds={"df": df, "benchmark_ann_ret": benchmark_ann_ret}) for df in df_chunks]
    results = pd.concat([r.get() for r in future_results])

CPU times: user 934 ms, sys: 204 ms, total: 1.14 s
Wall time: 1.06 s

Using four processes, we shorten the calculation time to about 1 second and get the same results.

results.query("meet_criterion == True").sort_values("rs", ascending=False)
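
The claim that splitting by columns gives the same results can be checked without starting any processes. The sketch below tests only the split-and-concatenate logic on random data. `summarize` is a hypothetical stand-in for screen, and the columns are split by position rather than by passing the data frame itself to np.array_split.

```python
import numpy as np
import pandas as pd

def summarize(col: pd.Series) -> pd.Series:
    """Stand-in for screen(): any function that maps one column to a Series."""
    return pd.Series({"last": col.iloc[-1], "mean": col.mean()})

rng = np.random.default_rng(1)
wide = pd.DataFrame(rng.normal(size=(50, 10)), columns=[f"S{i}" for i in range(10)])

# Result computed on the whole frame at once
full = wide.apply(summarize).T

# Split column positions into 4 groups, process each group, then concatenate
groups = np.array_split(np.arange(wide.shape[1]), 4)
parts = [wide.iloc[:, idx].apply(summarize).T for idx in groups]
chunked = pd.concat(parts)

# Raises an AssertionError if the two results differ
pd.testing.assert_frame_equal(full, chunked)
```

The check works because each column is processed independently, so the grouping of columns cannot change any individual result.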

Next, test the relationship between the number of processes and the calculation time to determine the optimal number of processes.

max_processors = mp.cpu_count()

time_used = {}
for processors in range(1, max_processors + 1):
    df_chunks = np.array_split(ohlc_joined_wide, processors, axis=1)
    t0 = time.time()
    with mp.Pool(processors) as p:
        future_results = [p.apply_async(
            screen_stocks, kwds={"df": df, "benchmark_ann_ret": benchmark_ann_ret}) for df in df_chunks]
        results = pd.concat([r.get() for r in future_results])
    elapsed = time.time() - t0
    time_used[processors] = elapsed

fig, ax = plt.subplots(figsize=(12, 7))
ax = sns.pointplot(x=list(time_used.keys()), y=list(time_used.values()))
ax.set_xlabel("CPU cores")
ax.set_ylabel("Time used(seconds)")
ax.set_title("Computation time vs CPU Cores", loc="left")


As the figure shows, using two processes cuts the calculation time roughly in half, as expected. As the number of processes approaches the number of CPU cores, the additional time saved keeps shrinking. This is not hard to understand: the computer is handling other tasks at the same time, so it cannot devote every core to the screening even when processors=12 is set. In the current setup, four processes are a reasonable choice, reducing the time from 3.5 seconds to about 1 second.

3. Summary

This article describes how to use Python for quantitative stock selection, including:

  1. Obtaining the historical data of Shanghai and Shenzhen A-shares from Hummingbird Data.
  2. Implementing the stock selection logic of the MM model with custom functions.
  3. Using multiprocess computing to greatly reduce the screening time.

The next research direction is backtesting: build a portfolio according to the MM model and optimize the screening parameters to see whether it can deliver excess returns.


Added by Koobi on Sat, 19 Feb 2022 03:27:06 +0200