本篇复现的研报与上一篇为同一系列,在上一篇的基础上进行的深入研究。
由于市场体制、投资者结构、投资者教育等多方面的原因A股市场投机性较强,既然不能改变A股投机的事实,我们不妨研究如何在投机市场中获利。通过对研报思路的复现过程,我们获得了与研报一致的结果,买入投机程度弱的股票,卖出过度投机的股票即可获取超额收益。
股票的投机程度虽然不能被直接观测,但投机程度高的股票往往伴随着一定的交易行为特征,通过对这些交易行为特征的刻画可变相考察个股的投机程度。我们通过特征波动率、特异度、价格时滞、市值调整换手分别度量股票的波动率高低、个股收益能否被市场风格解释、股价能否反应市场公共信息(市场指数)、和股票的换手程度。这四个指标描述的都是过去一段时间内个股的交易行为特征,我们统称为交易行为类指标。
特质波动率、特异度、价格时滞、流通市值调整换手历史上均表现出较强的收益预测能力,和我们预期的一致,过去一段时间内被过度投机的股票下期收益率普遍偏低,相反较为“正常”的股票反而更有可能获取超额收益。具体到各指标,特异度的正端超额收益最大,年均超越市场等权13.3%,流通市值调整换手正端收益也较大,但风险相对较高,相比之下,特质波动率和价格时滞较为平淡。IC方面,特质波动率和流通市值调整换手IC均值较高,但IR稳定性较差;特异度因子IC均值略低,但稳定性最好;价格时滞因子在四个因子中表现最差。
个股的投机最后必然会表现在股票的交易层面,我们通过波动率高低、个股收益能否被市场风格解释、股价能否及时反应市场公共信息、以及股票的换手程度等四个维度来刻画个股的投机性程度。我们认为被过度投机的股票一般具有下述特征:
高波动性:被投机的股票一般多空分歧较大,多空博弈的结果导致大的波动。由于个股的某些波动是由市场风格变动引起的,所以我们用剔除市场风格波动后的特质波动率来衡量个股的波动程度。
风格独立:一般而言,被过度投机的个股在交易时多依赖于其自身的私有信息,无视市场的涨跌或风格,以此投机性强的股票与市场风格关联性差,市场风格对其收益的解释程度弱。我们构建了特异度指标用来度量个股收益中不能被市场风格解释的成分占比。
价格时滞大:正常情况下个股的股价能够及时的反应市场公共信息(市场指数),但是当个股被过度投机时可能会过度关注个股层面的信息,导致市场层面的消息可能反应不足。我们通过价格时滞因子度量个股是否及时反应了市场公共信息。
高换手:换手率是交易员常用的用来甄别投机性股票的指标,投资者投机股票时一般追求的短期收益,成交频繁,博取短期收益,因此股票投机性强时自然伴随着较高的换手率。考虑到市值和换手的相关性,我们用流通市值调整后的换手率来度量个股的换手特征。
特质波动率、特异度、价格时滞、换手率四个指标描述的都是过去一段时间内(滞后的指标)个股的交易行为特征,我们可以统称为交易行为类指标。股票当前被过度投机时,后期大概率下跌,作为股票投机程度代理变量的交易行为类指标取值越高,股票越有可能被过度投机,后期应该对应着较大的负向alpha,相反,取值越小,越有可能获得正向alpha。
该部分我们将分别介绍特质波动率、特异度、价格时滞、流通市值调整换手的定义、由来、及历史表现情况等。在市场有效性验证方面,有如下几个细节需要说明:
(1) 回测的样本空间为剔除上市不满6个月、ST、*ST后的全部A股;
(2) 由于系统数据只能获取到2005年1月1日之后的,因此除价格时滞因子外,其他因子回测时间均为2005年2月27日至2019年2月27日。价格时滞因子由于需要前三期市场数据做回归,因此开始日期推后三期;
(3) 基于全样本计算信息系数或者相关系数时,每个指标均经过了横截面标准化处理,以剔除指标在时间轴上波动的影响;
(4) 分组检验有效性时将样本空间内所有股票分成10组,每月底调仓,构建等权组合,基准为市场等权组合;
(5) 因子收益率为样本空间内因子值最小的1/3股票构建的流通市值加权组合与最大1/3股票组合收益率之差。
特质波动率率度量的是剔除了市场常见风格波动后剩余的个股特有的波动率,用于度量归属于个股的特有风险。
注:该方程也即Fama-French三因子方程,详细说明可参看上一篇文章,年化参数在复现时统一为252,与研报243不同。
我们通过信息比率IC和分组检验两种方法验证了特质波动率预测次月收益率的能力,结果发现低特质波动率的股票拥有明显的正向超额收益。从2005年至今的长样本来看,特质波动率和次月收益率的spearman相关系数高达-0.098,IR为-0.831,负显著比例较高。特质波动率的分组结果也强有力的支持了这个结论,超额收益率、夏普比率均表现出显著单调性。多空组合表现也较稳定。
特异度反应了股票收益中不能被市场、规模、估值三种常见的风格因子解释的成分占比,特异度越高说明个股涨跌与市场风格的相关性越小,交易中市场层面的信息占比越低,个股层面的信息占比越高。股票的特异度越高,表示市场风格越不能解释其股票收益,过去一段时间内被过度投机的可能性和程度越大。
过去14年里,特异度因子表现极佳,不仅有较高的超额收益,更是具有单因子中罕见的稳定性。spearman秩相关系数均值为-0.090,正显著比例不足10%,IR高达-0.946。十分组年化超额收益和夏普比率也表现出了极强的单调性,多空分化更是明显。
价格时滞度量中涉及到参数滞后期n,我们分别计算了n=3、n=5时的价格时滞,发现价格时滞对参数n不敏感,两者相关系数高达0.93,下文中我们取滞后期n=3的价格时滞因子。
由于价格时滞因子表现略差,且后文可被其他因子解释替代,这里我们简单展示一下其相关结果。
一般来讲,大市值股票换手率较低,小市值股票换手率较高,因此市值因素对换手率因子影响极大。
回归残差项我们用来作为剔除市值后的换手率代理变量,我们称之为流通市值调整换手。
流通市值调整换手相对换手率对股票收益的预测能力大幅增强,spearman秩相关系数均值高达-0.105,流通市值调整换手各分组也表现出相当高的单调性(第1组表现相对较差可能是因为第1组中大市值股票占比仍然较高)和显著的多空分化。但其IC正显著比例较高,IR较低,说明该因子稳定性较差,风险较其他因子略高。
总体而言,各交易行为类因子的表现符合我们的预期,过去一段时间被过度投机的股票下期收益率普遍偏低,同时较为“正常”的股票后期表现相对更好。具体到各指标,特异度指标带来的超额收益最明显,同时风险最小,流通市值调整换手的超额收益也较高,但风险较大,特质波动率和价格时滞的表现相对前两者较为平淡,但仍有可观的超额收益。
为了进一步验证交易行为类因子间的相互替代性,同时考察多个因子对一个交易行为类指标的解释程度,我们应用了因子收益率回归的方法,具体做法如下:以一个或多个因子的因子收益率(月度)为解释变量,通过线性回归方程解释某一个交易行为类指标的因子收益率,观察回归截距项是否显著判定解释变量中的信息能够完全解释该指标的超额收益成分。
特质波动率因子收益率对特异度和流通市值调整换手因子收益率回归结果中,特异度和流通市值调整换手的系数项显著性极高,说明特异度和流通市值调整换手对特质波动率均有一定的解释程度,回归的截距项不显著,也说明特异度和流通市值调整换手几乎全部解释了特质波动率的超额收益来源。同样,在控制特异度后,价格时滞因子收益率的截距项也变得不显著,特异度因子也能够解释价格时滞的收益率来源。
特异度因子收益率对其他交易行为类因子收益率的回归结果显示,特质波动率、价格时滞、流通市值调整换手因子均对特异度因子收益率有一定的解释程度,但剔除这些影响后的截距项仍然显著,说明其他交易行为类因子不能完全解释特异度的超额收益成分,这一结论在加入Fama-French的三个因子收益率和动量因子收益率后仍成立。流通市值调整换手的回归结果和特异度类似,其他交易行为类因子对流通市值调整换手因子均有一定的解释程度,但不能完全解释其超额收益,在一结论在控制Fama-French三因子和动量效应后依然显著。需要提醒的是,在回归关系中我们发现特异度和市值调整换手相互间的回归系数均显著为负,这也说明两者之间几乎没有替代关系,甚至存在部分相关抵消的信息成分。
研究假设市场投机氛围仍较浓,若市场风格转变,对研究结论影响尚不可知。事实上,近几年由于价值投资理念的普及,文中提到的因子的有效性均有所下降,但目前依然显著有效。
from jqdata import *
from jqlib.technical_analysis import *
from jqfactor import *
from scipy import stats
from statsmodels import regression
import datetime
import pandas as pd
import numpy as np
import statsmodels.api as sm
import warnings
warnings.filterwarnings('ignore')
#获取每月最后一个交易日(其余频次是获取第一个交易日,暂未改动)
def get_tradeday_list(start,end,frequency=None,count=None):
if count != None:
df = get_price('000001.XSHG',end_date=end,count=count)
else:
df = get_price('000001.XSHG',start_date=start,end_date=end)
if frequency == None or frequency =='day':
return df.index
else:
df['year-month'] = [str(i)[0:7] for i in df.index]
if frequency == 'month':
return df.drop_duplicates('year-month', keep = 'last').index
elif frequency == 'quarter':
df['month'] = [str(i)[5:7] for i in df.index]
df = df[(df['month']=='01') | (df['month']=='04') | (df['month']=='07') | (df['month']=='10') ]
return df.drop_duplicates('year-month').index
elif frequency =='halfyear':
df['month'] = [str(i)[5:7] for i in df.index]
df = df[(df['month']=='01') | (df['month']=='07')]
return df.drop_duplicates('year-month').index
#获取股票池
def get_stock(stockPool, begin_date):
if stockPool == 'HS300':
stockList = get_index_stocks('000300.XSHG', begin_date)
elif stockPool == 'ZZ500':
stockList = get_index_stocks('399905.XSHE', begin_date)
elif stockPool == 'ZZ800':
stockList = get_index_stocks('399906.XSHE', begin_date)
elif stockPool == 'A':
stockList = list(get_all_securities(['stock'], date = begin_date).index)
#剔除ST股
st_data = get_extras('is_st', stockList, count = 1,end_date = begin_date)
stockList = [stock for stock in stockList if not st_data[stock][0]]
#剔除*st股票
stockList = [stock for stock in stockList if '*' not in get_security_info(stock).display_name]
#剔除上市不足6个月的新股
stockList = delete_new(stockList, begin_date, n = 183)
#剔除停牌
suspended_info_df = get_price(stockList, end_date = begin_date, count = 1, frequency = 'daily', fields = 'paused')['paused']
stockList = [stock for stock in stockList if suspended_info_df[stock][0] == 0]
return stockList
#剔除新股
def delete_new(stocks, beginDate, n = 365):
stockList = []
for stock in stocks:
start_date = get_security_info(stock).start_date
if start_date < (beginDate - datetime.timedelta(days = n)):
stockList.append(stock)
return stockList
def get_factor_data(stocks, start_date, end_date):
if factor == 'IVol':
factor_data = FF3(stocks, start_date, end_date)
elif factor == 'IVR':
factor_data = FF3(stocks, start_date, end_date)
elif factor == 'PriceDelay':
factor_data = PD(stocks, start_date, end_date)
elif factor == 'adjTurnover':
factor_data = adjTurnover(stocks, end_date)
return factor_data
#求Fama-French三因子模型特质波动率
def FF3(stocks, start_date, end_date, rf = 0.04):
LoS=len(stocks)
#查询三因子的语句
q = query(
valuation.code,
valuation.circulating_market_cap,
(balance.total_owner_equities/valuation.circulating_market_cap/100000000.0).label("BP"),
#indicator.roe,
#balance.total_assets.label("Inv")
).filter(
valuation.code.in_(stocks)
)
df = get_fundamentals(q, start_date)
df.index = df['code']
del df['code']
#中性化
#df = neutralize(df, how = ['sw_l1', 'market_cap'], date = end_date, axis = 0)
n = int(LoS/3)
#选出特征股票组合
S=df.sort_values('circulating_market_cap').index.tolist()[: n]
B=df.sort_values('circulating_market_cap').index.tolist()[LoS - n:]
L=df.sort_values('BP').index.tolist()[: n]
H=df.sort_values('BP').index.tolist()[LoS - n:]
df5 = df['circulating_market_cap']
# 获得样本期间的股票价格并计算日收益率
df2 = get_price(stocks, start_date = start_date, end_date = end_date, fields=['close'])['close']
df4 = df2.pct_change()
df4.dropna(how ='all', inplace = True)
df4.fillna(0, inplace = True)
#求因子的值,按流通市值加权
SMB = list(np.dot(df4[S], df5.loc[S] / df5.loc[S].sum()) - np.dot(df4[B], df5.loc[B] / df5.loc[B].sum()))
HML = list(np.dot(df4[H], df5.loc[H] / df5.loc[H].sum()) - np.dot(df4[L], df5.loc[L] / df5.loc[L].sum()))
#用股票池,流通市值为权重作为市场基准
df6 = df5.loc[df4.columns]
df6.fillna(df5.mean(), inplace = True)
RM = list(np.dot(df4, df6 / df6.sum()) - rf/252)
if len(SMB) > len(RM):
SMB.drop(SMB.index[0], inplace = True)
HML.drop(HML.index[0], inplace = True)
#将因子们计算好并且放好
X = pd.DataFrame({"RM":RM, "SMB":SMB, "HML":HML})
# 对样本数据进行线性回归并计算残差
t_scores=[0.0] * LoS
for i in range(LoS):
t_stock = stocks[i]
t_r = linreg(X, df4[t_stock] - rf/252)
t_scores[i] = t_r
#这个scores就是残差
scores = pd.DataFrame({'score': t_scores}, index = stocks)
return scores
#9
# 辅助线性回归的函数
# 输入:X:回归自变量 Y:回归因变量 完美支持list,array,DataFrame等三种数据类型
# 输出:参数估计结果-list类型
def linreg(X,Y):
X=sm.add_constant(array(X, dtype=float))
Y=array(Y)
results = sm.OLS(Y, X).fit()
if factor == 'IVol':
return results.resid.std() * sqrt(252)
elif factor == 'IVR':
return (1 - results.rsquared)
elif factor == 'PriceDelay':
return results.rsquared
elif factor == 'adjTurnover':
return results.resid
#计算价格时滞
def PD(stocks, minus1_date, end_date, rf = 0.04):
X1 = MKT(stocks, minus1_date, end_date, n = 0)
X2 = MKT(stocks, minus1_date, end_date, n = 3)
quote = get_price(stocks, start_date = minus1_date, end_date = end_date, fields=['close'])['close']
ret = quote.pct_change()
ret.dropna(how ='all', inplace = True)
ret.fillna(0, inplace = True)
ret = ret - rf/252
#OLS计算hetero
PriceDelay = pd.DataFrame(index = ['PriceDelay'])
for stock in ret.columns:
Y1 = ret[stock][:len(X1)]
Y2 = ret[stock][:len(X2)]
r0 = linreg(X1 - rf/252, Y1)
r3 = linreg(X2 - rf/252, Y2)
PriceDelay[stock] = 1 - r0/r3
PriceDelay = PriceDelay.T
#返回特质波动率vol
return PriceDelay
#计算滞后n期的市场因子
def MKT(stocks, minus1_date, end_date, n):
if n == 0:
period = [minus1_date, end_date]
else:
period = list(get_tradeday_list(start = None, end = minus1_date, frequency = 'month', count = n * 24).date)
period.append(end_date)
X = pd.DataFrame(columns = period[1:], index = range(24))
for i in range(len(period) - 1):
#设置统计范围
quote = get_price(stocks, start_date = period[i], end_date = period[i + 1], fields=['close'])['close']
ret = quote.pct_change()
ret.dropna(how ='all', inplace = True)
ret.fillna(0, inplace = True)
#构造市场基准收益:流通市值加权
q = query(valuation.circulating_market_cap, valuation.code).filter(valuation.code.in_(stocks))
df = get_fundamentals(q, period[i])
df.index = df['code']
del df['code']
#df = (df * 1e8).apply(np.log)
df = df/df.sum()
if len(ret.columns) > len(df):
ret = ret[df.index.tolist()]
elif len(ret.columns) < len(df):
df = df.loc[ret.columns.tolist()]
else:
pass
final = pd.DataFrame(np.dot(ret, df), columns = ['final'])
X[period[i + 1]] = final['final']
X.dropna(how = 'any', inplace = True)
return X
#市值调整换手
def adjTurnover(stocks, end_date):
VOL20 = get_factor_values(stocks, ['VOL20'], end_date = end_date, count = 1)['VOL20'].T
VOL20.replace(0, np.nan, inplace = True)
VOL20 = VOL20.apply(np.log)
VOL20.columns = ['VOL20']
q = query(valuation.code, valuation.circulating_market_cap).filter(valuation.code.in_(stocks))
df = get_fundamentals(q, end_date)
df.index = df['code']
del df['code']
VOL20['circulating_market_cap'] = (df['circulating_market_cap'] * 1e8).apply(np.log)
VOL20.dropna(how = 'any', inplace = True)
X = VOL20['circulating_market_cap']
Y = VOL20['VOL20']
l = linreg(X,Y)
adjTurnover = pd.DataFrame(l, index = VOL20.index, columns = ['adjTurnover'])
return adjTurnover
#因子值数据处理,最后一个参数决定是否做市值行业中性化,默认做
def data_preprocessing(factor_data, stockList, industry_code, date, ind_neutral = True):
#去极值,参数来自光大研报
factor_data = winsorize_med(factor_data, scale = 3, inf2nan = False, axis = 0)
#缺失值处理
nan_ratio = 1 - factor_data.count() / len(factor_data)
if nan_ratio[0] > 0.2:
print(str(date) + '数据缺失率为:' + str(nan_ratio))
factor_data = replace_nan_indu(factor_data, stockList, industry_code, date)
#标准化处理
factor_data = standardlize(factor_data, axis = 0)
#中性化处理
if ind_neutral == True:
factor_data = neutralize(factor_data, how = ['sw_l1', 'market_cap'], date = date, axis = 0)
return factor_data
#缺失值处理
def replace_nan_indu(factor_data, stockList, industry_code, date):
#把nan用行业中位数代替,依然会有nan,此时用所有股票中位数代替
i_Constituent_Stocks = {}
data_temp = pd.DataFrame(index = industry_code, columns = factor_data.columns)
for i in industry_code:
temp = get_industry_stocks(i, date)
i_Constituent_Stocks[i] = list(set(temp).intersection(set(stockList)))
data_temp.loc[i] = nanmedian(array(factor_data.loc[i_Constituent_Stocks[i], :]))
for factor in data_temp.columns:
#行业缺失值用所有行业中位数代替
null_industry = list(data_temp.loc[pd.isnull(data_temp[factor]), factor].keys())
for i in null_industry:
data_temp.loc[i, factor] = nanmedian(array(data_temp[factor]))
#查询空值所在位置并填充
null_stock = list(factor_data.loc[pd.isnull(factor_data[factor]), factor].keys())
for i in null_stock:
industry = get_industry_name(i_Constituent_Stocks, i)
if industry:
factor_data.loc[i, factor] = data_temp.loc[industry[0], factor]
else:
factor_data.loc[i, factor] = nanmedian(array(factor_data[factor]))
return factor_data
#取股票对应行业
def get_industry_name(i_Constituent_Stocks, value):
return [k for k, v in i_Constituent_Stocks.items() if value in v]
#获取所需数据并存储
def get_df(period):
#申万更新以前
industry_old_code=['801010','801020','801030','801040','801050','801080','801110','801120','801130','801140','801150',\
'801160','801170','801180','801200','801210','801230']
#申万更新以后
industry_new_code = ['801010','801020','801030','801040','801050','801080','801110','801120','801130','801140','801150',\
'801160','801170','801180','801200','801210','801230','801710','801720','801730','801740','801750',\
'801760','801770','801780','801790','801880','801890']
#时段内全部因子值及当期收益率
df = pd.DataFrame(index = get_stock(universe, period[-1]))
for i in range(1, len(period) - 1):
if period[i] < datetime.datetime.date(datetime.datetime.strptime('2014-02-21', '%Y-%m-%d')):
industry_code = industry_old_code
else:
industry_code = industry_new_code
all_stock = get_stock(universe, period[i])
#获取数据
data = get_factor_data(all_stock, period[i - 1], period[i])
#处理数据
p_data = data_preprocessing(data, all_stock, industry_code, period[i], ind_neutral = False)
p_data.columns = [str(period[i]) + factor]
df = pd.concat([df, p_data], axis = 1)
price = get_price(p_data.index.tolist(), start_date = period[i], end_date = period[i + 1], fields = ['close'])['close']
#无法相对于中证全指的超额收益,因为2005年还没有
ret = pd.DataFrame((price.iloc[-1, :] / price.iloc[0, :]) - 1, columns = [str(period[i]) + '收益率'])
ret.fillna(nanmedian(ret), inplace = True)
df = pd.concat([df, ret], axis = 1)
df.dropna(how = 'all', inplace = True)
return df
#计算IC
def get_IC(df):
#IC值序列
IC = []
S = []
for i in range(1, len(period) - 1):
raw = pd.DataFrame(index = df.index)
raw['0'] = df[str(period[i]) + factor]
raw['1'] = df[str(period[i]) + '收益率']
raw.dropna(inplace = True)
ic, p = stats.spearmanr(raw, axis = 0, nan_policy = 'omit')
IC.append(ic)
#显著性检验
if p < 0.05:
if ic > 0:
S.append('pos')
elif ic < 0:
S.append('neg')
else:
S.append('non')
else:
S.append('non')
return IC, S
#汇总结果
def draw_conclusion(period):
df = get_df(period)
#f, t_f = get_f_t(df)
IC, S = get_IC(df)
conclusion = pd.DataFrame(index = [factor])
conclusion['IC均值'] = nanmean(IC)
conclusion['IC正显著比例'] = S.count('pos') / len(S)
conclusion['IC负显著比例'] = S.count('neg') / len(S)
conclusion['abs(IC)>0.02比例'] = len([i for i in IC if abs(i) > 0.02]) / len(IC)
conclusion['IR'] = nanmean(IC) / std(IC)
print(conclusion.T)
IC = pd.DataFrame(IC, index = period[1: len(period) - 1], columns = ['IC'])
IC['index'] = range(len(IC))
IC['S'] = S
IC['pos'] = IC.loc[IC['S'] == 'pos', 'IC']
IC['neg'] = IC.loc[IC['S'] == 'neg', 'IC']
IC['non'] = IC.loc[IC['S'] == 'non', 'IC']
fig1 = plt.figure(2, figsize=(30,5))
plt.scatter(IC['index'], IC['pos'], c = 'r')
plt.scatter(IC['index'], IC['neg'], c = 'b')
plt.scatter(IC['index'], IC['non'], c = 'grey')
plt.grid()
plt.tick_params(labelsize = 20)
plt.rcParams['axes.unicode_minus'] = False
#画多空组合收益率图
r = get_Return_per_day(period, df)
return_line(r, period)
return None
#多空一十组合
def get_Return_per_day(period, df):
#IC = pd.DataFrame()
r = []
for i in range(1, len(period) - 1):
abin_sorted = df[str(period[i]) + factor].dropna().sort_values()
#分组,取第一组和第十组
n = int(len(abin_sorted)/10)
long = abin_sorted.index.tolist()[0:n]
short = abin_sorted.index.tolist()[-n:]
#多头
long_close = df[str(period[i]) + '收益率'].loc[long]
#空头
short_close = df[str(period[i]) + '收益率'].loc[short]
#计算收益率
r.append(long_close.mean() - short_close.mean())
return r
#画多空组合收益率图
def return_line(r, period):
r = np.array(r) + 1
p = []
m = 1
p.append(m)
for i in range(len(r)):
m *= r[i]
p.append(m)
d = period[0: len(p)]
p = pd.Series(p, index = d)
fig1 = plt.figure(1, figsize=(30,5))
plt.plot(p.index, p)
plt.grid()
plt.tick_params(labelsize = 20)
plt.rcParams['axes.unicode_minus'] = False
return None
#控制获取数据时段(因为数据库只到2005年1月1日,且IC计算要前一个月的数据,因此从1月底开始计算)
start_date = '2005-01-31'
end_date = '2019-02-27'
period = get_tradeday_list(start = start_date, end = end_date, frequency = 'month').date
#所需股票池,可选'HS300','ZZ500','ZZ800','A'
global universe
universe = 'A'
#所需因子IVol
global factor
factor = 'IVol'
draw_conclusion(period)
#所需因子IVR
global factor
factor = 'IVR'
draw_conclusion(period)
#所需因子adjTurnover
global factor
factor = 'adjTurnover'
draw_conclusion(period)
#控制获取数据时段(因为数据库只到2005年1月1日,且IC计算要前一个月的数据,因此从1月底开始计算)
start_date = '2005-04-30'
end_date = '2019-02-27'
period = get_tradeday_list(start = start_date, end = end_date, frequency = 'month').date
#所需股票池,可选'HS300','ZZ500','ZZ800','A'
global universe
universe = 'A'
分组回测用自己的代码,数据处理使回测与研究模块保持一致
#所需因子PriceDelay
global factor
factor = 'PriceDelay'
draw_conclusion(period)
#回测用代码
code = '''
# 导入函数库
from jqdata import *
from jqfactor import *
import datetime as dt
import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels import regression
# 初始化函数,设定基准等等
def initialize(context):
# 设定中证全指作为基准
set_benchmark({'000002.XSHG': 0.5, '399107.XSHE': 0.5})
# 开启动态复权模式(真实价格)
set_option('use_real_price', True)
log.set_level('order', 'error')
#第几组
g.group = 1
g.method = 'adjTurnover'
## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的)
# 开盘前运行
run_monthly(before_market_open, monthday = -1, time='before_open', reference_security = '399107.XSHE')
# 开盘时运行
run_monthly(market_open, monthday = -1, time='open', reference_security = '399107.XSHE')
## 开盘前运行函数
def before_market_open(context):
#设置排序方式
if g.method == 'BP':
g.asc = False
else:
g.asc = True
#设置滑点、手续费
set_slip_fee(context)
#取全A作为股票池
all_stocks = list(get_all_securities(['stock'], date = context.current_dt).index)
feasible_stocks = set_feasible_stocks(context, all_stocks)
if g.method == 'IVol' or g.method == 'IVR':
factor = FF3(feasible_stocks, context.current_dt)
elif g.method == 'PriceDelay':
factor = PD(feasible_stocks, context.current_dt)
elif g.method == 'adjTurnover':
factor = adjTurnover(feasible_stocks, context.current_dt)
#排序
factor = factor.sort(g.method, ascending = g.asc)
n = int(len(factor)/10)
#分组取样
if g.group == 10:
g.tobuy_list = factor.index[(g.group - 1) * n :]
else:
g.tobuy_list = factor.index[(g.group - 1) * n : g.group * n]
#1
#设置可行股票池,剔除st、停牌股票,输入日期
def set_feasible_stocks(context, stockList):
#剔除ST股
st_data = get_extras('is_st', stockList, count = 1, end_date = context.current_dt)
stockList = [stock for stock in stockList if not st_data[stock][0]]
#剔除*st股票
stockList = [stock for stock in stockList if '*' not in get_security_info(stock).display_name]
#剔除上市不足6月的新股
stockList = delete_new(stockList, context.current_dt, n = 183)
#剔除停牌
suspended_info_df = get_price(stockList, end_date = context.current_dt, count = 1, frequency = 'daily', fields = 'paused')['paused']
stockList = [stock for stock in stockList if suspended_info_df[stock][0] == 0]
return stockList
#剔除新股
def delete_new(stocks, beginDate, n = 365):
stockList = []
for stock in stocks:
start_date = get_security_info(stock).start_date
if start_date < dt.datetime.date(beginDate - dt.timedelta(days = n)):
stockList.append(stock)
return stockList
# 根据不同的时间段设置滑点与手续费
def set_slip_fee(context):
# 将滑点设置为0
set_slippage(FixedSlippage(0))
# 根据不同的时间段设置手续费
dt=context.current_dt
if dt>datetime.datetime(2013,1, 1):
set_commission(PerTrade(buy_cost=0.0003,
sell_cost=0.0013,
min_cost=5))
elif dt>datetime.datetime(2011,1, 1):
set_commission(PerTrade(buy_cost=0.001,
sell_cost=0.002,
min_cost=5))
elif dt>datetime.datetime(2009,1, 1):
set_commission(PerTrade(buy_cost=0.002,
sell_cost=0.003,
min_cost=5))
else:
set_commission(PerTrade(buy_cost=0.003,
sell_cost=0.004,
min_cost=5))
## 开盘时运行函数
def market_open(context):
#调仓,先卖出股票
for stock in context.portfolio.long_positions:
if stock not in g.tobuy_list:
order_target_value(stock, 0)
#再买入新股票
total_value = context.portfolio.total_value # 获取总资产
for i in range(len(g.tobuy_list)):
value = total_value / len(g.tobuy_list) # 确定每个标的的权重
order_target_value(g.tobuy_list[i], value) # 调整标的至目标权重
#查看本期持仓股数
print(len(context.portfolio.long_positions))
#求Fama-French三因子模型特质波动率
def FF3(stocks, end_date, rf = 0.04):
LoS=len(stocks)
#查询三因子/五因子的语句
q = query(
valuation.code,
valuation.circulating_market_cap,
(balance.total_owner_equities/valuation.circulating_market_cap/100000000.0).label("BP"),
#indicator.roe,
#balance.total_assets.label("Inv")
).filter(
valuation.code.in_(stocks)
)
start_date = list(get_tradeday_list(start = None, end = end_date, frequency = 'month', count = 24).date)[0]
df = get_fundamentals(q, start_date)
df.index = df['code']
del df['code']
#中性化
#df = neutralize(df, how = ['sw_l1', 'market_cap'], date = start_date, axis = 0)
#选出特征股票组合
S=df.sort('circulating_market_cap').index.tolist()[:LoS/3]
B=df.sort('circulating_market_cap').index.tolist()[LoS-LoS/3:]
L=df.sort('BP').index.tolist()[:LoS/3]
H=df.sort('BP').index.tolist()[LoS-LoS/3:]
df5 = df['circulating_market_cap']
# 获得样本期间的股票价格并计算日收益率
df2 = get_price(stocks, start_date = start_date, end_date = end_date, fields=['close'])['close']
df4 = df2.pct_change()
df4.dropna(how ='all', inplace = True)
df4.fillna(0, inplace = True)
#求因子的值,按流通市值加权
SMB = list(np.dot(df4[S], df5.loc[S] / df5.loc[S].sum()) - np.dot(df4[B], df5.loc[B] / df5.loc[B].sum()))
HML = list(np.dot(df4[H], df5.loc[H] / df5.loc[H].sum()) - np.dot(df4[L], df5.loc[L] / df5.loc[L].sum()))
#用股票池,流通市值为权重作为市场基准
df6 = df5.loc[df4.columns]
df6.fillna(df5.mean(), inplace = True)
RM = list(np.dot(df4, df6 / df6.sum()) - rf/252)
if len(SMB) > len(RM):
SMB.drop(SMB.index[0], inplace = True)
HML.drop(HML.index[0], inplace = True)
#将因子们计算好并且放好
X=pd.DataFrame({"RM":RM, "SMB":SMB, "HML":HML})
# 对样本数据进行线性回归并计算ai
t_scores=[0.0] * LoS
for i in range(LoS):
t_stock = stocks[i]
t_r = linreg(X, df4[t_stock] - rf/252)
t_scores[i] = t_r
scores = pd.DataFrame({g.method: t_scores}, index = stocks)
return scores
#9
# 辅助线性回归的函数
# 输入:X:回归自变量 Y:回归因变量 完美支持list,array,DataFrame等三种数据类型
# 输出:参数估计结果-list类型
def linreg(X,Y):
X=sm.add_constant(array(X))
Y=array(Y)
results = sm.OLS(Y, X).fit()
if g.method == 'IVol':
return results.resid.std() * sqrt(252)
elif g.method == 'IVR':
return (1 - results.rsquared)
elif g.method == 'PriceDelay':
return results.rsquared
elif g.method == 'adjTurnover':
return results.resid
#计算价格时滞
def PD(stocks, end_date, rf = 0.04):
minus1_date = list(get_tradeday_list(start = None, end = end_date, frequency = 'month', count = 24).date)[0]
X1 = MKT(stocks, minus1_date, end_date, n = 0)
X2 = MKT(stocks, minus1_date, end_date, n = 3)
quote = get_price(stocks, start_date = minus1_date, end_date = end_date, fields=['close'])['close']
ret = quote.pct_change()
ret.dropna(how ='all', inplace = True)
ret.fillna(0, inplace = True)
ret = ret - rf/252
#OLS计算hetero
PriceDelay = pd.DataFrame(index = [g.method])
for stock in ret.columns:
Y1 = ret[stock][:len(X1)]
Y2 = ret[stock][:len(X2)]
r0 = linreg(X1 - rf/252, Y1)
r3 = linreg(X2 - rf/252, Y2)
PriceDelay[stock] = 1 - r0/r3
PriceDelay = PriceDelay.T
#返回价格时滞
return PriceDelay
#计算滞后n期的市场因子
def MKT(stocks, minus1_date, end_date, n):
if n == 0:
period = [minus1_date, end_date]
else:
period = list(get_tradeday_list(start = None, end = minus1_date, frequency = 'month', count = n * 24).date)
period.append(end_date)
X = pd.DataFrame(columns = period[1:], index = range(24))
for i in range(len(period) - 1):
#设置统计范围
quote = get_price(stocks, start_date = period[i], end_date = period[i + 1], fields=['close'])['close']
ret = quote.pct_change()
ret.dropna(how ='all', inplace = True)
ret.fillna(0, inplace = True)
#构造市场基准收益:流通市值加权
q = query(valuation.circulating_market_cap, valuation.code).filter(valuation.code.in_(stocks))
df = get_fundamentals(q, period[i])
df.index = df['code']
del df['code']
#df = (df * 1e8).apply(np.log)
df = df/df.sum()
final = pd.DataFrame(np.dot(ret, df), columns = ['final'])
X[period[i + 1]] = final['final']
X.dropna(how = 'any', inplace = True)
return X
def get_tradeday_list(start,end,frequency=None,count=None):
if count != None:
df = get_price('000001.XSHG',end_date=end,count=count)
else:
df = get_price('000001.XSHG',start_date=start,end_date=end)
if frequency == None or frequency =='day':
return df.index
else:
df['year-month'] = [str(i)[0:7] for i in df.index]
if frequency == 'month':
return df.drop_duplicates('year-month', take_last = True).index
elif frequency == 'quarter':
df['month'] = [str(i)[5:7] for i in df.index]
df = df[(df['month']=='01') | (df['month']=='04') | (df['month']=='07') | (df['month']=='10') ]
return df.drop_duplicates('year-month').index
elif frequency =='halfyear':
df['month'] = [str(i)[5:7] for i in df.index]
df = df[(df['month']=='01') | (df['month']=='07')]
return df.drop_duplicates('year-month').index
#流通市值调整换手
def adjTurnover(stocks, end_date):
start_date = list(get_tradeday_list(start = None, end = end_date, frequency = 'month', count = 24).date)[0]
VOL20 = get_factor_values(stocks, ['VOL20'], end_date = end_date, count = 1)['VOL20'].T
VOL20.replace(0, np.nan, inplace = True)
VOL20 = VOL20.apply(np.log)
VOL20.columns = ['VOL20']
q = query(valuation.code, valuation.circulating_market_cap).filter(valuation.code.in_(stocks))
df = get_fundamentals(q, end_date)
df.index = df['code']
del df['code']
VOL20['circulating_market_cap'] = (df['circulating_market_cap'] * 1e8).apply(np.log)
VOL20.dropna(how = 'any', inplace = True)
X = VOL20['circulating_market_cap']
Y = VOL20['VOL20']
l = linreg(X,Y)
adjTurnover = pd.DataFrame(l, index = VOL20.index, columns = ['adjTurnover'])
return adjTurnover
## 收盘后运行函数
def after_market_close(context):
pass
'''
下个模块里method默认为'IVR',需要其他因子的时候更改这个变量至'IVol', 'PriceDelay', 'adjTurnover'即可。 按顺序一次一个,重复运行下面4个模块
#研究调用回测
created_bt_ids = []
method = 'IVR'
#取全部组别进行回测
for i in range(1, 11):
algorithm_id = "e1f0bf5d8c9d75c38497c9b00a01eb9d" #用自己的策略ID
extra_vars = {'group': i, 'method': method}
params = {
"algorithm_id": algorithm_id,
"start_date": "2005-05-31",
"end_date": "2019-02-27",
"frequency": "day",
"initial_cash": "30000000",
"initial_positions": None,
"extras": extra_vars,
"name" : method + ' group:' + str(i)
}
created_bt_ids.append(create_backtest(code = code, **params))
#等上个模块的回测全部运行结束再运行这个模块
#先获取基准收益和交易日信息
gt = get_backtest(backtest_id = created_bt_ids[0])
res = gt.get_results()
b_return = []
t = []
for r in res:
b_return.append(r['benchmark_returns'])
t.append(r['time'])
#建立df存储数据
data = pd.DataFrame(index = t)
data['b_return'] = b_return
#填入不同参数下的收益数据
for i in range(len(created_bt_ids)):
gt = get_backtest(backtest_id = created_bt_ids[i])
res = gt.get_results()
name = gt.get_params()['name']
s_return = []
for r in res:
s_return.append(r['returns'])
data[name] = s_return
#年化超额收益图
dat = (data.iloc[-1, :] + 1)
((dat / dat[0]) ** (1/14) - 1).drop('b_return').plot.bar(figsize = (7.5, 5))
#夏普比率图
sharpe = []
max_drawdown = []
information_ratio = []
name = range(1, 11)
for i in range(len(created_bt_ids)):
gt = get_backtest(backtest_id = created_bt_ids[i])
sharpe.append(gt.get_risk()['sharpe'])
max_drawdown.append(gt.get_risk()['max_drawdown'])
information_ratio.append(gt.get_risk()['information'])
sharpe = pd.Series(sharpe, index = name)
max_drawdown = pd.Series(max_drawdown, index = name)
information_ratio = pd.Series(information_ratio, index = name)
sharpe.plot.bar(figsize = (7.5, 5))
因子收益率回归解释
#获取全部期因子收益率
def get_RegExp_df(period):
IVol = []
IVR= []
PriceDelay = []
adjT = []
MKT = []
SMB = []
HML = []
MOM = []
for i in range(1, len(period) - 1):
stocks = get_stock(universe, period[i - 1])
LoS = len(stocks)
#查询因子的语句
q = query(
valuation.code,
valuation.circulating_market_cap,
(balance.total_owner_equities/valuation.circulating_market_cap/100000000.0).label("BP"),
).filter(
valuation.code.in_(stocks)
)
df = get_fundamentals(q, period[i - 1])
df.index = df['code']
del df['code']
#中性化
#df = neutralize(df, how = ['sw_l1', 'market_cap'], date = end, axis = 0)
n = int(LoS/3)
#选出特征股票组合
S=df.sort_values('circulating_market_cap').index.tolist()[:n]
B=df.sort_values('circulating_market_cap').index.tolist()[LoS - n:]
L=df.sort_values('BP').index.tolist()[:n]
H=df.sort_values('BP').index.tolist()[LoS - n:]
#df5 = (df['circulating_market_cap'] * 1e8).apply(np.log)
df5 = df['circulating_market_cap']
# 获得样本期间的股票价格并计算月收益率
df2 = get_price(stocks, start_date = period[i - 1], end_date = period[i], fields=['close'])['close']
df4 = df2.iloc[-1, :] / df2.iloc[0, :] - 1
df4.fillna(0, inplace = True)
#动量(反转)因子
prior_date = list(get_tradeday_list(start = None, end = period[i - 1], frequency = 'month', count = 24).date)[0]
df22 = get_price(stocks, start_date = prior_date, end_date = period[i - 1], fields=['close'])['close']
df42 = pd.DataFrame(df22.iloc[-1, :] / df22.iloc[0, :] - 1, columns = ['ret'])
df42.fillna(0, inplace = True)
LO = df42.sort_values('ret').index.tolist()[: n]
W = df42.sort_values('ret').index.tolist()[LoS - n:]
#求因子的值,按流通市值加权
SMB.append(np.dot(df4[S], df5.loc[S] / df5.loc[S].sum()) - np.dot(df4[B], df5.loc[B] / df5.loc[B].sum()))
HML.append(np.dot(df4[H], df5.loc[H] / df5.loc[H].sum()) - np.dot(df4[L], df5.loc[L] / df5.loc[L].sum()))
#价格数据股票比财务数据多
df6 = df5.loc[df4.index]
df6.fillna(df5.mean(), inplace = True)
MOM.append(np.dot(df4[LO], df6.loc[LO] / df6.loc[LO].sum()) - np.dot(df4[W], df6.loc[W] / df6.loc[W].sum()))
#用股票池,流通市值为权重作为市场基准
MKT.append(np.dot(df4, df6 / df6.sum()))
global factor
factor = 'IVol'
factor_data = FF3(stocks, period[i - 1], period[i])
factor_data.columns = [factor]
first = factor_data.sort_values(factor).index.tolist()[:n]
last = factor_data.sort_values(factor).index.tolist()[LoS - n:]
IVol.append(np.dot(df4[first], df6.loc[first] / df6.loc[first].sum()) - np.dot(df4[last], df6.loc[last] / df6.loc[last].sum()))
factor = 'IVR'
factor_data = FF3(stocks, period[i - 1], period[i])
factor_data.columns = [factor]
first = factor_data.sort_values(factor).index.tolist()[:n]
last = factor_data.sort_values(factor).index.tolist()[LoS - n:]
IVR.append(np.dot(df4[first], df6.loc[first] / df6.loc[first].sum()) - np.dot(df4[last], df6.loc[last] / df6.loc[last].sum()))
factor = 'PriceDelay'
factor_data = PD(stocks, period[i - 1], period[i])
factor_data.columns = [factor]
first = factor_data.sort_values(factor).index.tolist()[:n]
last = factor_data.sort_values(factor).index.tolist()[LoS - n:]
PriceDelay.append(np.dot(df4[first], df6.loc[first] / df6.loc[first].sum()) - np.dot(df4[last], df6.loc[last] / df6.loc[last].sum()))
factor = 'adjTurnover'
factor_data = adjTurnover(stocks, period[i])
factor_data.columns = [factor]
first = factor_data.sort_values(factor).index.tolist()[:n]
last = factor_data.sort_values(factor).index.tolist()[LoS - n:]
adjT.append(np.dot(df4[first], df6.loc[first] / df6.loc[first].sum()) - np.dot(df4[last], df6.loc[last] / df6.loc[last].sum()))
#将因子们计算好并且放好
f_data = pd.DataFrame({"MKT": MKT, "SMB": SMB, "HML": HML, "MOM": MOM,
'IVol': IVol, 'IVR': IVR, 'PriceDelay': PriceDelay, 'adjTurnover': adjT})
f_data.dropna(how = 'any', inplace = True)
return f_data
#控制获取数据时段(因为数据库只到2005年1月1日,且IC计算要前一个月的数据,因此从1月底开始计算)
start_date = '2005-04-30'
end_date = '2019-02-27'
period = get_tradeday_list(start = start_date, end = end_date, frequency = 'month').date
RegExp_df = get_RegExp_df(period)
#线性回归解释,Yf是被解释变量(只能有一个),Xf是用来解释的变量列表(一个或多个,不能包含Yf)
#可选变量列表['IVol', 'IVR', 'PriceDelay', 'adjTurnover', 'MKT', 'SMB', 'HML', 'MOM']
def RegExp(df, Yf, Xf):
X = sm.add_constant(array(df[Xf], dtype=float))
Y = array(df[Yf])
results = sm.OLS(Y, X).fit()
print(results.summary())
return None
RegExp(RegExp_df, Yf = 'IVol', Xf = ['IVR', 'adjTurnover'])
RegExp(RegExp_df, Yf = 'IVR', Xf = ['IVol', 'PriceDelay', 'adjTurnover'])
RegExp(RegExp_df, Yf = 'IVR', Xf = ['MKT', 'SMB', 'HML', 'MOM', 'IVol', 'PriceDelay', 'adjTurnover'])
RegExp(RegExp_df, Yf = 'PriceDelay', Xf = ['IVR'])
RegExp(RegExp_df, Yf = 'adjTurnover', Xf = ['IVol', 'IVR', 'PriceDelay'])
RegExp(RegExp_df, Yf = 'adjTurnover', Xf = ['MKT', 'SMB', 'HML', 'MOM', 'IVol', 'IVR', 'PriceDelay'])
本社区仅针对特定人员开放
查看需注册登录并通过风险意识测评
5秒后跳转登录页面...
移动端课程