Barra模型进阶,多因子模型风险预测¶
研究目的¶
本篇内容主要参考方正金工研究报告“星火” 多因子系列报告的第二篇《Barra模型进阶,多因子模型风险预测》,对Barra模型的因子风险部分进行介绍。其主要利用Barra模型初探一文中的因子收益矩阵,预测未来各个因子的风险,并且对风险进行Newey-West自相关调整、特征值调整、波动率偏误调整,得到最终描述因子预期风险的协方差矩阵。
特别提示:阅读本篇之前建议先阅读Barra模型初探
内容分布¶
- 1.多因子模型回顾
- 2.Newey-West 自相关调整
- 3.特征值调整
- 4.波动率偏误调整
多因子模型回顾¶
本部分对Barra多因子模型进行简要回顾:假设市场上有K个驱动股票收益的共同因子,那么多因子模型可以表示为: 在本文中,我们衔接了“Barra模型初探”一文,采用聚宽因子库中的45个因子,在使用加权最小二乘回归后筛除6个异常因子,一共剩余39个因子进行下面的计算。
注意:本报告最后对于风险的预测可能不甚理想,原因在于研报中对于因子的选取过于粗糙。事实上,笔者翻阅了Barra模型的手册,发现其因子的构造有一系列复杂的过程,中间要用到各种优化函数。因此,本文着力与给各位读者提供计算风险的思路,如果想力求精确,更应该在因子选取和数据清洗上下功夫。
Newey-West自相关调整¶
依据“Barra模型初探”中的步骤,我们可以得到一个以因子名称为index,时间线为column的数据框,每个数据代表了该因子在某个交易日的收益率。
from six import BytesIO
import pandas as pd
import numpy as np
data=pd.read_csv("factor_return.csv")
data = data.loc[:, ~data.columns.str.contains('^Unnamed')]
#筛选掉所有表现异常的因子
data.drop([15],inplace=True)
data.drop([16],inplace=True)
data.drop([18],inplace=True)
data.drop([19],inplace=True)
data.drop([28],inplace=True)
data.drop([31],inplace=True)
data.index=[np.arange(data.shape[0])]
print(data)
在得到上述数据框后,我们首先要计算初始的协方差矩阵。传统情况下,计算协方差矩阵直接采用股票收益率之间的协方差,这种方法将每一天的数据都视为同等重要。然而现实中市场上因子的收益率是存在序列相依性的,这意味着近期的数据对未来有更好的预测效果,因此采用半衰指数加权平均的方法计算日度协方差矩阵: 其中$f_{k,s}$代表因子$k$在$s$期的收益,$\overline{f_k}$表示因子$k$的收益在样本期内的指数加权平均,$h$表示样本期间的长度(本例为113),$τ$为半衰期参数,$λ=0.5^{1/τ}$。在实际计算中,我们取$τ=90$。
经过上述计算,我们可以得到原始的协方差矩阵,可以记为"init_cov",但由于金融时间序列数据的群聚性与相关性,我们必须采用计量经济学中的Newey-West调整,具体步骤如下: 在实际计算中,我们对方差和协方差序列的相关滞后时间长度均取$D=2$,半衰期设置为$90$。
#引入工具包
import time
from datetime import datetime,timedelta
from jqdata import *
import numpy as np
import pandas as pd
import math
from statsmodels import regression
import statsmodels.api as sm
import matplotlib.pyplot as plt
import datetime
from scipy import stats
from jqfactor import *
from scipy.optimize import minimize
import warnings
warnings.filterwarnings('ignore')
#设置画图样式
plt.style.use('ggplot')
#获取日期列表
def get_tradeday_list(start,end,frequency=None,count=None):
if count !=None:
df=get_price('000001.XSHG',end_date=end,count=count)
else:
df=get_price('000001.XSHG',start_date=start,end_date=end)
if frequency==None or frequency=='day':
return df.index
else:
df['year-month'] = [str(i)[0:7] for i in df.index]
if frequency == 'month':
return df.drop_duplicates('year-month').index
elif frequency == 'quarter':
df['month'] = [str(i)[5:7] for i in df.index]
df = df[(df['month']=='01') | (df['month']=='04') | (df['month']=='07') | (df['month']=='10') ]
return df.drop_duplicates('year-month').index
elif frequency =='halfyear':
df['month'] = [str(i)[5:7] for i in df.index]
df = df[(df['month']=='01') | (df['month']=='06')]
return df.drop_duplicates('year-month').index
#推断交易日为date之后的第shift个交易日
def ShiftTradingDay(date,shift):
# 获取所有的交易日,返回一个包含所有交易日的 list,元素值为 datetime.date 类型.
tradingday = get_all_trade_days()
# 得到date之后shift天那一天在列表中的行标号 返回一个数
date = datetime.date(int(str(date)[:4]),int(str(date)[5:7]),int(str(date)[8:10]))
shiftday_index = list(tradingday).index(date)+shift
# 根据行号返回该日日期 为datetime.date类型
return tradingday[shiftday_index]
#进行新股、St股过滤,返回筛选后的股票
#!!!不能过滤停牌股票
def filter_stock(stockList,date,days=21*3,limit=0):#日频策略加入开盘涨停过滤
#去除上市距beginDate不足3个月的股票
def delect_stop(stocks,beginDate,n=days):
stockList=[]
beginDate = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
for stock in stocks:
start_date=get_security_info(stock).start_date
if start_date<(beginDate-datetime.timedelta(days=n)).date():
stockList.append(stock)
return stockList
#剔除ST股
st_data=get_extras('is_st',stockList, count = 1,end_date=date)
stockList = [stock for stock in stockList if not st_data[stock][0]]
#新股及退市股票
stockList=delect_stop(stockList,date)
#剔除开盘涨停股票
if limit == 1:
#如果需要收盘涨跌停可以改字段即可
df = get_price(stockList,end_date=date,fields=['open','high_limit','low_limit'],count=1).iloc[:,0,:]
df['h_limit']=(df['open']==df['high_limit'])
df['l_limit']=(df['open']==df['low_limit'])
stockList = [df.index[i] for i in range(len(df)) if not (df.h_limit[i] or df.l_limit[i])] #过滤涨跌停股票
return stockList
#为股票池添加行业标记,return df格式 ,为中性化函数的子函数
def get_industry_exposure(stock_list,date):
df = pd.DataFrame(index=get_industries(name='sw_l1').index, columns=stock_list)
for stock in stock_list:
try:
df[stock][get_industry_code_from_security(stock,date=date)] = 1
except:
continue
return df.fillna(0)#将NaN赋为0
#查询个股所在行业函数代码(申万一级) ,为中性化函数的子函数 (是get_industry_exposure的支持函数)
def get_industry_code_from_security(security,date=None):
industry_index=get_industries(name='sw_l1').index
for i in range(0,len(industry_index)):
try:
index = get_industry_stocks(industry_index[i],date=date).index(security)
return industry_index[i]
except:
continue
return u'未找到'
#初始设置,设置统计区间,调仓频率
#设置统计数据区间
index = '000905.XSHG' #设置股票池,和对比基准,这里是中证500
#设置统计起止日期
date_start = '2019-02-01'
date_end = '2019-07-24'
#设置调仓频率
trade_freq = 'day' #month每个自然月;day每个交易日;输入任意数字如 5,则为5日调仓
#获取调仓时间列表
if trade_freq == 'month':
#获取交易日列表,每月首个交易日
date_list = get_tradeday_list(start=date_start,end=date_end,frequency='month',count=None) #自然月的第一天
elif trade_freq == 'day':
date_list = get_tradeday_list(start=date_start,end=date_end,count=None)#获取回测日期间的所有交易日
else:
date_day_list = get_tradeday_list(start=date_start,end=date_end,count=None)#获取回测日期间的所有交易日
date_list = [date_day_list[i] for i in range(len(date_day_list)) if i%int(trade_freq) == 0]
#打印最终筛选出的调仓日期,可以删除
print(date_list)
def Newy_West_Cov(hyperpanel, forewards=21, nlags='Bartlett',
half_life=90, left_decay=True):
'''
Calculate Covariance Matrix adjusted by Newy West,
nlags estimated by Bartlett kernal func.
Parameters:
-----------
hyperpanel: N x T matrix. N denotes numbers of factors,
T is the time window. We set a threshold of
time window to ensure the estimation of covariance
is robust. Th threshold is 100 samples.
forewards: The intervals of estimation of corvariance.
According to Barra CNE5, we set 21 days (one month).
nlags : 'Bartlett' of an int. Number of lags when
estimating autocovariance.
half_life: half_life parameter of decay adjustment.
left_decay: If the latest day is in the right side,
left_decay is True. If the latest day is in the left side,
left_decay is False.
Returns:
--------
np.array, N x N matrix.
References:
----------
方正证券: Barra模型进阶:多因子模型风险预测 '星火'多因子系列(二)
statsmodels.stats.sandwich_covariance.cov_hac
'''
# 判断样本数是否小于100,如果小于100则raise ValueError, 可以自行修改
if hyperpanel.shape[1] < 100:
raise ValueError('Not enough samples')
if ~isinstance(hyperpanel, np.ndarray): # 判断是否输入为hyperpanel
hyperpanel = np.array(hyperpanel)
#按照定义计算lamda
if half_life is None:
lamb = 1
else:
lamb = 0.5 ** (1. / half_life)
#使用lanb**(x/2)本质上是把分子拆分成两部分,可以理解为a^(b/2)*a^(b/2)=a^b,主要是为了方便计算
if left_decay:
weights = np.array(list(map(lambda x: lamb ** (x / 2),
np.arange(hyperpanel.shape[1] - 1, -1, -1))))
else:
weights = np.array(list(map(lambda x: lamb ** (x / 2),
np.arange(hyperpanel.shape[1]))))
weighted_panel = hyperpanel * weights
# filter nan index
nonan_index = np.where(~np.isnan(np.sum(hyperpanel, axis=0)))[0]
filtered_weights = weights[nonan_index]
filtered_weighted_panel = weighted_panel[:, nonan_index]
if filtered_weighted_panel.shape[1] < 0.8 * hyperpanel.shape[1]:
raise ValueError(
'Too many nan values in hyperpanel,please handle nan first.')
# calculate initial covariance,使用np.nanmean处理缺失值问题
filtered_weighted_panel_demean = filtered_weighted_panel -\
np.nanmean(filtered_weighted_panel,axis=1).reshape(-1, 1)
init_covar = np.dot(filtered_weighted_panel_demean,
filtered_weighted_panel_demean.T) / sum(
filtered_weights ** 2)
if nlags == 'Bartlett':
nlags = int(4 * (filtered_weighted_panel.shape[1] / 100) ** (2 / 9))
elif isinstance(nlags, str):
raise ValueError('No such method, try input an int or Bartlett.')
def _autocov(panel, weight, lag):
if left_decay:
weight = weight[lag:]
else:
weight = weight[: lag]
panel_lag = panel[:, :-lag] * weight
panel_current = panel[:, lag:] * weight
nonan_index = np.where(
~np.isnan(np.sum(panel_current + panel_lag, axis=0)))[0]
panel_lag_demean = panel_lag -\
np.nanmean(panel_lag, axis=1).reshape(-1, 1)
panel_current_demean = panel_current -\
np.nanmean(panel_current, axis=1).reshape(-1, 1)
return np.dot(panel_current_demean[:, nonan_index],
panel_lag_demean[:, nonan_index].T) / sum(
weight[nonan_index] ** 2)
def adj_term(delta):
return (nlags - delta + 1) / (nlags + 1) * (_autocov(hyperpanel, weights, delta))
adj_term_sum=np.zeros(adj_term(1).shape)
for delta in np.arange(1,nlags+1):
adj_term_sum+=adj_term(delta)
return forewards * (init_covar + adj_term_sum + adj_term_sum.T)
pd1=pd.DataFrame(Newy_West_Cov(data))
pd1=np.around(pd1,decimals=5)
print(pd1)
特征值调整¶
如前文所述,直接采用协方差矩阵估计会对最优投资组合的风险产生明显低估。基于此,Menchero(2011)提出采用特征值调整(Eigenfactor Risk Adjustment)的方法对因子协方差矩阵进一步修正。
首先对样本协方差矩阵$F^{NW}$进行特征分解,可以得到对角矩阵$D_0$和正交矩阵$U_0$: $D_0=U_0^TF^{NW}U_0$
此外,为了完成修正,还需要进行$m$次蒙特卡洛模拟,每一次的步骤如下:
def eig_value_adj(cov, simu_len,simu_times=10000, adj_coef=1.2):
# number of factor returns
assert cov.ndim == 2
assert cov.shape[0] == cov.shape[1]
# m: number of simulation times
m = simu_times
# n: number of random variables
n = cov.shape[0]
# t: length of simulation numbers
t = simu_len
assert t >= n
d_0, u_0 = np.linalg.eigh(cov)
u_0 *= np.sqrt(d_0)
# After multiply the sqaured root of eignvalue,
# cov = u_0@u_0.T
# m * n * t. iid. standard normal random numbers
standard_normal_nums = np.random.standard_normal((m, n, t))
# m * n * t. t normal random n-dimension vectors with
# 0 mean and covariance matrix cov for each m
normal_nums = np.einsum('ij, ...jk -> ...ik',
u_0,
standard_normal_nums)
del standard_normal_nums
# m * n * n simulated estimated cov for each m
simu_cov = np.einsum('...ij, ...hj -> ...ih',
normal_nums,
normal_nums)
del normal_nums
# d_m: m * n. n eignvalues for each m
# u_m: m * n * n.
d_m, u_m = np.linalg.eigh(simu_cov)
del simu_cov
# d_m_tilde: m * n
d_m_tilde = np.sum(
np.einsum('...ij, ...ik -> ...jk',
u_m,
u_0) ** 2,
axis=2,
)
del u_m
# n * 1
lambda_diag = np.sqrt((np.mean(d_m / d_m_tilde, axis=0))/m)
del d_m, d_m_tilde
gamma_diag = 1. - adj_coef + adj_coef * lambda_diag
u_0 *= gamma_diag
cov_adj = np.dot(u_0, u_0.T)
return cov_adj
import pandas as pd
#注:这一段程序笔者在本机完成,在平台上由于numpy版本的问题暂时无法实现
#Eigen=(eig_value_adj(cov=pd1.values,simu_len=100,adj_coef=1.2, simu_times=10000))
#print(pd.DataFrame(Eigen))
Eigen=pd.read_csv("Eigen.csv")
Eigen = Eigen.loc[:, ~Eigen.columns.str.contains('^Unnamed')]
print(Eigen)
波动率偏误调整¶
传统的多因子模型在估计单个因子风险时,将每个因子视为独立的。也就是说每个因子本身的风险大小是通过该因子自己的时间序列数据得到的,与其他因子无关。但是,实际应用中表明,这样会使风险预测存在持续高估或低估的情况,因此还需要进行波动率偏误调整(Volatility Regime Adjustment)
首先,我们根据日度数据计算K个因子在横截面上的偏差统计量$B_t^F$
注意:笔者查阅了相关研报和Barra手册原文,并未找到具体的$σ_{kt}$的定义,因此这里采用initcov中计算出的$σ{kt}$。
其次,需要通过指数移动加权平均的方法,计算出过去一段时间的平均偏误系数$λ_F$,称作因子波动率乘数(factor volatility multiplier)
最后,计算因子协方差矩阵$F^{VAR}$:
$F^{VAR}=λ_F^2F^{Eigen}$
import numpy as np
import pandas as pd
def initial_cov(sample, forewards=21, lag_type='Bartlett',
half_life=90, ascending=True):
T, N = sample.shape
if half_life is None:
lamb = 1.
else:
lamb = 0.5 ** (1. / half_life)
if ascending:
weights = np.power(
lamb, np.arange(T) / 2.).reshape(-1, 1)
else:
weights = np.power(
lamb, np.arange(T - 1, -1, -1) / 2.).reshape(-1, 1)
if lag_type == 'Bartlett':
nlags = int(4 * (T / 100) ** (2 / 9))
# record nan index
# T * 1
# 1 * N
sample_mean = np.mean(sample, axis=0, keepdims=True)
# T * N
sample_weighted = (sample - sample_mean) * weights
# N * N
init_cov = np.dot(sample_weighted.T, sample_weighted) / np.sum(
weights ** 2, keepdims=True)
return init_cov
def volatility_regime_adjustment(factor_return,Eigen_matrix,window1=21,half_life=42):
#计算每日每个因子的波动标准差
var_list=pd.DataFrame()
for i in range(window1,len(date_list)-1):
variance_matrix=initial_cov((factor_return.iloc[:,i-window1:i]).T.values)
#print(variance_matrix)
var_list[date_list[i]]=variance_matrix.diagonal()
std_list=np.sqrt(var_list)
factor1=factor_return.iloc[:,window1:]
#计算每个横截面数据的偏差统计量BF
factor_num=std_list.shape[0]
BF=np.sqrt(np.sum((factor1.values/std_list.values)**2,axis=0)/factor_num)
#print(BF)
if half_life is None:
lamb = 1
else:
lamb = 0.5 ** (1. / half_life)
weights = list(map(lambda x: lamb ** (x / 2),np.arange(len(BF))))
weights.reverse()
weights=np.array(weights)
BF=np.array(list(BF**2)).reshape(1,-1)
factor_vol_multiplier=np.sqrt((np.sum((BF)*weights))/np.sum(weights))
return factor_vol_multiplier*Eigen_matrix
Factor_cov=volatility_regime_adjustment(data,Eigen,window1=21,half_life=42)
print(pd.DataFrame(Factor_cov))
总结¶
以上,我们利用上一篇报告中的数据,使用Barra风险模型估计出了各个因子之间协方差矩阵。如果想进一步研究,还需要估计每支股票的特异性风险,并且将二者结合,从而可以估计出各个股票间的协方差矩阵。利用这一协方差矩阵,可以进一步构造Smart Beta等策略。