前面两个部分已经把数据做好了前期处理,这篇主要就少单因子回归和其有效性检验。
1、模型的选择
2、有效性检验
一般来说统计模型分为两种,一种是时间序列模型,一种是横截面模型。其中横截面模型时目前业界较常用于因子测试的方法。
这两种模型存在以下两点共性。第一,这两类模型都给出了资产收益、波动分解的线性、可加形式;第二这两类模型都要求解释变量互不相关,即正交化。在搭建模型时,时间序列的解释变量的数据更容易处理,但存在回归残差的自相关性,而横截面模型由于它描述不同资产收益差异和因子暴露差异之间的关联,因子设定可以比较灵活,由于横截面模型用多资产方式来捕捉解释变量的变异度,采用单片截面滚动估计,对不同资产价格波动差异的解释更为及时。所以通常都会选择横截面模型来进行回归。
我们选择每期针对全体样本做一次回归,回归时因子暴露为已知变量,回归得到每期的一个因子收益值
回归的方法包括最小二乘法OLS,加权最小二乘法WLS,稳健回归RLM(Robust Linear Model),这里我就不详细说明每种的差异,在这里选取的是可以更好地处理异常值影响的RLM方法。
from jqfactor import Factor, calc_factors
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
stock = get_index_stocks('000300.XSHG')
class Hs300Alpha(Factor):
# 设置因子名称
name = 'hs300_alpha'
# 设置获取数据的时间窗口长度
max_window = 10
# 设置依赖的数据
dependencies = ['close']
# 计算因子的函数, 需要返回一个 pandas.Series, index 是股票代码,value 是因子值
def calc(self, data):
# 获取个股的收盘价数据
close = data['close']
# 计算个股近10日收益
stock_return = close.iloc[-1,:]/close.iloc[0,:] -1
# 获取指数(沪深300)的收盘价数据
index_close = self._get_extra_data(securities=['000300.XSHG'], fields=['close'])['close']
# 计算指数的近10日收益
index_return = index_close.iat[-1,0]/index_close.iat[0,0] - 1
# 计算 alpha
alpha = stock_return - index_return
return alpha
factors = calc_factors(stock, [Hs300Alpha()], start_date='2017-01-01', end_date='2017-12-31')
data=factors['hs300_alpha']
#处理缺失值
output=pd.DataFrame()
for i in range(300):
p=sum(data.iloc[:,i].isnull())/len(data.iloc[:,i])
if p<0.2:
data.iloc[:,i].fillna(mean(data.iloc[:,i]))
output[i]=data.iloc[:,i]
output.columns=data.columns[output.columns]
output=output.fillna(mean(data))
#异常值处理
for i in range(len(output.columns)):
MAD=median(abs(output.iloc[:,i]-median(output.iloc[:,i])))
MAX=median(output.iloc[:,i]) 3*1.4826*MAD
MIN=median(output.iloc[:,i])-3*1.4826*MAD
output.iloc[:,i][output.iloc[:,1]>MAX]=MAX
output.iloc[:,i][output.iloc[:,1]<MIN]=MIN
#标准化
for i in range(len(output.columns)):
output.iloc[:,i]=(output.iloc[:,i]-mean(output.iloc[:,i]))/std(output.iloc[:,i])
#获得行业哑变量矩阵
from jqdata import *
sw=get_industries(name='sw_l1').index
industry=pd.DataFrame(0,columns=output.columns,index=range(0,28))
for i in range(len(sw)):
temp=list(set(output.columns).intersection(set(get_industry_stocks(sw[i]))))
industry.loc[i,temp]=1
#去除市值、行业因素,得到新的因子值
newx=pd.DataFrame()
for i in range(len(output.index)):
m= get_fundamentals(query(valuation.circulating_cap,valuation.code).filter(valuation.code.in_(output.columns)), date=output.index[i])
m.index=np.array(m['code'])
m=m.iloc[:,0]
m=(m-mean(m))/std(m)
x=output.iloc[i,:]
conc=pd.concat([x,m,industry.T],axis=1).fillna(mean(m))
est=sm.OLS(conc.iloc[:,0],conc.iloc[:,1:]).fit()
y_fitted = est.fittedvalues
newx[i]=est.resid
newx=newx.T
newx.index=output.index
newx=newx.iloc[1:,:]
#看图
'''
fig, ax = plt.subplots(figsize=(8,6))
ax.plot(conc.iloc[:,1],conc.iloc[:,0], 'o', label='data')
ax.plot(conc.iloc[:,1], y_fitted, 'r--.',label='OLS')
'''
#将因子值和y值匹配
output=output.iloc[:-1,:]
df = get_price(list(output.columns), start_date='2017-01-01', end_date='2017-12-31', frequency='daily', fields=['close'])
y=df['close'].diff()/np.array(df['close'])
y=y.iloc[1:,:]
y=y.fillna(mean(y))
y = y.drop((y.index).difference(newx.index))
#做回归 求回归系数
f=[0]*len(y.index)
t=[0]*len(y.index)
for i in range(len(y.index)):
rlm_model = sm.RLM(y.iloc[i,:], newx.iloc[i,:], M=sm.robust.norms.HuberT()).fit()
f[i]=float(rlm_model.params)
t[i]=float(rlm_model.tvalues)
'''
#对回归的结果画图
y_fit=rlm_model.fittedvalues
fig, ax = plt.subplots(figsize=(8,6))
ax.plot(newx,y, 'o', label='data')
ax.plot(newx, y_fit, 'r--.',label='OLS')
'''
因子的评价标准分为4个部分
1、因子收益率检验:检验每组是否可以取得正的收益,波动率还有正的概率。
2、因子的显著性检验:检验每组是否可以取得正的超额收益率,概率有多大。
3、因子区分度:第一组的收益率是否显著大于第五组,因子是否能显著把表现好的股票和表现差的股票区分开。
4、因子的延续性检验:由因子得到的分组是否能够在较长的一段时间保持良好的方向性,即在本期表现好的因子能否在下期也表现好。
1、因子收益率检验
(1)平均收益(各组)
(2)标准差(各组)
(3)夏普比率(各组)
(4)胜率(各组)(正的概率)
2、因子显著性检验
(1)超额均值
(2)跟踪误差(超额收益率的标准差,衡量各组偏离基准的程度)
(3)信息比率(超额均值/超额标准差)
(4)T统计量
(5)超额概率(超基准收益率中>0的概率)
3、因子区分度(第一组-最后一组)
(1)平均收益
(2)标准差
(3)胜率
(4)夏普比率(平均/标准差)
(5)T值
4、因子延续性检验------IC值
#做分层回溯法
newy=df['close']/df['close'].iloc[0,:]-1
newy=newy.iloc[1:,:]
newy=newy.fillna(np.mean(y))
newy = newy.drop((newy.index).difference(newx.index))
ind=get_price('000300.XSHG', start_date=y.index[0], end_date=y.index[-1], fields=['close'])
ind=(ind.iloc[0,:]-ind.iloc[-1,:])/ind.iloc[0,:]
fc1=[0]*len(newx.index)
fc2=[0]*len(newx.index)
fc3=[0]*len(newx.index)
fc4=[0]*len(newx.index)
fc5=[0]*len(newx.index)
output_mean=pd.DataFrame()
output_std=pd.DataFrame()
output_sharp=pd.DataFrame()
output_win=pd.DataFrame()
for i in range(len(newx.index)):
d=pd.DataFrame()
d1=newx.iloc[i,:][newx.iloc[i,:].rank()<=(len(newx.index)/5)].index
d2=newx.iloc[i,:][newx.iloc[i,:].rank()<=(len(newx.index)/5*2) ].index
d2=(d2).difference(d1)
d3=newx.iloc[i,:][newx.iloc[i,:].rank()<=(len(newx.index)/5*3) ].index
d3=(d3).difference(d2)
d4=newx.iloc[i,:][newx.iloc[i,:].rank()<=(len(newx.index)/5*4) ].index
d4=(d4).difference(d3)
d5=newx.iloc[i,:][newx.iloc[i,:].rank()<=(len(newx.index)/5*5) ].index
d5=(d5).difference(d4)
d=[d1,d2,d3,d4,d5]
temp_mean=[0]*5
temp_std=[0]*5
temp_win=[0]*5
excess_mean=[0]*5
excess_std=[0]*5
excess_win=[0]*5
for j in range(0,5):
temp_mean[j]=np.mean(y.iloc[i,:][d[j]])
for j in range(0,5):
temp_std[j]=np.std(y.iloc[i,:][d[j]])
for j in range(0,5):
temp_win[j]=sum(y.iloc[i,:][d[j]]>0)/len(y.iloc[i,:][d[j]])
output_mean[i]=temp_mean
output_std[i]=temp_std
output_sharp[i]=output_mean[i]/output_std[i]
output_win[i]=temp_win
for j in range(0,5):
sto=get_price(list(d[j]), start_date=y.index[0], end_date=y.index[-1], fields=['close'])['close']
sto=(sto.iloc[0,:]-sto.iloc[-1,:])/sto.iloc[0,:]
excess_mean[j]=(sto-list(ind)).mean()
excess_std[j]=std(sto-list(ind))
excess_win[j]=mean((sto-list(ind))>0)/len(d[j])
#因子收益率检验
a=pd.DataFrame()
#平均收益(各组)(年化---复利下)
a[0]=(output_mean.mean(axis=1) 1)**365-1
#标准差(各组)
a[1]=(output_std.mean(axis=1))
#夏普比率(各组)
a[2]=output_sharp.mean(axis=1)
#胜率(各组)
a[3]=output_win.mean(axis=1)
a.columns=['年化平均收益','标准差','夏普比率','胜率']
a.index=['第一组','第二组','第三组','第四组','第五组']
a
#因子显著性检验
b=pd.DataFrame()
#超额均值
b[0]=excess_mean
#跟踪误差
b[1]=excess_std
#信息比例
b[2]=np.array(excess_mean)/np.array(excess_std)
#t值
b[3]=np.array(excess_mean)/(np.array(excess_std)/((len(y.columns)/5)**0.5))
#胜率
b[4]=excess_win
b.columns=['超额均值','跟踪误差','信息比率','T统计量','胜率']
b.index=['第一组','第二组','第三组','第四组','第五组']
b
#因子区分度
c=pd.DataFrame(a.iloc[0,:]-a.iloc[-1,:],columns=['第一组-第五组']).T
c
#因子延续性
#计算IC值序列
d=[0]*5
IC=[0]*len(y.columns)
for i in range(len(y.columns)):
IC[i]=corrcoef(pd.Series(f).rank(),y.iloc[:,i].rank())[0,1]
#计算IC值的均值
d[0]=mean(IC)
#计算IC值的标准差
d[1]=std(IC)
#IC大于0的比例
d[2]=sum(pd.Series(IC)>0)/len(IC)
#IC绝对值大于0.02的比例
d[3]=sum(pd.Series(IC)>0.02)/len(IC)
#IR值
d[4]=mean(IC)/std(IC)
d=pd.DataFrame(d,index=['IC均值','IC标准差','IC大于0的比例','IC绝对值大于0.02的比例','IR值']).T
d
from jqfactor import Factor, calc_factors
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
stock = get_index_stocks('000300.XSHG')
class Hs300Alpha(Factor):
# 设置因子名称
name = 'hs300_alpha'
# 设置获取数据的时间窗口长度
max_window = 10
# 设置依赖的数据
dependencies = ['close']
# 计算因子的函数, 需要返回一个 pandas.Series, index 是股票代码,value 是因子值
def calc(self, data):
# 获取个股的收盘价数据
close = data['close']
# 计算个股近10日收益
stock_return = close.iloc[-1,:]/close.iloc[0,:] -1
# 获取指数(沪深300)的收盘价数据
index_close = self._get_extra_data(securities=['000300.XSHG'], fields=['close'])['close']
# 计算指数的近10日收益
index_return = index_close.iat[-1,0]/index_close.iat[0,0] - 1
# 计算 alpha
alpha = stock_return - index_return
return alpha
factors = calc_factors(stock, [Hs300Alpha()], start_date='2017-01-01', end_date='2017-12-31')
/opt/conda/envs/python3new/lib/python3.6/site-packages/statsmodels/compat/pandas.py:56: FutureWarning: The pandas.core.datetools module is deprecated and will be removed in a future version. Please use the pandas.tseries module instead. from pandas.core import datetools
data=factors['hs300_alpha']
#处理缺失值
output=pd.DataFrame()
for i in range(300):
p=sum(data.iloc[:,i].isnull())/len(data.iloc[:,i])
if p<0.2:
data.iloc[:,i].fillna(mean(data.iloc[:,i]))
output[i]=data.iloc[:,i]
output.columns=data.columns[output.columns]
output=output.fillna(mean(data))
#异常值处理
for i in range(len(output.columns)):
MAD=median(abs(output.iloc[:,i]-median(output.iloc[:,i])))
MAX=median(output.iloc[:,i])+3*1.4826*MAD
MIN=median(output.iloc[:,i])-3*1.4826*MAD
output.iloc[:,i][output.iloc[:,1]>MAX]=MAX
output.iloc[:,i][output.iloc[:,1]<MIN]=MIN
#标准化
for i in range(len(output.columns)):
output.iloc[:,i]=(output.iloc[:,i]-mean(output.iloc[:,i]))/std(output.iloc[:,i])
#获得行业哑变量矩阵
from jqdata import *
sw=get_industries(name='sw_l1').index
industry=pd.DataFrame(0,columns=output.columns,index=range(0,28))
for i in range(len(sw)):
temp=list(set(output.columns).intersection(set(get_industry_stocks(sw[i]))))
industry.loc[i,temp]=1
#去除市值、行业因素,得到新的因子值
newx=pd.DataFrame()
for i in range(len(output.index)):
m= get_fundamentals(query(valuation.circulating_cap,valuation.code).filter(valuation.code.in_(output.columns)), date=output.index[i])
m.index=np.array(m['code'])
m=m.iloc[:,0]
m=(m-mean(m))/std(m)
x=output.iloc[i,:]
conc=pd.concat([x,m,industry.T],axis=1).fillna(mean(m))
est=sm.OLS(conc.iloc[:,0],conc.iloc[:,1:]).fit()
y_fitted = est.fittedvalues
newx[i]=est.resid
newx=newx.T
newx.index=output.index
newx=newx.iloc[1:,:]
#看图
'''
fig, ax = plt.subplots(figsize=(8,6))
ax.plot(conc.iloc[:,1],conc.iloc[:,0], 'o', label='data')
ax.plot(conc.iloc[:,1], y_fitted, 'r--.',label='OLS')
'''
#将因子值和y值匹配
output=output.iloc[:-1,:]
df = get_price(list(output.columns), start_date='2017-01-01', end_date='2017-12-31', frequency='daily', fields=['close'])
y=df['close'].diff()/np.array(df['close'])
y=y.iloc[1:,:]
y=y.fillna(mean(y))
y = y.drop((y.index).difference(newx.index))
#做回归 求回归系数
f=[0]*len(y.index)
t=[0]*len(y.index)
for i in range(len(y.index)):
rlm_model = sm.RLM(y.iloc[i,:], newx.iloc[i,:], M=sm.robust.norms.HuberT()).fit()
f[i]=float(rlm_model.params)
t[i]=float(rlm_model.tvalues)
'''
#对回归的结果画图
y_fit=rlm_model.fittedvalues
fig, ax = plt.subplots(figsize=(8,6))
ax.plot(newx,y, 'o', label='data')
ax.plot(newx, y_fit, 'r--.',label='OLS')
'''
#做分层回溯法
newy=df['close']/df['close'].iloc[0,:]-1
newy=newy.iloc[1:,:]
newy=newy.fillna(np.mean(y))
newy = newy.drop((newy.index).difference(newx.index))
ind=get_price('000300.XSHG', start_date=y.index[0], end_date=y.index[-1], fields=['close'])
ind=(ind.iloc[0,:]-ind.iloc[-1,:])/ind.iloc[0,:]
fc1=[0]*len(newx.index)
fc2=[0]*len(newx.index)
fc3=[0]*len(newx.index)
fc4=[0]*len(newx.index)
fc5=[0]*len(newx.index)
output_mean=pd.DataFrame()
output_std=pd.DataFrame()
output_sharp=pd.DataFrame()
output_win=pd.DataFrame()
for i in range(len(newx.index)):
d=pd.DataFrame()
d1=newx.iloc[i,:][newx.iloc[i,:].rank()<=(len(newx.index)/5)].index
d2=newx.iloc[i,:][newx.iloc[i,:].rank()<=(len(newx.index)/5*2) ].index
d2=(d2).difference(d1)
d3=newx.iloc[i,:][newx.iloc[i,:].rank()<=(len(newx.index)/5*3) ].index
d3=(d3).difference(d2)
d4=newx.iloc[i,:][newx.iloc[i,:].rank()<=(len(newx.index)/5*4) ].index
d4=(d4).difference(d3)
d5=newx.iloc[i,:][newx.iloc[i,:].rank()<=(len(newx.index)/5*5) ].index
d5=(d5).difference(d4)
d=[d1,d2,d3,d4,d5]
temp_mean=[0]*5
temp_std=[0]*5
temp_win=[0]*5
excess_mean=[0]*5
excess_std=[0]*5
excess_win=[0]*5
for j in range(0,5):
temp_mean[j]=np.mean(y.iloc[i,:][d[j]])
for j in range(0,5):
temp_std[j]=np.std(y.iloc[i,:][d[j]])
for j in range(0,5):
temp_win[j]=sum(y.iloc[i,:][d[j]]>0)/len(y.iloc[i,:][d[j]])
output_mean[i]=temp_mean
output_std[i]=temp_std
output_sharp[i]=output_mean[i]/output_std[i]
output_win[i]=temp_win
for j in range(0,5):
sto=get_price(list(d[j]), start_date=y.index[0], end_date=y.index[-1], fields=['close'])['close']
sto=(sto.iloc[0,:]-sto.iloc[-1,:])/sto.iloc[0,:]
excess_mean[j]=(sto-list(ind)).mean()
excess_std[j]=std(sto-list(ind))
excess_win[j]=mean((sto-list(ind))>0)/len(d[j])
#因子收益率检验
a=pd.DataFrame()
#平均收益(各组)(年化---复利下)
a[0]=(output_mean.mean(axis=1)+1)**365-1
#标准差(各组)
a[1]=(output_std.mean(axis=1))
#夏普比率(各组)
a[2]=output_sharp.mean(axis=1)
#胜率(各组)
a[3]=output_win.mean(axis=1)
a.columns=['年化平均收益','标准差','夏普比率','胜率']
a.index=['第一组','第二组','第三组','第四组','第五组']
a
#因子显著性检验
b=pd.DataFrame()
#超额均值
b[0]=excess_mean
#跟踪误差
b[1]=excess_std
#信息比例
b[2]=np.array(excess_mean)/np.array(excess_std)
#t值
b[3]=np.array(excess_mean)/(np.array(excess_std)/((len(y.columns)/5)**0.5))
#胜率
b[4]=excess_win
b.columns=['超额均值','跟踪误差','信息比率','T统计量','胜率']
b.index=['第一组','第二组','第三组','第四组','第五组']
b
#因子区分度
c=pd.DataFrame(a.iloc[0,:]-a.iloc[-1,:],columns=['第一组-第五组']).T
c
#因子延续性
#计算IC值序列
d=[0]*5
IC=[0]*len(y.columns)
for i in range(len(y.columns)):
IC[i]=corrcoef(pd.Series(f).rank(),y.iloc[:,i].rank())[0,1]
#计算IC值的均值
d[0]=mean(IC)
#计算IC值的标准差
d[1]=std(IC)
#IC大于0的比例
d[2]=sum(pd.Series(IC)>0)/len(IC)
#IC绝对值大于0.02的比例
d[3]=sum(pd.Series(IC)>0.02)/len(IC)
#IR值
d[4]=mean(IC)/std(IC)
d=pd.DataFrame(d,index=['IC均值','IC标准差','IC大于0的比例','IC绝对值大于0.02的比例','IR值']).T
d
本社区仅针对特定人员开放
查看需注册登录并通过风险意识测评
5秒后跳转登录页面...
移动端课程