Barra模型初探,A股市場風格解析¶
研究目的
本篇內容是參考方正金工研究報告“星火” 多因子系列報告的第一篇《Barra模型初探,A股市場風格解析》,主要對Barra模型的基本原理進行介紹,對模型的細節部分進行說明,試圖構建多因子收益歸因模型,並利用風險收益模型對A股市場的風格進行解析,探討 Barra 模型在 A 股市場上的用武之地。
內容分布
- 1.模型介紹
- 1-1.多因子模型介紹
- 1-2.因子標準化
- 1-3.加權最小二乘法
- 1-4.barra風險收益歸因模型介紹
- 2.市場主流因子介紹
- 2-1.因子值計算存儲
- 2-2.因子值處理(加入收益)
- 3.因子收益率計算
- 3-1.進行因子收益計算
- 3-2.因子收益統計分析
- 4.組合收益歸因
- 4-1.構建組合統計收益
- 4-2.組合收益分解
#¶
多因子模型介紹
多因子模型的基礎理論認為:股票的收益是由一些共同的因子來驅動的,不能被這些因子解釋的部分被稱為股票的“特質收益率”, 而每支股票的特質收益率之間是互不相關的。那關於這些共同的因子,和股票收益的關系,可以參考下面的內容
結構化因子風險模型的作用
風險因子也稱為貝塔因子,和 Alpha 因子不同, 風險因子的風險溢價在時間序列上的均值絕對值可以很小,用這個因子來做選股長期可能沒有明顯超額收益,但在月度橫截面上風險因子可以影響顯著影響股票收益,方向可正可負。
因子收益率波動大,控制組合對風險因子的風險暴露,可以提升組合收益的穩定性。同時,通過因子暴露和因子收益率的計算,分析投資組合曆史和當前的風險風險暴露,可以進行收益分析。
在組合優化方面,傳統樣本協方差矩陣估計方法在股票數量較多時,矩陣可能不滿秩或者矩陣條件數 太大,無法直接用於組合優化過程。結構化因子風險模型通過降維的方式減小了股票收益率協方差矩陣的估計誤差,便於風險預測。 下面看下處理的一些細節
因子標準化
由於不同因子在數量級上存在差別, 在實際回歸中需要對單個因子在橫截面上進行標準化, 從而得到均值為 0、標準差為 1 的標準化因子,這里需要特別注意一下的是,為保證全市場基準指數對每個風格因子的暴露程度均為 0,我們需要對每個因子減去其市值加權均值,再除以其標準差,計算方法如下
考慮一個由市值加權構成的投資組合, 可以通過如下驗證看出,該投資組合對於任意因子的暴露度均為0。
加權最小二乘法
前面提到,在 Barra 模型中我們假設每只股票的特質收益率互不相關,但是每只股票的特質收益率列的方差並不相同,這就導致了回歸模型出現異方差性。為解決這一問題,可以采用加權最小二乘WLS 方法進行回歸,對不同的股票賦予不同的權重。
股票特質收益率方差通常與股票的市值規模成反比,即大市值股票的特質收益率方差通常較小,因此在這里的回歸公式中,我們將以市值的平方根占比作為每只股票的回歸權重,將其帶入公式進行計算,然後在我們實際計算的過程中,由於X為奇異矩陣,並不能順利求出收益率f,於是我們采用下面的方法進行處理
風險收益模型介紹
這里先來看下在USE4版本的barra模型下,收益表達式
截距項因子的加入導致自變量因子之間存在多
重共線性, 因此因子的擬合無法直接通過解析解求得,模型的求解轉變成一個帶約束條件的加權最小二乘法求解:
注意, 此處w是指單只股票 n 的市值權重,而w表示的是行業i內所有股票的市值占全體樣本股票市值的比例。
市場風格因子
基於研報中對 Barra 模型框架構建及求解過程的介紹, 我們參考並構建多因子風險收益歸因模型, 並將其運用到 A 股市場上, 從截距項、行業收益、風格收益三方面驗證模型正確性, 觀察市場風格的變化及投資組合的風險收益來源。
這里我們選取的風格因子可以通過聚寬因子庫,風格因子獲取,具體字段及說明如下
風格因子獲取地址: https://www.joinquant.com/help/api/help?name=factor_values#%E9%A3%8E%E6%A0%BC%E5%9B%A0%E5%AD%90
此處我們采用以上因子作為模型的解釋變量,進行下面的研究。
選 定 2016.6.1-2019.6.3 為 樣 本 考 察 期 間 , 以 中 證 500指數(000905.XSHG) 成分股為考察樣本,對市場風格因子的表現進行實證研究,在實際計算中還需對數據進行如下處理:
- 1) 剔除上市時間小於63天的股票;
- 2) 剔除標記為ST、*ST的股票;
- 3) 剔除任意因子為 NaN 的股票;
參考研報中為避免回歸模型中自變量之間產生多重共線性的情況,而引入相關強度指標RSI,對各風格因子之間的相關程度進行檢查,該指標的構造方法如下
其中,corr 是指在截面 t 期,所有股票的 A、 B 因子之間的相關系數。類似於績效評價中的信息比率 IC_IR,RSI指標 綜合考慮了因子的平均相關系數以及相關系數的穩定性大小,下面的計算中有展示2016年到2019年期間各風格因子之間的相關強度,其中市值因子與杠杆因子之間、殘差波動率因子與流動性因子之間存在較強的正相關關系;而市值因子與非線性市值因子之間、杠杆因子與非線性市值之間存在較強的負相關關系。
#工具包、工具函數
#工具函數
import time
from datetime import datetime, timedelta
from jqdata import *
import numpy as np
import pandas as pd
import math
from statsmodels import regression
import statsmodels.api as sm
import matplotlib.pyplot as plt
import datetime
from scipy import stats
from jqfactor import *
from scipy.optimize import minimize
import warnings
warnings.filterwarnings('ignore')
#設置畫圖樣式
plt.style.use('ggplot')
#獲取日期列表
def get_tradeday_list(start,end,frequency=None,count=None):
if count != None:
df = get_price('000001.XSHG',end_date=end,count=count)
else:
df = get_price('000001.XSHG',start_date=start,end_date=end)
if frequency == None or frequency =='day':
return df.index
else:
df['year-month'] = [str(i)[0:7] for i in df.index]
if frequency == 'month':
return df.drop_duplicates('year-month').index
elif frequency == 'quarter':
df['month'] = [str(i)[5:7] for i in df.index]
df = df[(df['month']=='01') | (df['month']=='04') | (df['month']=='07') | (df['month']=='10') ]
return df.drop_duplicates('year-month').index
elif frequency =='halfyear':
df['month'] = [str(i)[5:7] for i in df.index]
df = df[(df['month']=='01') | (df['month']=='06')]
return df.drop_duplicates('year-month').index
def ShiftTradingDay(date,shift):
# 獲取所有的交易日,返回一個包含所有交易日的 list,元素值為 datetime.date 類型.
tradingday = get_all_trade_days()
# 得到date之後shift天那一天在列表中的行標號 返回一個數
date = datetime.date(int(str(date)[:4]),int(str(date)[5:7]),int(str(date)[8:10]))
shiftday_index = list(tradingday).index(date)+shift
# 根據行號返回該日日期 為datetime.date類型
return tradingday[shiftday_index]
#進行新股、St股過濾,返回篩選後的股票
#!!!不能過濾停牌股票
def filter_stock(stockList,date,days=21*3,limit=0):#日頻策略加入開盤漲停過濾
#去除上市距beginDate不足3個月的股票
def delect_stop(stocks,beginDate,n=days):
stockList=[]
beginDate = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
for stock in stocks:
start_date=get_security_info(stock).start_date
if start_date<(beginDate-datetime.timedelta(days=n)).date():
stockList.append(stock)
return stockList
#剔除ST股
st_data=get_extras('is_st',stockList, count = 1,end_date=date)
stockList = [stock for stock in stockList if not st_data[stock][0]]
#新股及退市股票
stockList=delect_stop(stockList,date)
#剔除開盤漲停股票
if limit == 1:
#如果需要收盤漲跌停可以改字段即可
df = get_price(stockList,end_date=date,fields=['open','high_limit','low_limit'],count=1).iloc[:,0,:]
df['h_limit']=(df['open']==df['high_limit'])
df['l_limit']=(df['open']==df['low_limit'])
stockList = [df.index[i] for i in range(len(df)) if not (df.h_limit[i] or df.l_limit[i])] #過濾漲跌停股票
return stockList
#為股票池添加行業標記,return df格式 ,為中性化函數的子函數
def get_industry_exposure(stock_list,date):
df = pd.DataFrame(index=get_industries(name='sw_l1').index, columns=stock_list)
for stock in stock_list:
try:
df[stock][get_industry_code_from_security(stock,date=date)] = 1
except:
continue
return df.fillna(0)#將NaN賦為0
#查詢個股所在行業函數代碼(申萬一級) ,為中性化函數的子函數
def get_industry_code_from_security(security,date=None):
industry_index=get_industries(name='sw_l1').index
for i in range(0,len(industry_index)):
try:
index = get_industry_stocks(industry_index[i],date=date).index(security)
return industry_index[i]
except:
continue
return u'未找到'
#初始設置
#設置統計數據區間
index = '000905.XSHG' #設置股票池,和對比基準,這里是中證500
#設置統計起止日期
date_start = '2016-06-01'
date_end = '2019-06-04'
#設置調倉頻率
trade_freq = 'month' #month每個自然月;day每個交易日;輸入任意數字如 5,則為5日調倉
#獲取調倉時間列表
if trade_freq == 'month':
#獲取交易日列表,每月首個交易日
date_list = get_tradeday_list(start=date_start,end=date_end,frequency='month',count=None) #自然月的第一天
elif trade_freq == 'day':
date_list = get_tradeday_list(start=date_start,end=date_end,count=None)#獲取回測日期間的所有交易日
else:
date_day_list = get_tradeday_list(start=date_start,end=date_end,count=None)#獲取回測日期間的所有交易日
date_list = [date_day_list[i] for i in range(len(date_day_list)) if i%int(trade_freq) == 0]
date_list
#通過聚寬因子獲取barra風險因子值進行記錄
def get_barra_factor(stock_list,date):
#聚寬風險因子獲取
factor_name = ['size','beta','momentum','residual_volatility','non_linear_size','book_to_price_ratio','liquidity','earnings_yield','growth','leverage']
factor_data = get_factor_values(securities=stock_list, factors = factor_name
,end_date=end_date,count=1)
df = pd.DataFrame()
for f in factor_name:
temp_df = pd.DataFrame(factor_data[f]).T
df = pd.concat([df,temp_df],axis=1)
df.columns=factor_name
return df
#進行因子值計算
factor_data_dict = {}
#循環時間列表獲取原始因子數據組成dict
for end_date in date_list[:]:
end_date=str(end_date)[:10]
print('正在計算 {} 因子數據......'.format(end_date))
stocks_list = get_index_stocks(index,date=end_date)
factor_data_dict[end_date] = get_barra_factor(stocks_list,end_date)
factor_data_dict[end_date].head(3)
#數據清洗、包括去極值、標準化、中性化等,並加入y值
import time
t1 = time.time()
factor_data_y_dict = {}
for date_1,date_2 in zip(date_list[:-1],date_list[1:]):
d1 = ShiftTradingDay(date_1,1) #往後推一天
d2 = ShiftTradingDay(date_2,1)
print('開始整理 {} 數據...'.format(str(date_1)[:10]))
factor_df = factor_data_dict[str(date_1)[:10]] #根據字典存儲的日期格式不同進行不同設置
#factor_df = factor_data_dict[date_1] #根據字典存儲的日期格式不同進行不同設置
#factor_df.index = [normalize_code('0'*(6-len(str(i)))+str(i)) for i in factor_df.index] #股票代碼處理
pool = list(factor_df.index)
pool = filter_stock(pool,str(d1)[:10],days=21*3) #進行新股、ST股票過濾
#計算指數漲跌幅
df_1 = get_price(index,end_date=d1,fields=['open'],count = 1)['open']
df_2 = get_price(index,end_date=d2,fields=['open'],count = 1)['open']
index_pct = df_2.values[0]/df_1.values[0] - 1#具體數值
#計算各股票漲跌幅
df_1 = get_price(pool,end_date=d1,fields=['open'],count = 1)['open']
df_2 = get_price(pool,end_date=d2,fields=['open'],count = 1)['open']
df_3 = pd.concat([df_1,df_2],axis=0).T #進行合並
stock_pct = df_3.iloc[:,1]/df_3.iloc[:,0] - 1 #計算pct,series
#對數據進行處理、標準化、去極值、中性化
factor_df = winsorize_med(factor_df, scale=3, inclusive=True, inf2nan=True, axis=0) #中位數去極值處理
factor_df = standardlize(factor_df, inf2nan=True, axis=0) #對每列做標準化處理
#factor_df = neutralize(factor_df, how=how_, date=date_1, axis=0,fillna='sw_l1')#中性化
factor_df['pct_alpha'] = stock_pct-index_pct
factor_df['pct_'] = stock_pct
factor_data_y_dict[str(date_1)[:10]] = factor_df
t2 = time.time()
print('計算數據耗時:{0}'.format(t2-t1))
print(factor_data_y_dict[str(date_1)[:10]].shape)
df = factor_data_y_dict['2019-05-06']
df.head(3)
#計算權重向量
#求w
#計算市值平方根占比
pool = df.index
get_size = get_fundamentals(query(valuation.code,valuation.market_cap).filter(valuation.code.in_(pool)),date='2019-05-06')
get_size.index = get_size['code'].values
get_size['l_size'] = np.sqrt(get_size['market_cap'])
get_size['w'] = get_size['l_size']/sum(get_size['l_size'])
W = (get_size['w'].values).reshape(len(get_size),1)
get_size.head(3)
#計算行業市值權重占比
hy_df = get_industry_exposure(pool,date='2019-05-06').T
hy_df['c'] = [1]*len(hy_df)
df_c1 = pd.concat([hy_df,get_size],axis=1)
ind_name = get_industries(name='sw_l1').index
all_mkt = sum(df_c1['market_cap'].values)
ind_w = []
for ind in ind_name:
df_temp = df_c1[df_c1[ind]==1]
r_temp = sum(df_temp['market_cap'].values)/all_mkt
ind_w.append(r_temp)
#計算每期因子收益率
#注意該循環如計算至2019年1月份報錯,可以另起一個cell,保留原來的factor_f_df,跳過報錯日期,繼續計算保存factor_f_df
factor_f_df = pd.DataFrame()
for d in date_list[:-1]:
d = str(d)[:10]
print('正在計算{}...'.format(d))
#獲取因子暴露
factor_df = factor_data_y_dict[d]
x = factor_df.iloc[:,:-2]
r = factor_df['pct_'].values
pool = factor_df.index
#計算市值平方根占比
get_size = get_fundamentals(query(valuation.code,valuation.market_cap).filter(valuation.code.in_(pool)),date=d)
get_size.index = get_size['code'].values
get_size['l_size'] = np.sqrt(get_size['market_cap'])
get_size['w'] = get_size['l_size']/sum(get_size['l_size'])
W = (get_size['w'].values).reshape(len(get_size),1)
#計算行業市值權重占比
hy_df = get_industry_exposure(pool,date=d).T
hy_df['c'] = [1]*len(hy_df)
df_c1 = pd.concat([hy_df,get_size],axis=1)
ind_name = get_industries(name='sw_l1').index
all_mkt = sum(df_c1['market_cap'].values)
ind_w = []
for ind in ind_name:
df_temp = df_c1[df_c1[ind]==1]
r_temp = sum(df_temp['market_cap'].values)/all_mkt
ind_w.append(r_temp)
#進行大X拚接
X_ = pd.concat([x,hy_df],axis=1)
#最優化求解因子收益率
X = matrix(X_.values)
w_m = get_size['market_cap'].values
w_i = ind_w
#最優化求解因子收益率
def func(f):
sum_l = []
for i in range(len(r)):
if str(r[i]) !='nan':
sum_l.append(w_m[i]*(r[i]-np.dot(X[i],f))**2)
return sum(sum_l)
def func_cons(x):
return sum(multiply(x[-35:-1],w_i))
# 初始值 + 約束條件
f0 = np.ones(45) / 10**4
bnds = tuple((-1,1) for x in f0)
cons = ({'type':'eq', 'fun': func_cons})
options={'disp':False, 'maxiter':1000, 'ftol':1e-4,'eps':1e-4}
res = minimize(func, f0, bounds=bnds, constraints=cons, method='SLSQP', options=options)
factor_f_df[d] = res['x']
factor_f_df.head(3)
factor_f_df.head(3)
市值風格解析
#風格收益
(factor_f_df.iloc[:,:10]+1).cumprod().plot(figsize=(20,8))
#純淨因子累計收益
((factor_f_df.iloc[:,:10]+1).cumprod()-1).iloc[-1,:].plot(kind='bar',figsize=(20,8))
#行業累計收益
((factor_f_df.iloc[:,10:]+1).cumprod()-1).iloc[-1,:].plot(kind='bar',figsize=(20,8))
組合收益歸因
#中證500選50只股票,進行收益分析
#進行因子值計算
factor_data_50_dict = {}
factor_name = ['size','beta','momentum','residual_volatility','non_linear_size','book_to_price_ratio','liquidity','earnings_yield','growth','leverage']
#循環時間列表獲取原始因子數據組成dict
for end_date in date_list[:-1]:
end_date=str(end_date)[:10]
print('正在計算 {} 因子數據......'.format(end_date))
stocks_list = get_index_stocks(index,date=end_date)
pool_ = [stocks_list[i] for i in range(len(stocks_list)) if i%10==0]
stocks_list = pool_
#計算行業市值權重占比
hy_df = get_industry_exposure(pool_,date=end_date).T
hy_df['c'] = [1]*len(hy_df)
x = factor_data_y_dict[end_date].loc[pool_,factor_name]
df_pct = factor_data_y_dict[end_date].loc[pool_,['pct_']]
#進行大X拚接
X_ = pd.concat([x,hy_df,df_pct],axis=1)
factor_data_50_dict[end_date] = X_
factor_data_50_dict[end_date].head(3)
factor_f_df = factor_f_df.T
mark = 1
for d in date_list[:20]:
end_date = str(d)[:10]
d1 = factor_data_50_dict[end_date]
d2 = factor_f_df[end_date]
y_df = pd.DataFrame()
y_df['pct_'] = d1['pct_']
y_df['p_pct_']= np.dot(d1.iloc[:,:-1],d2)
if mark == 1:
y_df_ = y_df
mark = 0
else:
y_df_ = pd.concat([y_df_,y_df],axis=0)
y_df_.head(3)
y_df_.plot(x='pct_', y='p_pct_', kind='scatter',figsize=(12,8))
#以20190506為例,檢查因子暴露度百分位
end_date = '2019-05-06'
d3 = factor_data_y_dict[end_date]
d1 = factor_data_50_dict[end_date].dropna(axis=0)
d_mean = d1.mean(axis=0)[:10]
d4 = pd.concat([d3.iloc[:,:10].T,d_mean],axis=1)
d4['mid'] = [0.5]*len(d4)
(d4.rank(axis=1)[0]/500).plot(kind='bar',figsize=(12,6))
d4['mid'].plot()
plt.show()
y_df.plot(figsize=(12,8))