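Preface:
The purpose of this article is to resolve a small leftover question from my previous post, 《【機器學習的簡單套用】輪動選股模型》 (a simple application of machine learning to a rotation stock-selection model): how can factor selection be handled with statistical logic?
Previous post link: https://www.joinquant.com/view/community/detail/50d78ca39cf9e6cdec7c7126c75580a8?type=1
PS: I am a third-year finance undergraduate. What follows is a factor-testing framework I put together after reading a number of research reports (the printed results look a bit odd to me). If you spot errors in the logic or the code, please point them out. Discussion is very welcome!
Questions of all kinds are welcome, and adding me on WeChat to chat is best of all, haha (Howard董先森: dhl19988882)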
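0. Basic framework
Standing on the shoulders of giants:
Reference: Guosheng Securities, 《20190116-國盛證券-多因子系列之一:多因子選股體系的思考》, page 21
0.1 IC and IR indicators
Formula source: Guosheng Securities, 《20190116-國盛證券-多因子系列之一:多因子選股體系的思考》
Definitions:
IC: the cross-sectional correlation coefficient between a factor's exposures across all stocks at time T0 and those stocks' returns over the next period (T0-T1)
Rank IC: the cross-sectional correlation coefficient between the ranks of a factor's exposures across all stocks at time T0 and the ranks of their returns over the next period (T0-T1)
Purpose:
IC and Rank IC: measure how well the factor predicts next-period returns. Within a linear framework, the higher the IC (in absolute value), the stronger the factor's ability to predict stock returns
IR and Rank IR: measure how stable that predictive power is. IR is usually computed as the mean of the per-period IC series divided by its standard deviation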
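The definitions above can be sketched in a few lines of pandas. This is a minimal illustration, not the code used later in this post: the function names `cross_sectional_ic` and `information_ratio` and the toy numbers are all made up for the example. Rank IC is computed as the Pearson correlation of the ranks, which is equivalent to the Spearman correlation.

```python
import pandas as pd

def cross_sectional_ic(exposure, next_return):
    """Return (IC, Rank IC) for one cross-section at T0."""
    df = pd.concat([exposure, next_return], axis=1, keys=["x", "y"]).dropna()
    ic = df["x"].corr(df["y"])                     # Pearson correlation
    rank_ic = df["x"].rank().corr(df["y"].rank())  # Pearson on ranks = Spearman
    return ic, rank_ic

def information_ratio(ic_series):
    """IR = mean of the per-period IC series / its sample standard deviation."""
    return ic_series.mean() / ic_series.std()

# Hypothetical toy cross-section: five stocks, exposures at T0, returns over T0-T1
codes = ["A", "B", "C", "D", "E"]
exposure = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0], index=codes)
next_return = pd.Series([0.01, 0.02, 0.03, 0.04, 0.10], index=codes)
ic, rank_ic = cross_sectional_ic(exposure, next_return)

# Hypothetical per-period IC values, only to show the IR calculation
ir = information_ratio(pd.Series([0.1, 0.2, 0.3]))
```

In this toy data the returns rise with the exposures but not linearly, so the Rank IC is perfect while the plain IC is lower. That is the usual argument for looking at both.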
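0.2 Regression method
Definitions:
Independent variable X: factor exposure (the factor value) at time T0
Dependent variable Y: stock return over the period T0-T1
Regression algorithm (RLM): robust regression is commonly used in single-factor regression tests. Through iteratively reweighted regression, RLM effectively reduces the influence that outliers have on the validity and stability of the parameter estimates obtained by OLS (ordinary least squares). (Source: Everbright Securities, 《20170501-光大證券-多因子系列報告之三:多因子組合光大Alpha_1.0》, page 5)
Factor return: the regression coefficient obtained by regressing stock returns Y on factor exposures X
Factor-return t-value: the t-value from each factor's single-factor regression, which measures its significance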
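To make the "iteratively reweighted" idea concrete, here is a hand-rolled numpy sketch of Huber-weighted least squares. It is only an illustration of the principle and is not the statsmodels `RLM` implementation that the code later in this post calls. The function name `huber_irls`, the inclusion of an intercept, the MAD-based scale and the toy data are all assumptions of the sketch, and it does not compute t-values.

```python
import numpy as np

def huber_irls(x, y, c=1.345, n_iter=50, tol=1e-8):
    """Fit y = a + b*x by iteratively reweighted least squares with Huber weights."""
    X = np.column_stack([np.ones_like(x), x])
    beta = np.linalg.lstsq(X, y, rcond=None)[0]        # start from the OLS fit
    for _ in range(n_iter):
        resid = y - X @ beta
        # Robust scale estimate from the median absolute deviation
        scale = np.median(np.abs(resid - np.median(resid))) / 0.6745
        if scale == 0:
            break
        u = np.abs(resid) / scale
        # Huber weights: 1 inside the threshold, shrinking as c/u outside it
        w = np.minimum(1.0, c / np.maximum(u, 1e-12))
        sw = np.sqrt(w)
        new_beta = np.linalg.lstsq(X * sw[:, None], y * sw, rcond=None)[0]
        converged = np.max(np.abs(new_beta - beta)) < tol
        beta = new_beta
        if converged:
            break
    return beta  # [intercept, slope]; the slope plays the role of the factor return

# Hypothetical data: true slope 2, small deterministic noise, one large outlier
x = np.arange(20, dtype=float)
y = 2.0 * x + 0.1 * np.sin(x)
y[-1] += 100.0

design = np.column_stack([np.ones_like(x), x])
ols_slope = np.linalg.lstsq(design, y, rcond=None)[0][1]
robust_slope = huber_irls(x, y)[1]
```

With a single outlier at the end of the sample, the OLS slope is pulled well away from 2, while the reweighted fit stays close to it because the outlier's weight shrinks on every pass.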
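0.3 Monotonicity indicator (layered backtest)
What the monotonicity indicator means:
It measures a factor's discriminating power: the more monotonic the factor, the better it separates stocks in a layered backtest
Layered backtest:
The usual approach is to split the stock pool into equal-sized groups by factor value (my model uses 10 groups), build one portfolio per group, and check whether the performance of the ten portfolios is monotonic in the factor value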
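The grouping step can be sketched as follows. This is a minimal single-period illustration with made-up data, assuming equal-weighted groups. `layered_returns` is a hypothetical helper name and is not the function used for the results in this post.

```python
import numpy as np
import pandas as pd

def layered_returns(factor, next_return, n_groups=10):
    """Mean next-period return of each factor-sorted group (1 = lowest factor values)."""
    df = pd.concat([factor, next_return], axis=1, keys=["factor", "ret"]).dropna()
    # Rank first so tied factor values cannot produce duplicate bin edges
    ranks = df["factor"].rank(method="first")
    df["group"] = pd.qcut(ranks, n_groups, labels=False) + 1
    return df.groupby("group")["ret"].mean()

# Hypothetical pool of 100 stocks whose return rises linearly with the factor
codes = ["S%03d" % i for i in range(100)]
factor = pd.Series(np.arange(100, dtype=float), index=codes)
next_return = pd.Series(0.001 * np.arange(100), index=codes)

group_ret = layered_returns(factor, next_return)
is_monotonic = bool(np.all(np.diff(group_ret.values) > 0))
long_short = group_ret.iloc[-1] - group_ret.iloc[0]   # top group minus bottom group
```

The difference between the top and bottom groups is the single-period return of a long-short portfolio. A strict check like `is_monotonic` is easy to read but harsh on noisy data, so a rank correlation between group number and group return may be a gentler alternative.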
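0.4 Factor selection (scoring) logic
Inspired by Everbright Securities, 《20170501-光大證券-多因子系列報告之三:多因子組合光大Alpha_1.0》, page 6
PS:
Because extracting the monotonicity indicator takes a long time, this article first screens the factors with the regression method and the information-coefficient method, and only then uses the layered backtest to check the monotonicity of individual factors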
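1. Results
1.1 Scoring table header
1.2 Layered backtest (gross profit margin factor as an example)
1.3 Long-short portfolio (gross profit margin factor as an example)
1.4 Other results (gross profit margin factor as an example)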
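2. Afterword
Below are some open problems with this factor-selection framework, along with my thoughts on them:
1. Persistence of a factor's predictive power:
Some factors perform well within the sample period but may not do so in other periods. How should this "generalization" problem be handled? One idea is a half-life: first compute the IC of the T0 factor exposure against returns over successively lagged periods (T0-T1, T1-T2, ...), then record the time it takes for the IC to fall to half of its initial value as the half-life. Could statistics of that half-life, such as its mean, be used to measure how long a candidate factor's predictive power lasts?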
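The half-life idea could be prototyped like this. It is a minimal sketch under the assumption that the lagged ICs have already been computed and stored in order, with position 0 holding the IC against T0-T1 returns, position 1 the IC against T1-T2 returns, and so on. `ic_half_life` and the numbers are hypothetical.

```python
import numpy as np

def ic_half_life(ic_by_lag):
    """Number of periods until |IC| first falls to half of its initial value, else None."""
    ic = np.abs(np.asarray(ic_by_lag, dtype=float))
    if ic.size == 0 or ic[0] == 0:
        return None
    below = np.nonzero(ic <= ic[0] / 2)[0]
    return int(below[0]) if below.size else None

# Hypothetical lagged ICs for one factor measured at one T0
decaying_ic = [0.10, 0.08, 0.06, 0.04, 0.03]
half_life = ic_half_life(decaying_ic)

# A factor whose IC never halves within the observed lags
persistent_ic = [0.10, 0.09, 0.09]
no_half_life = ic_half_life(persistent_ic)
```

Repeating this for every T0 gives one half-life per period, and the mean or median of those values is the statistic proposed above. Periods where the IC never halves within the window need a separate convention, which is why the sketch returns `None` instead of guessing.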
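2. The underlying question of selection:
Do the factors that get screened out (removed) really contribute nothing to predicting future returns? Could dimensionality reduction achieve the goal of "selection" instead?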
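As a rough illustration of the dimensionality-reduction route, the sketch below runs PCA through numpy's SVD and keeps just enough components to explain a chosen share of the variance. The helper name `pca_reduce`, the 90% threshold and the toy factor table are assumptions made for the example. The framework code in this post uses scikit-learn's `PCA` instead.

```python
import numpy as np
import pandas as pd

def pca_reduce(factors, var_threshold=0.9):
    """Project a stocks x factors table onto its leading principal components."""
    X = factors.values - factors.values.mean(axis=0)     # center each factor
    U, S, Vt = np.linalg.svd(X, full_matrices=False)
    explained = S ** 2 / np.sum(S ** 2)                  # variance share per component
    k = int(np.searchsorted(np.cumsum(explained), var_threshold)) + 1
    k = min(k, len(S))
    scores = U[:, :k] * S[:k]                            # component scores per stock
    columns = ["PC%d" % (i + 1) for i in range(k)]
    return pd.DataFrame(scores, index=factors.index, columns=columns), explained[:k]

# Hypothetical factor table: 'b' is a multiple of 'a', 'c' is tiny noise
a = np.arange(10, dtype=float)
factors = pd.DataFrame({
    "a": a,
    "b": 2.0 * a,
    "c": 0.01 * np.where(np.arange(10) % 2 == 0, 1.0, -1.0),
})
reduced, explained = pca_reduce(factors)
```

Here two of the three factors carry the same information, so a single component is enough. No factor is thrown away outright, but the components are harder to interpret than the original factors, which is the trade-off against scoring and screening.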
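References:
I looked around on Zhihu and found a column post on factor testing that explains it quite well (it also works in a job ad very smoothly, lesson learned): https://zhuanlan.zhihu.com/p/31733061
Definitions of and an introduction to factor metrics under the Fama and Barra frameworks (the main difference is how the two interpret factor exposure; the rest, in my view, is broadly similar), link: https://henix.github.io/feeds/zhuanlan.factor-investing/2018-08-05-41339443.html
For an introduction to IC, see 《關於多因子模型的“IC信息系數”那些事》:
https://zhuanlan.zhihu.com/p/24616859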
[Notes] Factor Testing Framework
Cloud drive materials: preprocessed data plus research reports and other reference material. Link: https://pan.baidu.com/s/1hL77LHW_-ot9XL3Ac8Ldtw Extraction code: vzi5
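1. Data preparation
Below are some framework functions I have packaged up. Use them if you need them, and just give me credit when you replicate.
The first block is the framework from my earlier post on applying machine learning to rotation stock selection, link: https://www.joinquant.com/view/community/detail/50d78ca39cf9e6cdec7c7126c75580a8?type=1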
# [Base framework functions] Rotation framework v9
# coding: utf-8
# Author: Howard董先森, wechat:dhl19988882,e-mail: 16hldong.stu.edu.cn
# Please give me credit when you copy and share this
import warnings
warnings.filterwarnings("ignore")
'''
Import libraries
'''
# JoinQuant platform star imports come first so the explicit imports below take precedence.
# Bare names used later (mean, exp, nan, corrcoef) are assumed to arrive via these star imports.
from jqdata import *
from jqlib.technical_analysis import *
from jqfactor import get_factor_values, neutralize, standardlize, winsorize
import pandas as pd
import numpy as np
import time
import datetime
import calendar
import pickle
import csv
import itertools
from itertools import combinations
from six import StringIO
from scipy import stats
import statsmodels.api as sm
from statsmodels import regression
# PCA and other scikit-learn tools
from sklearn.decomposition import PCA
from sklearn import svm
from sklearn import metrics
from sklearn.model_selection import train_test_split, GridSearchCV
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import seaborn as sns
'''
General utilities
'''
# Flatten a nested list
def splitlist(b):
    alist = []
    has_nested = False
    for sublist in b:
        try:  # iterable elements are expanded by one level
            for i in sublist:
                alist.append(i)
        except TypeError:  # non-iterable elements are appended as they are
            alist.append(sublist)
    for i in alist:
        if isinstance(i, list):  # check whether any nested list remains
            has_nested = True
            break
    if has_nested:
        return splitlist(alist)  # still nested, so recurse
    return alist
#PCA降維
def pca_analysis(data,n_components='mle'):
index = data.index
model = PCA(n_components=n_components)
model.fit(data)
data_pca = model.transform(data)
df = pd.DataFrame(data_pca,index=index)
return df
'''
數據準備
'''
# Clean the index constituent pool
def get_stock_code(index, date):
    # Get the index constituents
    stockList = get_index_stocks(index, date=date)
    # Drop ST stocks, with ST status checked as of `date`
    st_data = get_extras('is_st', stockList, count=1, end_date=date)
    stockList = [stock for stock in stockList if not st_data[stock][0]]
    # Drop newly listed and delisted stocks
    stockList = delect_stop(stockList, date, date)
    return stockList
# 因子排列組合函數
def factor_combins(data):
'''
輸入:
格式為Dataframe
data
輸出:
格式為list
所有因子的排列組合
'''
# 排列組合
factor_combins =[]
for i in range(len(data.columns)+1):
combins = [c for c in combinations((data.columns),i)]
factor_combins.append(combins)
# 剔除排列組合中空的情況
factor_combins.pop(0)
# 多層列表展開平鋪成一層
factor_list = list(itertools.chain(*factor_combins))
return factor_list
#獲取時間為date的全部因子數據
def get_factor_data(stock_list,date):
data=pd.DataFrame(index=stock_list)
# 第一波因子提取
q = query(valuation,balance,cash_flow,income,indicator).filter(valuation.code.in_(stock_list))
df = get_fundamentals(q, date)
df['market_cap']=df['market_cap']*100000000
df.index = df['code']
del df['code'],df['id']
# 第二波因子提取
factor_data=get_factor_values(stock_list,['roe_ttm','roa_ttm','total_asset_turnover_rate',\
'net_operate_cash_flow_ttm','net_profit_ttm',\
'cash_to_current_liability','current_ratio',\
'gross_income_ratio','non_recurring_gain_loss',\
'operating_revenue_ttm','net_profit_growth_rate'],end_date=date,count=1)
# 第二波因子轉dataframe格式
factor=pd.DataFrame(index=stock_list)
for i in factor_data.keys():
factor[i]=factor_data[i].iloc[0,:]
#合並得大表
df=pd.concat([df,factor],axis=1)
# 有些因子沒有的,用別的近似代替
df.fillna(0, inplace = True)
# profitability 因子
# proft/net revenue
data['gross_profit_margin'] = df['gross_profit_margin']
data['net_profit_margin'] = df['net_profit_margin']
data['operating_profit_margin'] = df['operating_profit']/(df['total_operating_revenue']-df['total_operating_cost'])
data['pretax_margin'] = (df['net_profit'] + df['income_tax_expense'])/(df['total_operating_revenue']-df['total_operating_cost'])
# profit/capital
data['ROA'] = df['roa']
data['EBIT_OA'] = df['operating_profit']/df['total_assets']
data['ROTC'] = df['operating_profit']/(df['total_owner_equities'] + df['longterm_loan'] + df['shortterm_loan']+df['borrowing_from_centralbank']+df['bonds_payable']+df['longterm_account_payable'])
data['ROE'] = df['roe']
# Others
data['inc_total_revenue_annual'] = df['inc_total_revenue_annual']
# Activity 因子
# inventory
data['inventory_turnover'] = df['operating_cost']/df['inventories']
data['inventory_processing_period'] = 365/data['inventory_turnover']
# account receivables
data['receivables_turnover'] =df['total_operating_revenue']/df['account_receivable']
data['receivables_collection_period'] = 365/data['receivables_turnover']
# activity cycle
data['operating_cycle'] = data['receivables_collection_period']+data['inventory_processing_period']
# Other turnovers
data['total_asset_turnover'] = df['operating_revenue']/df['total_assets']
data['fixed_assets_turnover'] = df['operating_revenue']/df['fixed_assets']
data['working_capital_turnover'] = df['operating_revenue']/(df['total_current_assets']-df['total_current_liability'])
# Liquidity 因子
data['current_ratio'] =df['current_ratio']
data['quick_ratio'] =(df['total_current_assets']-df['inventories'])/df['total_current_liability']
data['cash_ratio'] = (df['cash_equivalents']+df['proxy_secu_proceeds'])/df['total_current_liability']
# leverage
data['debt_to_equity'] = (df['longterm_loan'] + df['shortterm_loan']+df['borrowing_from_centralbank']+df['bonds_payable']+df['longterm_account_payable'])/df['total_owner_equities']
data['debt_to_capital'] = (df['longterm_loan'] + df['shortterm_loan']+df['borrowing_from_centralbank']+df['bonds_payable']+df['longterm_account_payable'])/(df['longterm_loan'] + df['shortterm_loan']+df['borrowing_from_centralbank']+df['bonds_payable']+df['longterm_account_payable']+df['total_owner_equities'])
data['debt_to_assets'] = (df['longterm_loan'] + df['shortterm_loan']+df['borrowing_from_centralbank']+df['bonds_payable']+df['longterm_account_payable'])/df['total_assets']
data['financial_leverage'] = df['total_assets'] / df['total_owner_equities']
# Valuation 因子
# Price
data['EP']=df['net_profit_ttm']/df['market_cap']
data['PB']=df['pb_ratio']
data['PS']=df['ps_ratio']
data['P_CF']=df['market_cap']/df['net_operate_cash_flow']
# 技術面 因子
# 下面技術因子來自於西交大元老師量化小組的SVM研究,report,https://www.joinquant.com/view/community/detail/15371
stock = stock_list
#個股60個月收益與上證綜指回歸的截距項與BETA
stock_close=get_price(stock, count = 60*20+1, end_date=date, frequency='daily', fields=['close'])['close']
SZ_close=get_price('000001.XSHG', count = 60*20+1, end_date=date, frequency='daily', fields=['close'])['close']
stock_pchg=stock_close.pct_change().iloc[1:]
SZ_pchg=SZ_close.pct_change().iloc[1:]
beta=[]
stockalpha=[]
for i in stock:
temp_beta, temp_stockalpha = stats.linregress(SZ_pchg, stock_pchg[i])[:2]
beta.append(temp_beta)
stockalpha.append(temp_stockalpha)
#此處alpha beta為list
data['alpha']=stockalpha
data['beta']=beta
#動量
data['return_1m']=stock_close.iloc[-1]/stock_close.iloc[-20]-1
data['return_3m']=stock_close.iloc[-1]/stock_close.iloc[-60]-1
data['return_6m']=stock_close.iloc[-1]/stock_close.iloc[-120]-1
data['return_12m']=stock_close.iloc[-1]/stock_close.iloc[-240]-1
#取換手率數據
data_turnover_ratio=pd.DataFrame()
data_turnover_ratio['code']=stock
trade_days=list(get_trade_days(end_date=date, count=240*2))
for i in trade_days:
q = query(valuation.code,valuation.turnover_ratio).filter(valuation.code.in_(stock))
temp = get_fundamentals(q, i)
data_turnover_ratio=pd.merge(data_turnover_ratio, temp,how='left',on='code')
data_turnover_ratio=data_turnover_ratio.rename(columns={'turnover_ratio':i})
data_turnover_ratio=data_turnover_ratio.set_index('code').T
#個股個股最近N個月內用每日換手率乘以每日收益率求算術平均值
data['wgt_return_1m']=mean(stock_pchg.iloc[-20:]*data_turnover_ratio.iloc[-20:])
data['wgt_return_3m']=mean(stock_pchg.iloc[-60:]*data_turnover_ratio.iloc[-60:])
data['wgt_return_6m']=mean(stock_pchg.iloc[-120:]*data_turnover_ratio.iloc[-120:])
data['wgt_return_12m']=mean(stock_pchg.iloc[-240:]*data_turnover_ratio.iloc[-240:])
#個股個股最近N個月內用每日換手率乘以函數exp(-x_i/N/4)再乘以每日收益率求算術平均值
temp_data=pd.DataFrame(index=data_turnover_ratio[-240:].index,columns=stock)
temp=[]
for i in range(240):
if i/20<1:
temp.append(exp(-i/1/4))
elif i/20<3:
temp.append(exp(-i/3/4))
elif i/20<6:
temp.append(exp(-i/6/4))
elif i/20<12:
temp.append(exp(-i/12/4))
temp.reverse()
for i in stock:
temp_data[i]=temp
data['exp_wgt_return_1m']=mean(stock_pchg.iloc[-20:]*temp_data.iloc[-20:]*data_turnover_ratio.iloc[-20:])
data['exp_wgt_return_3m']=mean(stock_pchg.iloc[-60:]*temp_data.iloc[-60:]*data_turnover_ratio.iloc[-60:])
data['exp_wgt_return_6m']=mean(stock_pchg.iloc[-120:]*temp_data.iloc[-120:]*data_turnover_ratio.iloc[-120:])
data['exp_wgt_return_12m']=mean(stock_pchg.iloc[-240:]*temp_data.iloc[-240:]*data_turnover_ratio.iloc[-240:])
#波動率
data['std_1m']=stock_pchg.iloc[-20:].std()
data['std_3m']=stock_pchg.iloc[-60:].std()
data['std_6m']=stock_pchg.iloc[-120:].std()
data['std_12m']=stock_pchg.iloc[-240:].std()
#股價
data['ln_price']=np.log(stock_close.iloc[-1])
#換手率
data['turn_1m']=mean(data_turnover_ratio.iloc[-20:])
data['turn_3m']=mean(data_turnover_ratio.iloc[-60:])
data['turn_6m']=mean(data_turnover_ratio.iloc[-120:])
data['turn_12m']=mean(data_turnover_ratio.iloc[-240:])
data['bias_turn_1m']=mean(data_turnover_ratio.iloc[-20:])/mean(data_turnover_ratio)-1
data['bias_turn_3m']=mean(data_turnover_ratio.iloc[-60:])/mean(data_turnover_ratio)-1
data['bias_turn_6m']=mean(data_turnover_ratio.iloc[-120:])/mean(data_turnover_ratio)-1
data['bias_turn_12m']=mean(data_turnover_ratio.iloc[-240:])/mean(data_turnover_ratio)-1
#技術指標
data['PSY']=pd.Series(PSY(stock, date, timeperiod=20))
data['RSI']=pd.Series(RSI(stock, date, N1=20))
data['BIAS']=pd.Series(BIAS(stock,date, N1=20)[0])
dif,dea,macd=MACD(stock, date, SHORT = 10, LONG = 30, MID = 15)
data['DIF']=pd.Series(dif)
data['DEA']=pd.Series(dea)
data['MACD']=pd.Series(macd)
# 去inf
a = np.array(data)
where_are_inf = np.isinf(a)
a[where_are_inf] = nan
factor_data =pd.DataFrame(a, index=data.index, columns = data.columns)
return factor_data
# 行業代碼獲取函數
def get_industry(date,industry_old_code,industry_new_code):
'''
輸入:
date: str, 時間點
industry_old_code: list, 舊的行業代碼
industry_new_code: list, 新的行業代碼
輸出:
list, 行業代碼
'''
#獲取行業因子數據
if datetime.datetime.strptime(date,"%Y-%m-%d").date()<datetime.date(2014,2,21):
industry_code=industry_old_code
else:
industry_code=industry_new_code
len(industry_code)
return industry_code
#取股票對應行業
def get_industry_name(i_Constituent_Stocks, value):
return [k for k, v in i_Constituent_Stocks.items() if value in v]
#缺失值處理
def replace_nan_indu(factor_data, stockList, industry_code, date):
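'''
Input:
factor_data: dataframe, factor values containing missing values
stockList: list, all constituents of the index pool
industry_code: list, industry codes
date: str, point in time
Output:
factor_data: dataframe, factor values with missing entries filled by industry means
'''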
# 創建一個字典,用於儲存每個行業對應的成分股(行業標的和指數標的取交集)
i_Constituent_Stocks={}
# 創建一個行業因子數據表格,待填因子數據(暫定為均值)
ind_factor_data = pd.DataFrame(index=industry_code,columns=factor_data.columns)
# 遍曆所有行業
for i in industry_code:
# 獲取單個行業所有的標的
stk_code = get_industry_stocks(i, date)
# 儲存每個行業對應的所有成分股到剛剛創建的字典中(行業標的和指數標的取交集)
i_Constituent_Stocks[i] = list(set(stk_code).intersection(set(stockList)))
# 對於某個行業,根據提取出來的指數池子,把所有交集內的因子值取mean,賦值到行業因子值的表格上
ind_factor_data.loc[i]=mean(factor_data.loc[i_Constituent_Stocks[i],:])
# 對於行業值缺失的(行業標的和指數標的取交集),用所有行業的均值代替
for factor in ind_factor_data.columns:
# 提取行業缺失值所在的行業,並放到list去
null_industry=list(ind_factor_data.loc[pd.isnull(ind_factor_data[factor]),factor].keys())
# 遍曆行業缺失值所在的行業
for i in null_industry:
# 所有行業的均值代替,行業缺失的數值
ind_factor_data.loc[i,factor]=mean(ind_factor_data[factor])
# 提取含缺失值的標的
null_stock=list(factor_data.loc[pd.isnull(factor_data[factor]),factor].keys())
# 用行業值(暫定均值)去填補,指數因子缺失值
for i in null_stock:
industry = get_industry_name(i_Constituent_Stocks, i)
if industry:
factor_data.loc[i, factor] = ind_factor_data.loc[industry[0], factor]
else:
factor_data.loc[i, factor] = mean(factor_data[factor])
return factor_data
# Keep only stocks listed more than n days before beginDate and still listed after endDate
def delect_stop(stocks, beginDate, endDate, n=365):
    stockList = []
    beginDate = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    endDate = datetime.datetime.strptime(endDate, "%Y-%m-%d")
    for stock in stocks:
        info = get_security_info(stock)
        start_date = info.start_date
        end_date = info.end_date
        if start_date < (beginDate - datetime.timedelta(days=n)).date() and end_date > endDate.date():
            stockList.append(stock)
    return stockList
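# Get the preprocessed factor data
def prepare_data(date, index):
    '''
    Input:
    date: str, point in time
    index: index code
    Output:
    data: preprocessed dataframe
    '''
    # Clean the index pool (drop ST stocks, delisted stocks, and stocks listed for less than a year)
    stockList = get_stock_code(index, date)
    # Fetch the factor data
    factor_data = get_factor_data(stockList, date)
    # Preprocessing
    # Fill missing values (industry_code is read from a global variable)
    data = replace_nan_indu(factor_data, stockList, industry_code, date)
    # Winsorize with the built-in function
    data = winsorize(data, qrange=[0.05,0.93], inclusive=True, inf2nan=True, axis=0)
    # Standardize with the built-in function
    data = standardlize(data, inf2nan=True, axis=0)
    # Neutralize with the built-in function against industry ('sw_l1') and market cap
    data = neutralize(data, how=['sw_l1','market_cap'], date=date, axis=0)
    return data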
'''
時間準備
'''
# 時間管理函數
def time_handle(start_date, end_date, period, period_pre,index):
'''
輸入:
start_date: 開始時間,str
end_date: 截至時間,str
period: 周期間隔,str(轉換為周是'W',月'M',季度線'Q',五分鍾'5min',12天'12D')
period_pre: 小周期間隔,str(為了提取第一個T0數據,作為輔助用途)
輸出:
格式為dataframe
columns:
T0: T0時間點的集合
T1: T1時間點的集合
T2: T2時間點的集合
'''
# 設定轉換周期 period_type 轉換為周是'W',月'M',季度線'Q',五分鍾'5min',12天'12D'
time = get_price(index ,start_date,end_date,'daily',fields=['close'])
# 記錄每個周期中最後一個交易日
time['date']=time.index
# Resample so that each period takes the values of its last trading day
period_handle=time.resample(period).last()
period_handle['t1'] = period_handle['date']
# 創造超前數據T2
period_handle['t2']=period_handle['t1'].shift(-1)
# 重新設定index,設為模型運行次數
period_handle.index = range(len(period_handle))
# 構建T0
period_handle['t0'] = period_handle['t1'].shift(1)
# 重新獲取小一個周期的數據
time1 = get_price(index ,start_date, end_date,'daily',fields=['close'])
# 記錄每個周期中最後一個交易日
time1['date']=time1.index
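# Resample so that each shorter period takes the values of its last trading day
ph =time1.resample(period_pre).last().dropna()
# Use the last trading day of the first shorter period as the first T0 of the main table
period_handle.loc[0, 't0'] = ph['date'].iloc[0]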
# 記錄模型運行次數
period_handle['Replication#'] = period_handle.index
period_handle['Replication'] = period_handle['Replication#'].shift(-1)
del period_handle['Replication#']
del period_handle['date']
# 重新排列列表順序
period_handle['T0'] = period_handle['t0']
period_handle['T1'] = period_handle['t1']
period_handle['T2'] = period_handle['t2']
del period_handle['close']
del period_handle['t0']
del period_handle['t1']
del period_handle['t2']
period_handle.to_csv('period_handle.csv')
return period_handle
# 時間格式修改(從timestamp改為str)
def time_fix(period_handle):
# 導入時間周期管理函數
p = time_handle(start_date, end_date, period, period_pre,index)
# dropna 並且list化
datelist_T0 = list(p.dropna()['T0'])
datelist_T1 = list(p.dropna()['T1'])
datelist_T2 = list(p.dropna()['T2'])
# timestamp 轉str
datelist_T0 = list(np.vectorize(lambda s: s.strftime('%Y-%m-%d'))(datelist_T0))
datelist_T1 = list(np.vectorize(lambda s: s.strftime("%Y-%m-%d"))(datelist_T1))
datelist_T2 = list(np.vectorize(lambda s: s.strftime("%Y-%m-%d"))(datelist_T2))
# list字典化,並轉為dateframe
t = {'T0': datelist_T0, 'T1':datelist_T1, 'T2':datelist_T2}
test_timing = pd.DataFrame(t)
return test_timing
'''
分類函數
'''
# 獲取指數在所有周期內的池子,然後取不重複set的並集,變成新的池子
def get_all_index_stock(index, period_handle):
'''
輸入:
stock_list: 所有周期池子的出現過的股票, list
period_handle: 周期管理函數出來的時間集合, dataframe
輸出:
stock_list: list, 所有周期內,於指數出現過的標的池子
'''
# 時間周期管理
p = time_handle(start_date, end_date, period, period_pre,index)
# 大池子提取
stock_list_pre = []
for date in p['T0']:
# 獲取每個T0的指數池子
stk_list = get_index_stocks(index , date=date)
# 放到stock_list里面
stock_list_pre.append(stk_list)
# 分解多重列表,去重複,返回list
s = splitlist(stock_list_pre)
stock_list = list(set(s))
return stock_list
# 所有周期,指數池子里面所有股票的周期收益率
def get_index_big_pool_return(stock_list, period_handle):
'''
輸入:
stock_list: 所有周期池子的出現過的股票, list
period_handle: 周期管理函數出來的時間集合, dataframe
輸出:
格式為dataframe
columns:
T0: T0時間點的集合
code1: 標的1在T0到T1的收益率
code2: 標的2在T0到T1的收益率
以此類推
'''
# 獲取指數池子所有的索引
stock_code = stock_list
p = time_handle(start_date, end_date, period, period_pre,index)
# 獲取時間表格,剔除T1、T2和Replication列
stk_return = p.copy()
# 收益率計算
for j in stock_code:
j_str = str(j)
p0 = [float(get_price(j,i,i)['close'])for i in p.iloc[:,1]]
p1 = [float(get_price(j,i,i)['close'])for i in p.iloc[:,2]]
a = [(m-n)/n for m,n in zip (p1,p0)]
stk_return[str(j)] = a
# 刪除T1列,保留T0列 ,並賦值給index
del stk_return['T1']
stk_return.index = stk_return['T0']
del stk_return['T0']
del stk_return['T2']
del stk_return['Replication']
# Fill missing values with 0
stk_return = stk_return.fillna(0)
# 保存
stk_return.to_csv('stock_return.csv')
return stk_return
# 訓練:分類函數
def train_classification(stk_return, index_return):
'''
輸入:
skt_retun: 所有標的在T0-T1時間集當中的周期收益率, dataframe
index_return: 指數在T0-T1時間集合中的周期收益率, dataframe
輸出:
格式為dataframe
columns:
T0: T0時間點的集合
index:
code1: 在T0-T1這個周期里面是否跑贏大盤,跑贏為'1',否則為'0'
code2: 是否跑贏大盤,分類數據
以此類推
'''
# 複制skt_return的dataframe為最終的輸出格式
stock_cl = stk_return.copy()
# 遍曆比較,賦值
for j in range(len(stock_cl.index)):
for i in range(len(stock_cl.columns)):
if stock_cl.iloc[j,i] > index_return.iloc[j,0]:
stock_cl.iloc[j,i] = 1
else:
stock_cl.iloc[j,i] = 0
# 轉置
stock_cl = stock_cl.T
# 保存
stock_cl.to_csv('stock_classification_data.csv')
return stock_cl
# 獲取T0 到 T1時間段,指數周期收益率
def get_index_return_train(index,period_handle):
'''
輸入:
index: 指數, 元素str
period_handle: 周期管理函數出來的時間集合, dataframe
輸出:
格式為dataframe
columns:
T0: T0時間點的集合
index_return: 指數在T0到T1的收益率
'''
# 獲取時間表格,剔除T1、T2和Replication列
p = time_handle(start_date, end_date, period, period_pre,index)
index_return = p.copy()
del index_return['Replication']
del index_return['T2']
# 收益率計算
p0 = [float(get_price(index,i,i)['close']) for i in p.iloc[:,1]]
p1 = [float(get_price(index,i,i)['close']) for i in p.iloc[:,2]]
a = [(m-n)/n for m,n in zip(p1,p0) ]
index_return['index_return'] = a
del index_return['T1']
index_return.index = index_return['T0']
del index_return['T0']
# 保存現成的表格
index_return.to_csv('index_return.csv')
return index_return
# 獲取T1 到 T2 時間段, 指數周期的收益率
def get_index_return_test(index,period_handle):
'''
輸入:
index: 指數, 元素str
period_handle: 周期管理函數出來的時間集合, dataframe
輸出:
格式為dataframe
columns:
T2: T2時間點的集合
index_return: 指數在T0到T1的收益率
'''
# 獲取時間表格,剔除T1、T2和Replication列
p = time_handle(start_date, end_date, period, period_pre,index).dropna()
index_return = p.copy()
del index_return['Replication']
del index_return['T0']
# 收益率計算
p0 = [float(get_price(index,i,i)['close']) for i in p.iloc[:,2]]
p1 = [float(get_price(index,i,i)['close']) for i in p.iloc[:,3]]
a = [(m-n)/n for m,n in zip(p1,p0) ]
index_return['index_return'] = a
del index_return['T1']
index_return.index = index_return['T2']
del index_return['T2']
# 保存現成的表格
index_return.to_csv('index_return.csv')
return index_return
# 獲取T0 到 T1時間段,指數池子里面所有股票的周期收益率
def get_stock_return_train(data, period_handle):
'''
輸入:
data: 因子提取的大表格, dataframe
period_handle: 周期管理函數出來的時間集合, dataframe
輸出:
格式為dataframe
columns:
T0: T0時間點的集合
code1: 標的1在T0到T1的收益率
code2: 標的2在T0到T1的收益率
以此類推
'''
# 獲取指數池子所有的索引
stock_code = list(data.index)
p = time_handle(start_date, end_date, period, period_pre,index)
# 獲取時間表格,剔除T1、T2和Replication列
stk_return = p.copy()
# 收益率計算
for j in stock_code:
j_str = str(j)
p0 = [float(get_price(j,i,i)['close'])for i in p.iloc[:,1]]
p1 = [float(get_price(j,i,i)['close'])for i in p.iloc[:,2]]
a = [(m-n)/n for m,n in zip (p1,p0)]
stk_return[str(j)] = a
# 刪除T1列,保留T0列 ,並賦值給index
del stk_return['T1']
stk_return.index = stk_return['T0']
del stk_return['T0']
del stk_return['T2']
del stk_return['Replication']
# 保存文件
stk_return.to_csv('stk_return_train.csv')
return stk_return
# 獲取 T1 到 T2時間段,預測池子里面所有股票的周其收益率
def get_stock_return_test(pool_list, period_handle):
'''
輸入:
pool_list: 根據預測出來的標的池子, list
period_handle: 周期管理函數出來的時間集合, dataframe
輸出:
格式為dataframe
columns: T2時間點集合
index: 股票對應的收益率
'''
p = time_handle(start_date, end_date, period, period_pre,index)
# 獲取時間表格,剔除T1、T2和Replication列
stk_return = p.copy()
# 收益率計算
for j in pool_list:
j_str = str(j)
p0 = [float(get_price(j,i,i)['close'])for i in p.iloc[:,2]]
p1 = [float(get_price(j,i,i)['close'])for i in p.iloc[:,3]]
a = [(m-n)/n for m,n in zip (p1,p0)]
stk_return[str(j)] = a
# 刪除T1列,保留T0列 ,並賦值給index
del stk_return['T1']
stk_return.index = stk_return['T0']
del stk_return['T0']
del stk_return['T2']
del stk_return['Replication']
# 保存文件
stk_return.to_csv('stk_return_train.csv')
return stk_return
'''
訓練和回測準備
'''
# 根據預測並提取預測池子函數
def model_pool(stock_cl, num, clf, clf1, test_timing):
'''
輸入:
stock_cl: 用以訓練的分類數據
num = 第幾次
clf: 模型
test_timing: str化的時間周期
輸出:
pool_list: 根據模型出來的股票池子
'''
# 獲取T0時間點的因子數據
data = prepare_data(test_timing['T0'][num], index)
# 獲取T1時間點的因子數據
data_test = prepare_data(test_timing['T1'][num],index)
# 如果 T0 和 T1 兩個時間點出來的因子數據,因為某標的剔除導致index不匹配
if len(data.index) != len(data_test.index):
# 取交集作為分類值,訓練還有預測值
common_index = data.index.intersection(data_test.index)
data_adj = data.loc[common_index,:]
data_test_adj = data_test.loc[common_index,:]
stock_cl_adj = stock_cl.loc[common_index,:]
# 訓練
X_train = np.array(data_adj)
Y_train = np.array(stock_cl_adj.T)[num]
# 算法訓練
clf.fit(X_train, Y_train)
# 預測
X_predict = np.array(data_test_adj)
# 根據T0到T1的訓練模型,基於T1的因子數據,做T1-T2的分類結果預測
Y_predict = clf.predict(X_predict)
# 如果用第一種算法預測結果都為0,用第二種算法fit一次
if all (Y_predict == 0):
# 算法訓練第二次
clf1.fit(X_train, Y_train)
# Predict a second time, on the same aligned T1 data
X_predict = np.array(data_test_adj)
Y_predict = clf1.predict(X_predict)
# 如果匹配則,按照原計劃進行
else:
X_train = np.array(data)
stock_cl_adj = stock_cl.loc[data.index,:]
Y_train = np.array(stock_cl_adj.T)[num]
# 算法訓練
clf.fit(X_train, Y_train)
# 預測
data_test_adj = data_test.copy()
X_predict = np.array(data_test_adj)
# 根據T0到T1的訓練模型,基於T1的因子數據,做T1-T2的分類結果預測
Y_predict = clf.predict(X_predict)
# 如果用第一種算法預測結果都為0,用第二種算法fit一次
if all (Y_predict == 0):
# 算法訓練第二次
clf1.fit(X_train, Y_train)
# 預測第二遍
X_predict = np.array(data_test_adj)
Y_predict = clf1.predict(X_predict)
# 構建在T1預測T2的分類表格
s = {'predict_clf': list(Y_predict)}
s = pd.DataFrame(s)
s.index = data_test_adj.index
# 獲取預測池子的平均收益率
model_predict = []
pool_list = []
# 提取預測的分類值為1的標的
for a in range(len(s)):
if s.iloc[a,0] == 1:
# 捕捉分類為1的index,添加到信的list中
pool_list.append(str(s.index[a]))
return pool_list
# 根據模型篩選的股票池子出來的平均收益率
def get_model_return(pool_list, test_timing,num):
'''
輸入:
pool_list: 篩選出來的股票池子
test_timing: 周期函數
輸出:
model_mean_return: 預測出來標的的平均收益率
'''
return_test_list = []
for j in pool_list:
j_str = str(j)
datelist_T1 = test_timing['T1']
datelist_T2 = test_timing['T2']
p1 = [float(get_price(j, datelist_T1[num],datelist_T1[num])['close'])]
p2 = [float(get_price(j, datelist_T2[num],datelist_T2[num])['close'])]
a = [(m-n)/n for m,n in zip (p2,p1)]
# 捕捉分類為1的index,添加到信的list中
return_test_list.append(a[0])
stk_test = {'pool_list':pool_list, 'return_test_list':return_test_list}
stk_test = pd.DataFrame(stk_test)
stk_test.index = stk_test['pool_list']
del stk_test['pool_list']
# 收益率獲取
model_mean_return = stk_test.mean()
return model_mean_return
# 將預測出來的股票池子轉化為dataframe格式
def predict_pool(pool_list):
# 字典化為dataframe
predict_pool ={'predict_pool':pool_list, 'predict_score':1}
predict_pool = pd.DataFrame(predict_pool)
predict_pool.index = predict_pool['predict_pool']
del predict_pool['predict_pool']
return predict_pool
# 實際分類池子函數
def actural_pool(act_cl, test_timing):
'''
輸入:
act_cl: 分類池子
test_timing: str化的時間周期
輸出:
格式為dataframe
實際分類池子
'''
# 提取真實的分類數據
act_c = act_cl.copy()
del act_c[(act_c.columns)[0]]
act_c.columns = test_timing['T2']
actural_cl = act_c[act_c.columns[num]]
# 字典化為dataframe
actural_pool = {'act_pool':actural_cl.index, 'act_score':(actural_cl)}
actural_pool = pd.DataFrame(actural_pool)
del actural_pool['act_pool']
return actural_pool
# 大模型測試函數
# 自動化生成對比收益率函數
def test_return(start_date, end_date, period, period_pre, index, clf, clf1):
'''
輸入:
index: 指數
clf: 模型
輸出:
格式為dataframe
index: T2時間節點
index_return: 大盤在T1-T2的收益
model_return: 模型在T1-T2的收益
pool_number: 模型在T1-T2預測池子數量
'''
# 獲取時間周期
p = time_handle(start_date, end_date, period, period_pre,index)
test_timing = time_fix(p)
# 輔助
data = prepare_data(start_date,index)
# 獲取 T0 到 T1 時間段,指數池子里面所有股票的周期收益率
stk_return = get_stock_return_train(data,p)
# 獲取 T0 到 T1 時間段,指數周期收益率
index_return = get_index_return_train(index,p)
# 獲取用來訓練的分類數據
stock_cl = train_classification(stk_return, index_return)
# 回測分類,實際的分類情況獲取
act_cl = actural_classification(p, stock_cl)
# 剔除最後一列(因為p的最後一行的T2是nan值)
del stock_cl[stock_cl.columns[-1]]
# 獲取T1 到 T2 時間段, 指數周期的收益率
index_return_test = get_index_return_test(index,p)
# 模型收益率提取
predict_return = []
# 每一次預測池子標的數量提取
pool_num = []
for num in range(len(test_timing['T0'])):
print('正在進行第'+str(num+1)+'次訓練、預測')
# 獲取預測的pool
pool = model_pool(stock_cl,num,clf, clf1, test_timing)
# 根據預測的pool獲取周期均收益率
model_mean_return = get_model_return(pool, test_timing, num)
# 把均收益率放到predict_return這個list中
predict_return.append(model_mean_return)
# 記錄預測池子數量
pool_num.append(len(pool))
# 對比預測池子周期均收益率和大盤周期均收益率
compare = index_return_test.copy()
cp_return = pd.DataFrame(predict_return)
cp_return.index = compare.index
cp_return['index_return'] = compare['index_return']
cp_return['model_return'] = cp_return['return_test_list']
del cp_return['return_test_list']
cp_return['premium'] = cp_return['model_return'] - cp_return['index_return']
# 記錄預測池子數量
cp_return['pool_number'] = pool_num
print('所有周期數據訓練、預測已經完成')
return cp_return
# 結果可視化
def visualization(df):
'''
輸入:
df: 自動化生成對比收益率表格
輸出:
可視化結果
'''
del df['pool_number']
df.columns = ['指數大盤收益率','模型收益率','模型超額收益率']
df.plot(figsize=(15,8),title= "模型收益率(折線圖)")
df.plot.bar(figsize=(15,8),title= "模型收益率(柱狀圖)")
# 跑贏or輸結果
# 記錄對比大盤結果的次數
count1 = []
for j in range(len(df)):
if df.iloc[j,2] < 0:
count1.append(j)
count2 = []
for j in range(len(df)):
if df.iloc[j,2] > 0:
count2.append(j)
return df.plot(figsize=(15,8),title= "模型收益率(折線圖)"), df.plot.bar(figsize=(15,8),title= "模型收益率(柱狀圖)"), '總回測次數:', len(df), '跑輸的次數:', len(count1), '跑贏的次數:', len(count2)
# 【因子測試函數】測試框架 v4
# 制作人:Howard董先森, wechat:dhl19988882,e-mail: 16hldong.stu.edu.cn
# copy並分享的時候記得給小弟一下credit就好~
'''
導入庫
'''
from sklearn import linear_model
regr = linear_model.LinearRegression()
'''
篩選輔助工具函數以及數據準備函數
'''
# Compute excess returns over the index
def premium_dataframe(index_return, stock_return):
    # Build the output dataframe with the same index as index_return
    premium_df = pd.DataFrame(index=index_return.index)
    # Loop over every stock column in stock_return
    for i in stock_return.columns:
        # Column-wise subtraction, stored in premium_df
        premium_df[i] = stock_return[i] - index_return['index_return']
    return premium_df
# 已知標的池子,獲取T0-T1周期收益率
def get_group_return(group, test_timing):
'''
輸入:
group: 標的池子的list
test_timing: 周期管理函數出來的時間集合, dataframe
輸出:
格式為dataframe
columns:
T0: T0時間點的集合
code1: 標的1在T0到T1的收益率
code2: 標的2在T0到T1的收益率
以此類推
'''
# 獲取時間表格,剔除T1、T2和Replication列
stk_group_return = test_timing.copy()
# 收益率計算
for j in group:
j_str = str(j)
p0 = [float(get_price(j,i,i)['close'])for i in test_timing.iloc[:,1]]
p1 = [float(get_price(j,i,i)['close'])for i in test_timing.iloc[:,2]]
a = [(m-n)/n for m,n in zip (p1,p0)]
stk_group_return[str(j)] = a
# 表格整理
del stk_group_return['T0']
del stk_group_return['T1']
del stk_group_return['T2']
return stk_group_return
# Given a stock pool, get each stock's return over a single T0-T1 period
def get_single_period_group_return(group, test_timing):
    '''
    Input:
    group: list, stock pool
    test_timing: dataframe, set of dates from the period-management function
    Output:
    dataframe
    columns: each stock's return over the T0-T1 period
    index: stock codes
    '''
    # Period dates, selected by column name; num (the period number) is read from the enclosing scope
    time_t0 = test_timing['T0'][num]
    time_t1 = test_timing['T1'][num]
    # List to hold each stock's single-period return
    stk_return_list = []
    # Return calculation
    for j in group:
        p0 = float(get_price(j, time_t0, time_t0)['close'])  # price at T0
        p1 = float(get_price(j, time_t1, time_t1)['close'])  # price at T1
        ret = (p1 - p0) / p0  # return of a single stock from T0 to T1
        stk_return_list.append(ret)
    # Build the table, with T0 as the column label
    stk_group_return = pd.DataFrame(stk_return_list, index=group, columns=[str(time_t0)])
    return stk_group_return
# 提取因子大表格數據函數
def get_big_data(test_timing):
'''
輸入:
test_timing: dataframe格式, 時間周期管理函數
輸出:
big_data_t0: 字典,存放T0所有時間點的因子數據
big_data_t1: 字典,存放T1所有時間點的因子數據
'''
# 提取因子大表格數據,放在字典里面
big_data_t0 = {}
big_data_t1 = {}
for i in range(len(test_timing['T0'])):
print('提取第'+str(i+1)+'個周期的T0因子數據')
t0 = test_timing['T0'][i]
t1 = test_timing['T1'][i]
big_data_t0[t0] = prepare_data(t0, index)
print('提取第'+str(i+1)+'個周期的T1因子數據')
big_data_t1[t1] = prepare_data(t1, index)
i = i+1
print('提取成功!周期數為:'+ str(i))
return big_data_t0,big_data_t1
'''
Normal IC框架
'''
# Normal IC 情況下,因子收益率(回歸系數)、因子收益率的T值、IC值 (原始數據表格獲取)
def get_normal_ic_original_factor_testing_data(stk_return, test_timing, big_data):
'''
輸入:
stk_return: dataframe格式, 所有標的在所有周期的收益率
test_timing: dataframe格式, 時間周期管理函數
big_data: 兩個dict:
big_data_t0: 字典,存放T0所有時間點的因子數據
big_data_t1: 字典,存放T1所有時間點的因子數據
輸出:
normal_ic_df: dataframe格式, Normal_ic表格
t_df: dataframe格式,因子數值(Barra因子暴露)對標的周期收益率RLM回歸後的T值
param_df: dataframe格式, 因子收益率(回歸系數)表格
'''
# 因子收益率(回歸系數)、因子收益率序列的T值、IC值 三個表格獲取
# 周期因子IC提取
# 收益率數據準備
stk_return1 = stk_return.T
# 構建字典,待存放因子IC值
factor_for_ic = {}
# 構建字典,待存放因子t_value
factor_for_t = {}
# 構建字典,待存放因子parameter
factor_for_parameter = {}
# 遍曆所有的T0時間點
for num in range(len(test_timing['T0'])):
print('正在進行第'+str(num+1)+'周期因子測試原始數據獲取……')
# 獲取單周期的因子值
big_data_t0 = big_data[0]
big_data_t1 = big_data[1]
t0 = test_timing['T0'][num]
t1 = test_timing['T1'][num]
data = big_data_t0[t0]
data1 = big_data_t1[t1]
# 調整stk_return的index
stk_return2 = stk_return1.loc[data.index,:]
del stk_return2[stk_return2.columns[-1]]
stk_return2.columns = test_timing['T0']
# 對於T0有因子值,T1沒有的情況,提取T0和T1兩個index的交集
data_adj = data.loc[data.index.intersection(data1.index),:]
stk_return2_adj = stk_return2.loc[data_adj.index.intersection(stk_return2.index),:]
# list1,待存放ic
list1 = []
# list2, 待存放t—value
list2 = []
# list3, 待存放parameter
list3 = []
# 求相關系數和T值,對於第i個因子
for i in range(len(data_adj.columns)):
IC_X = list(data_adj[data_adj.columns[i]])
IC_Y = list((stk_return2_adj[stk_return2_adj.columns[num]]))
# 將單個周期所有因子ic值放到cor中
cor = (list(corrcoef(IC_X, IC_Y)[0])[-1])
# 回歸
rlm_model = sm.RLM(IC_Y , IC_X , M=sm.robust.norms.HuberT()).fit()
# 存放相關系數cor
list1.append(cor)
# 存放t_value
list2.append(float(rlm_model.tvalues))
# 存放parameter
list3.append(float(rlm_model.params))
# 相關系數,放到字典里
factor_for_ic[num] = list1
# t_value, 放到字典里
factor_for_t[num] = list2
# parameter, 放到字典里
factor_for_parameter[num] =list3
# 整合
# Normal ICdf整合
normal_ic_df = pd.DataFrame(factor_for_ic)
normal_ic_df.index = data.columns
normal_ic_df.index.name = 'factor'
normal_ic_df.columns = test_timing['T0']
# t值整合
t_df = pd.DataFrame(factor_for_t)
t_df.index = data.columns
t_df.index.name ='factor'
t_df.columns = test_timing['T0']
# Barra體系下的因子收益率(回歸系數)整合
param_df = pd.DataFrame(factor_for_parameter)
param_df.index = data.columns
param_df.index.name = 'factor'
param_df.columns = test_timing['T0']
print('所有周期因子測試在Normal_IC框架下原始數據獲取完畢!')
return normal_ic_df, t_df, param_df
# 根據 Normal_IC原始數據,提取各種打分指標
def get_pre_scoring_data_for_normal_ic(normal_ic_og):
'''
輸入:
normal_ic_og: dataframe,原始數據大表格
輸出:
factor_mean_return: dataframe, 因子年化收益率均值
ic_indicator_df: dataframe, normal_ic各項指標
'''
# 初始數據準備
t_df = normal_ic_og[1]
normal_ic_df = normal_ic_og[0]
param_df = normal_ic_og[2]
# 提取因子收益率年化均值
factor_mean_list =[]
factor_return = param_df.T
# 遍曆每個因子
for i in factor_return.columns:
# 計算單個因子所有周期的均值
mean_return = (factor_return[i].mean())
# 轉為年化收益率
annualized_mean_return = (1+mean_return)**12 - 1
# 計算單個因子所有周期年化均值,放到list中去
factor_mean_list.append((annualized_mean_return))
# 轉為dataframe格式
factor_mean_return = pd.DataFrame(factor_return.columns)
factor_mean_return['abs_annualized_factor_mean_return'] = factor_mean_list
factor_mean_return.index = factor_mean_return[factor_mean_return.columns[0]]
del factor_mean_return[factor_mean_return.columns[0]]
# IC各種指標提取
ic = normal_ic_df.T
# 創建字典,待提取每一個因子的所有ic值序列
features_ic = {}
# 創建字典,待填入每一個因子的IC各種指標
ic_indicator ={}
# 創建dataframe
indicator = pd.DataFrame(columns = ic.columns)
# 提取每個factor的ic值,轉為列表
for i in ic.columns:
# 提取每一個因子的所有ic值序列,放到字典中
features_ic[i] = list(ic[i])
# 提取空值,放到字典ic_indicator中
ic_indicator[i] = list(indicator[i])
# 提取每一個因子的所有indicator
for i in ic.columns:
# 計算IC值的均值
ic_indicator[i].append(mean(features_ic[i]))
# 計算IC值的標準差
ic_indicator[i].append(std(features_ic[i]))
# IC大於0的比例
ic_indicator[i].append(sum(pd.Series(features_ic[i])>0)/len(features_ic[i]))
# IC絕對值大於0.02的比例
abs_list = []
for d in features_ic[i]:
abs_list.append(abs(d))
ic_indicator[i].append(sum(pd.Series(abs_list)>0.02)/len(features_ic[i]))
# IC絕對值
ic_indicator[i].append(mean(abs_list))
# IR值
ic_indicator[i].append(mean(features_ic[i])/std(features_ic[i]))
# IR絕對值
ic_indicator[i].append(abs(mean(features_ic[i])/std(features_ic[i])))
# 整合成表格
ic_indicator_df = pd.DataFrame(ic_indicator, index = ['Normal_IC_mean','Normal_IC_std','Normal_IC>0_%','Normal_IC>0.02_%','abs_Normal_IC','Normal_IR','abs_IR']).T
# T值各種指標提取
t = t_df.T
# 創建字典,待提取每一個因子的所有t值序列
features_t = {}
# 創建字典,待填入每一個因子的t各種指標
t_indicator ={}
# 創建dataframe
indicator = pd.DataFrame(columns = t.columns)
# 提取每個factor的ic值,轉為列表
for i in t.columns:
# 提取每一個因子的所有ic值序列,放到字典中
features_t[i] = list(t[i])
# 提取空值,放到字典ic_indicator中
t_indicator[i] = list(indicator[i])
# 提取每一個因子的所有indicator
for i in t.columns:
# T值均值
t_indicator[i].append(mean(features_t[i]))
# 計算T絕對值的均值
abs_list = []
for d in features_t[i]:
abs_list.append(abs(d))
t_indicator[i].append(mean(abs_list))
# T絕對值大於2的比例
t_indicator[i].append(sum(pd.Series(abs_list)> 2 )/len(features_t[i]))
# 整合成表格
t_indicator_df = pd.DataFrame(t_indicator, index = ['T_mean','abs(t_value)_mean','abs(t_value)_>_2_%']).T
return factor_mean_return, t_indicator_df, ic_indicator_df
# 參照光大的因子初步篩選打分表格
def get_scoring_data_for_normal_ic(pre_scoring_data):
'''
輸入:
pre_scoring_data: dataframe,原始數據大表格處理過後的三個各種打分指標表格
輸出:
factor_test: dataframe, 因子初步篩選打分表格
'''
# 三個表格整合,參考光大打分表格
factor_indicator = pd.concat([pre_scoring_data[0].T,pre_scoring_data[1].T, pre_scoring_data[2].T],axis =0)
# 參照光大打分框架
factor_test = pd.DataFrame(index = factor_indicator.columns)
factor_test['Factor_Mean_Return'] = factor_indicator.T['abs_annualized_factor_mean_return']
factor_test['Factor_Return_tstat'] = factor_indicator.T['T_mean']
factor_test['IC_mean'] = factor_indicator.T['Normal_IC_mean']
factor_test['IR'] = factor_indicator.T['Normal_IR']
# 保存和讀取
factor_test.to_csv('normal_ic_test.csv')
factor_test = pd.read_csv('normal_ic_test.csv')
factor_test.index = factor_test[factor_test.columns[0]]
del factor_test[factor_test.columns[0]]
return factor_test
# 單調性檢測以及分組回溯函數提取
def get_monotony_and_grouping_return_for_normal_ic(big_data):
'''
輸入:
big_data: 兩個dict:
big_data_t0: 字典,存放T0所有時間點的因子數據
big_data_t1: 字典,存放T1所有時間點的因子數據
輸出:
monotony_outcome: dict, key是每個周期的T0時間點
values是指單個周期的所有因子的單調性指標,格式為list
group_return_dict: dict, key是每個周期的T0時間點
values是指單個周期所有因子,每個因子按照數值大小排列後,提取對標的的當期收益率進行分組提取,格式為dataframe
'''
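# Helpers: unpack the two dicts passed in (test_timing is read from the global scope)
big_data_t0, big_data_t1 = big_data
data = big_data_t0[test_timing['T0'][0]]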
print('總的單調性數據提取周期為:'+str(len(test_timing.index)))
print('總的因子數量為:'+str(len(data.columns)))
# 創建字典,填入所有因子的單調性和分層回溯周期收益率
monotony_outcome ={}
group_return_dict = {}
# 分組數量
group_num = 5
# 遍曆所有T0
for i in range(len(test_timing['T0'])):
print('--------第'+str(i+1)+'個周期多空提取中......')
print('-----第'+str(i+1)+'個周期因子數據表格提取中......')
# 獲取單周期的因子值
data = big_data_t0[test_timing['T0'][i]]
data1 = big_data_t1[test_timing['T1'][i]]
# 對於T0有因子值,T1沒有的情況,提取T0和T1兩個index的交集
data_adj = data.loc[data.index.intersection(data1.index),:]
# 創建字典,待填入單個周期,單個因子的單調性指標
monotony = {}
# 分層回溯
group_return = {'group1':[],'group2':[],'group3':[],'group4':[],'group5':[]}
# 遍曆所有因子
for num in range(len(data_adj.columns)):
factor_list = list(data_adj.columns)
factor = factor_list[num]
print('---提取'+str(factor)+'因子')
# 提取單個因子
factor_test = data_adj[data_adj.columns[num]]
# 排序分層提取(降序)
sort_df = factor_test.sort_values(ascending = False)
# 每組分的標的數量
code_num = int(len(factor_test)/group_num)
# 創建字典pool_dict,用來放置分好組的標的池子
pool_dict = {}
# 提取降序後的所有index
sort_index = list(sort_df.index)
# 分組開始,取得dict
a = 0
for q in range((group_num)):
# 每組命名
j = 'group'+str((q+1))
# 提取
pool_dict[j] = sort_index[a:code_num]
a= code_num
code_num = code_num + int(len(data_adj)/group_num)
# 提取標的池子名字
group_name = list(pool_dict.keys())
# 創建字典mean_return_dict,用來放置分好組的標的池子的周期算術平均收益率
mean_return_dict = {}
# 根據每組的index提取所有周期的累計收益率
for group in group_name:
stk_group_return = get_group_return(pool_dict[group], test_timing)
# 提取一個周期的,選出標的池子的算術平均,放到list中
mean_return_dict[group] = (list(stk_group_return.mean(axis =1))[i])
# 轉為dataframe格式
mean_return_df = pd.DataFrame(mean_return_dict, index =[i])
# 收益率提取
R1_m = mean_return_df[mean_return_df.columns[0]]
R2_m = mean_return_df[mean_return_df.columns[1]]
R3_m = mean_return_df[mean_return_df.columns[2]]
R4_m = mean_return_df[mean_return_df.columns[3]]
R5_m = mean_return_df[mean_return_df.columns[4]]
# 將回溯周期收益率放到字典group_return中
group_return['group1'].extend(R1_m)
group_return['group2'].extend(R2_m)
group_return['group3'].extend(R3_m)
group_return['group4'].extend(R4_m)
group_return['group5'].extend(R5_m)
print("--已經獲取"+factor+"因子第"+str(i+1)+'周期的分組回溯周期收益率')
# 年化收益率提取
R1 = (1+R1_m)**12 - 1
R2 = (1+R2_m)**12 - 1
R3 = (1+R3_m)**12 - 1
R4 = (1+R4_m)**12 - 1
R5 = (1+R5_m)**12 - 1
df = pd.DataFrame(index = mean_return_df.index)
df['R1-R5'] = R1 - R5
df['R2-R4'] = R2 - R4
df['Monotony'] = df['R1-R5']/df['R2-R4']
# 將單調性指標放到字典monotony字典中
monotony[factor] = float(df['Monotony'])
print("-已經獲取"+factor+"因子單調性指標")
# 轉為 dataframe
monotony_df = pd.DataFrame(monotony,index=[0])
group_return_df = pd.DataFrame(group_return,index=data.columns)
# 將單個周期的所有因子單調性指標轉為 list,並放在字典monotony_outcome中
monotony_outcome[test_timing['T0'][i]] = list((monotony_df.T)[monotony_df.T.columns[0]])
# 將單周期的所有因子分組回溯周期收益率,放到字典group_return_dict中
group_return_dict[test_timing['T0'][i]] = group_return_df
print("所有因子所有周期的單調性指標已經獲取完畢!")
return monotony_outcome, group_return_dict
'''
Rank IC框架
'''
# Rank框架下,因子收益率(回歸系數)、因子收益率序列的T值、IC值 三個表格獲取
def get_rank_ic_original_factor_testing_data(stk_return, test_timing, big_data):
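'''
Input:
stk_return: dataframe, returns of all securities in all periods
test_timing: dataframe produced by the period-management function
big_data: two dicts:
big_data_t0: dict of factor data at every T0 date
big_data_t1: dict of factor data at every T1 date
Output:
rank_ic_df: dataframe, table of Rank IC values
rank_t_df: dataframe, t-values from the RLM regression of period returns on factor values (Barra factor exposures)
rank_param_df: dataframe, table of factor returns (regression coefficients)
'''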
# 收益率數據準備
stk_return1 = stk_return.T
# 構建字典,待存放因子Rank_IC值
factor_for_ic = {}
# 構建字典,待存放因子t_value
factor_for_t = {}
# 構建字典,待存放因子parameter
factor_for_parameter = {}
# 遍曆所有的T0時間點
for num in range(len(test_timing['T0'])):
print('正在進行第'+str(num+1)+'周期因子測試原始數據獲取……')
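# Get the factor values for this period from the dicts passed in
big_data_t0, big_data_t1 = big_data
data = big_data_t0[test_timing['T0'][num]]
data1 = big_data_t1[test_timing['T1'][num]]
# Some stocks have factor values at T0 but not at T1: keep the intersection of the two indexes
data_adj = data.loc[data.index.intersection(data1.index),:]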
# list1,待存放ic
list1 = []
# list2, 待存放t—value
list2 = []
# list3, 待存放parameter
list3 = []
# 求相關系數和T值,對於第i個因子
for i in range(len(data_adj.columns)):
# 提取單個因子
factor_test = pd.DataFrame(data_adj[data_adj.columns[i]])
# 提取 X
factor_test['X_Rank'] = factor_test.rank(ascending = True, method = 'dense')
X = list(factor_test['X_Rank'])
# 提取 Y
# 提取收益率
stk_group_return = get_single_period_group_return(list(factor_test.index), test_timing)
Y = list(stk_group_return[test_timing['T0'][num]])
# 將單個周期所有因子ic值放到cor中
cor = (list(corrcoef(X, Y)[0])[-1])
# 回歸
rlm_model = sm.RLM(Y , X , M=sm.robust.norms.HuberT()).fit()
# 存放相關系數cor
list1.append(cor)
# 存放t_value
list2.append(float(rlm_model.tvalues))
# 存放parameter
list3.append(float(rlm_model.params))
# 相關系數,放到字典里
factor_for_ic[num] = list1
# t_value, 放到字典里
factor_for_t[num] = list2
# parameter, 放到字典里
factor_for_parameter[num] =list3
# 整合
# Normal ICdf整合
rank_ic_df = pd.DataFrame(factor_for_ic)
rank_ic_df.index = data.columns
rank_ic_df.index.name = 'factor'
rank_ic_df.columns = test_timing['T0']
# t值整合
rank_t_df = pd.DataFrame(factor_for_t)
rank_t_df.index = data.columns
rank_t_df.index.name ='factor'
rank_t_df.columns = test_timing['T0']
# Barra體系下的因子收益率(回歸系數)整合
rank_param_df = pd.DataFrame(factor_for_parameter)
rank_param_df.index = data.columns
rank_param_df.index.name = 'factor'
rank_param_df.columns = test_timing['T0']
print('所有周期Rank_IC框架下,因子測試原始數據獲取完畢!')
return rank_ic_df, rank_t_df, rank_param_df
# 根據 Rank_IC原始數據,提取各種打分指標
def get_pre_scoring_data_for_rank_ic(rank_ic_og):
'''
輸入:
rank_ic_og: dataframe,原始數據大表格
輸出:
factor_mean_return: dataframe, 因子年化收益率均值
ic_indicator_df: dataframe, normal_ic各項指標
'''
# 初始數據準備
normal_ic_df = rank_ic_og[0]
t_df = rank_ic_og[1]
param_df = rank_ic_og[2]
# 提取因子收益率年化均值
factor_mean_list =[]
factor_return = param_df.T
# 遍曆每個因子
for i in factor_return.columns:
# 計算單個因子所有周期的均值
mean_return = (factor_return[i].mean())
# 轉為年化收益率
annualized_mean_return = (1+mean_return)**12 - 1
# 計算單個因子所有周期年化均值,放到list中去
factor_mean_list.append((annualized_mean_return))
# 轉為dataframe格式
factor_mean_return = pd.DataFrame(factor_return.columns)
factor_mean_return['abs_annualized_factor_mean_return'] = factor_mean_list
factor_mean_return.index = factor_mean_return[factor_mean_return.columns[0]]
del factor_mean_return[factor_mean_return.columns[0]]
# IC各種指標提取
ic = normal_ic_df.T
# 創建字典,待提取每一個因子的所有ic值序列
features_ic = {}
# 創建字典,待填入每一個因子的IC各種指標
ic_indicator ={}
# 創建dataframe
indicator = pd.DataFrame(columns = ic.columns)
# 提取每個factor的ic值,轉為列表
for i in ic.columns:
# 提取每一個因子的所有ic值序列,放到字典中
features_ic[i] = list(ic[i])
# 提取空值,放到字典ic_indicator中
ic_indicator[i] = list(indicator[i])
# 提取每一個因子的所有indicator
for i in ic.columns:
# 計算IC值的均值
ic_indicator[i].append(mean(features_ic[i]))
# 計算IC值的標準差
ic_indicator[i].append(std(features_ic[i]))
# IC大於0的比例
ic_indicator[i].append(sum(pd.Series(features_ic[i])>0)/len(features_ic[i]))
# IC絕對值大於0.02的比例
abs_list = []
for d in features_ic[i]:
abs_list.append(abs(d))
ic_indicator[i].append(sum(pd.Series(abs_list)>0.02)/len(features_ic[i]))
# IC絕對值
ic_indicator[i].append(mean(abs_list))
# IR值
ic_indicator[i].append(mean(features_ic[i])/std(features_ic[i]))
# IR絕對值
ic_indicator[i].append(abs(mean(features_ic[i])/std(features_ic[i])))
# 整合成表格
ic_indicator_df = pd.DataFrame(ic_indicator, index = ['Normal_IC_mean','Normal_IC_std','Normal_IC>0_%','Normal_IC>0.02_%','abs_Normal_IC','Normal_IR','abs_IR']).T
# T值各種指標提取
t = t_df.T
# 創建字典,待提取每一個因子的所有t值序列
features_t = {}
# 創建字典,待填入每一個因子的t各種指標
t_indicator ={}
# 創建dataframe
indicator = pd.DataFrame(columns = t.columns)
# 提取每個factor的ic值,轉為列表
for i in t.columns:
# 提取每一個因子的所有ic值序列,放到字典中
features_t[i] = list(t[i])
# 提取空值,放到字典ic_indicator中
t_indicator[i] = list(indicator[i])
# 提取每一個因子的所有indicator
for i in t.columns:
# T值均值
t_indicator[i].append(mean(features_t[i]))
# 計算T絕對值的均值
abs_list = []
for d in features_t[i]:
abs_list.append(abs(d))
t_indicator[i].append(mean(abs_list))
# T絕對值大於2的比例
t_indicator[i].append(sum(pd.Series(abs_list)> 2 )/len(features_t[i]))
# 整合成表格
t_indicator_df = pd.DataFrame(t_indicator, index = ['T_mean','abs(t_value)_mean','abs(t_value)_>_2_%']).T
return factor_mean_return, t_indicator_df, ic_indicator_df
# 參照光大的因子初步篩選打分表格
def get_scoring_data_for_rank_ic(pre_scoring_data_for_rank_ic):
'''
輸入:
pre_scoring_data: dataframe,原始數據大表格處理過後的三個各種打分指標表格
輸出:
factor_test: dataframe, 因子初步篩選打分表格
'''
# 三個表格整合,參考光大打分表格
pre_scoring_data = pre_scoring_data_for_rank_ic
factor_indicator = pd.concat([pre_scoring_data[0].T,pre_scoring_data[1].T, pre_scoring_data[2].T],axis =0)
# 參照光大打分框架
factor_test = pd.DataFrame(index = factor_indicator.columns)
factor_test['Factor_Mean_Return'] = factor_indicator.T['abs_annualized_factor_mean_return']
factor_test['Factor_Return_tstat'] = factor_indicator.T['T_mean']
factor_test['IC_mean'] = factor_indicator.T['Normal_IC_mean']
factor_test['IR'] = factor_indicator.T['Normal_IR']
# 保存和讀取
factor_test.to_csv('rank_ic_test.csv')
factor_test = pd.read_csv('rank_ic_test.csv')
factor_test.index = factor_test[factor_test.columns[0]]
del factor_test[factor_test.columns[0]]
factor_test.columns = ['Factor_Mean_Return', 'Factor_Return_tstat', 'Rank_IC', 'Rank_IR']
return factor_test
'''
打分模型
'''
# Scoring model (Rank IR)
def get_score_data_ir(scoring_rank_df):
    '''
    Input:
        scoring_rank_df: dataframe of pre-scoring metrics, one row per factor, with
            columns ordered as factor return, t-value, Rank IC, Rank IR
    Output:
        score_result: dataframe, the final scoring table
    '''
    # Work on a copy so the caller's table is not modified
    a = scoring_rank_df.copy()
    # Thresholds for |factor return|, |t-value|, |IC| and |IR|, in column order
    thresholds = [0.002, 2, 0.02, 0.2]
    # Total score of each factor
    total_factor_score = {}
    for i in range(len(a)):
        # One point for every metric whose absolute value exceeds its threshold
        score_list = [1 if abs(a.iloc[i, k]) > thresholds[k] else 0
                      for k in range(len(thresholds))]
        total_factor_score[a.index[i]] = sum(score_list)
    # Attach the totals and sort from the highest score down
    a['Total_score'] = pd.Series(total_factor_score)
    score_result = a.sort_values(by='Total_score', ascending=False)
    return score_result
'''
分層回溯模型
'''
# 提取每個因子的年化均收益率表格
def get_annualized_factor_return(gr):
'''
輸入:
gr: dict:
key為時間周期
values為每個周期的所有因子的組合收益率
輸出:
annual_df: dataframe, 所有因子在所有周期的年化收益率
'''
# 提取keys
key = list(gr.keys())
# 多空組合提取
for k in list(key):
# 單個多空組合因子收益率提取
gr[k]['long_short'] = gr[k]['group5']-gr[k]['group1']
# 創建取值為0的表格,待累加
sum_gr = pd.DataFrame(index = gr[key[0]].index, columns = gr[key[0]].columns)
sum_gr = sum_gr.fillna(0)
# 遍曆 key
for k in list(key):
# 累加
sum_gr = sum_gr + gr[k]
# 平均值
mean_gr = sum_gr/len(key)
# 年化
annualized_mean = {}
for i in range(len(mean_gr.index)):
factor = mean_gr.index[i]
annualized_return = []
for j in range(len(mean_gr.columns)):
# 年化
annualized_return.append((mean_gr.iloc[i,j] + 1)**12 -1)
annualized_mean[factor] = annualized_return
# 轉為dataframe
annual_df = pd.DataFrame(annualized_mean, index = mean_gr.columns).T
return annual_df
# 提取每個因子年化收益率的大表格
def get_annualized_grouping_return(gr):
'''
輸入:
gr: dict:
key為時間周期
values為每個周期的所有因子的組合收益率
輸出:
factor_dict:
key為因子名字
values為單個因子所有周期的組合收益率
'''
# 提取單個因子的多空組合累計收益率,可視化
# 創建字典group_dict, keys為因子名字,values為單個因子所有周期內的,分組收益
group_dict = {}
# 提取keys
key = list(gr.keys())
# 多空組合提取
for k in list(key):
# 單個多空組合因子收益率提取
gr[k]['long_short'] = gr[k]['group5']-gr[k]['group1']
# 重新提取factor字典
factor_dict= {}
factor_list = list(gr[list(gr.keys())[0]].index)
for factor in factor_list:
# 創建分組收益率字典,key為時間,values為分組收益
factor_return_dict = {}
# Loop over the per-period grouped returns
for k in list(gr.keys()):
factor_return_dict[k] = list(gr[k].loc[factor])
# Convert to a dataframe: grouped returns of one factor over all periods
return_df = pd.DataFrame(factor_return_dict).T
return_df.columns = gr[k].loc[factor].index
# 年化
annualized = {}
for i in range(len(return_df.index)):
factor_time = return_df.index[i]
annualized_return = []
for j in range(len(return_df.columns)):
# 年化
annualized_return.append((return_df.iloc[i,j] + 1)**12 -1)
annualized[factor_time] = annualized_return
# Convert to a dataframe: annualized grouped returns of one factor over all periods
return_df = pd.DataFrame(annualized).T
return_df.columns = gr[k].loc[factor].index
# 將單個因子所有周期的分組收益率表格,存放到字典中
factor_dict[factor] = return_df
return factor_dict
# 提取每個因子周期收益率的大表格
def get_grouping_return(gr):
'''
輸入:
gr: dict:
key為時間周期
values為每個周期的所有因子的組合收益率
輸出:
factor_dict:
key為因子名字
values為單個因子所有周期的組合收益率
'''
# 提取單個因子的多空組合累計收益率,可視化
# 創建字典group_dict, keys為因子名字,values為單個因子所有周期內的,分組收益
group_dict = {}
# 提取keys
key = list(gr.keys())
# 多空組合提取
for k in list(key):
# 單個多空組合因子收益率提取
gr[k]['long_short'] = gr[k]['group5']-gr[k]['group1']
# 重新提取factor字典
factor_dict= {}
factor_list = list(gr[list(gr.keys())[0]].index)
for factor in factor_list:
# 創建分組收益率字典,key為時間,values為分組收益
factor_return_dict = {}
# Loop over the per-period grouped returns
for k in list(gr.keys()):
factor_return_dict[k] = list(gr[k].loc[factor])
# Convert to a dataframe: grouped returns of one factor over all periods
return_df = pd.DataFrame(factor_return_dict).T
return_df.columns = gr[k].loc[factor].index
# 將單個因子所有周期的分組收益率表格,存放到字典中
factor_dict[factor] = return_df
return factor_dict
# Timing and parameter setup
import time
from sklearn import svm
from sklearn.model_selection import GridSearchCV
from sklearn.neural_network import MLPClassifier
start = time.perf_counter()
print('--------------------Basic framework--------------------')
# Parameters
date = '2016-01-01'
start_date = '2016-01-01'
end_date = '2019-03-01'
period = 'M'
period_pre = 'D'
S = '399905.XSHE'  # CSI 500 index
index = S
# Industries: Shenwan level 1
industry_old_code = ['801010','801020','801030','801040','801050','801080','801110','801120','801130','801140','801150',
                     '801160','801170','801180','801200','801210','801230']
industry_new_code = ['801010','801020','801030','801040','801050','801080','801110','801120','801130','801140','801150',
                     '801160','801170','801180','801200','801210','801230','801710','801720','801730','801740','801750',
                     '801760','801770','801780','801790','801880','801890']
# Get the industries
industry_code = get_industry(date, industry_old_code, industry_new_code)
# Models
# Model 1
clf = MLPClassifier(solver='lbfgs', alpha=1e-5,
                    hidden_layer_sizes=(5, 2), random_state=1)
# Auxiliary model 2, used when all predictions are 0
regressor = svm.SVC()
# SVM parameter grid
parameters = {'kernel': ['rbf'], 'C': [0.01, 0.03, 0.1, 0.3, 1, 3, 10],
              'gamma': [1e-4, 3e-4, 1e-3, 3e-3, 0.01, 0.03, 0.1, 0.3, 1]}
# Grid search; cv is the number of cross-validation folds
clf1 = GridSearchCV(regressor, parameters, cv=10)
# Preparation
# Get the time periods
print('Fetching time periods...')
p = time_handle(start_date, end_date, period, period_pre, index)
test_timing = time_fix(p)
# Auxiliary factor table
print('Fetching the auxiliary factor table...')
data = prepare_data(date, index)
# Get the index return of each period
print('Fetching index period returns...')
index_return = get_index_return_train(S, p)
# Get the CSI 500 constituents in every period, then take the set intersection (no duplicates) as the new pool
print('Fetching the classification table...')
stock_list = get_all_index_stock(index, p)
# Get stk_return for that pool
stk_return = get_index_big_pool_return(stock_list, p)
# Get the classification labels of all pool stocks in all periods
stock_cl = train_classification(stk_return, index_return)
# Drop the last column (its T2 values are NaN)
del stock_cl[stock_cl.columns[-1]]
# Get the T1-T2 index returns, used for testing
print('Fetching the T1-T2 return table...')
index_return_test = get_index_return_test(index, p)
elapsed = time.perf_counter() - start
print('Number of periods: ' + str(len(test_timing)))
print('---------------Basic framework preparation complete!-------------------')
print('Elapsed time:', elapsed)
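The code below extracts the data of all factors for all periods. Extraction takes a long time, so you can load the saved pickle files directly instead.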
# Prepare the factor-testing data
import time
start = time.perf_counter()
print('--------------------Factor testing framework--------------------')
print('Extracting the factor table dicts...')
big_data = get_big_data(test_timing)
big_data_t0 = big_data[0]
big_data_t1 = big_data[1]
print('----------------Factor testing data ready!--------------------')
elapsed = time.perf_counter() - start
print('Elapsed time:', elapsed)
# [Save] pickle
import pickle
with open('big_data_t0.pickle', 'wb') as dic_file:
    pickle.dump(big_data[0], dic_file)
with open('big_data_t1.pickle', 'wb') as dic_file:
    pickle.dump(big_data[1], dic_file)
# [Load] pickle
with open('big_data_t0.pickle', 'rb') as df_file:
    big_data_t0 = pickle.load(df_file)
with open('big_data_t1.pickle', 'rb') as df_file:
    big_data_t1 = pickle.load(df_file)
big_data = (big_data_t0, big_data_t1)
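Since the extraction is slow, the save and load steps above can be folded into one helper that only recomputes when no cached file exists. This is a minimal sketch, assuming a hypothetical helper name `load_or_compute` and a zero-argument `compute` callable; neither is part of the original notebook, and the demo data is made up.

```python
import os
import pickle
import tempfile

def load_or_compute(path, compute):
    """Return the object pickled at `path`; if the file is missing,
    call `compute()`, pickle its result to `path`, and return it.
    (Hypothetical helper, not part of the original notebook.)"""
    if os.path.exists(path):
        with open(path, 'rb') as f:
            return pickle.load(f)
    result = compute()
    with open(path, 'wb') as f:
        pickle.dump(result, f)
    return result

# Demo with a stand-in for the slow extraction step
calls = []
def slow_extraction():
    calls.append(1)                      # count how often we really compute
    return {'2016-01-29': [0.1, 0.2]}    # made-up placeholder data

demo_path = os.path.join(tempfile.mkdtemp(), 'demo_big_data.pickle')
first = load_or_compute(demo_path, slow_extraction)   # computes and saves
second = load_or_compute(demo_path, slow_extraction)  # loads from disk
```

In the notebook this might read `big_data = load_or_compute('big_data.pickle', lambda: get_big_data(test_timing))`, where the file name is only illustrative.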
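2. Metrics under the Normal IC framework
Although the code below also extracts the Normal IC, we only use the factor return and its t-value as scoring items.
Because data extraction takes a long time, you can skip it and run the loading code further down to read the data I saved.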
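To make the two scoring items concrete: for one cross-section, the factor return is the slope of period returns regressed on factor exposures, and the t-value is that slope divided by its standard error. The sketch below uses plain OLS through the origin on made-up data as a simplified stand-in for the `sm.RLM` Huber regression used in the notebook (which is likewise fitted without a constant). `ols_through_origin` is a hypothetical helper name, and the numbers are simulated, not results.

```python
import numpy as np

def ols_through_origin(x, y):
    """Slope and t-value of y = beta * x + e, fitted without an intercept."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    sxx = np.sum(x * x)
    beta = np.sum(x * y) / sxx                   # least-squares slope
    resid = y - beta * x
    sigma2 = np.sum(resid ** 2) / (len(x) - 1)   # one estimated parameter
    se = np.sqrt(sigma2 / sxx)                   # standard error of the slope
    return beta, beta / se

# Simulated cross-section: 200 stocks, true factor return 0.01
rng = np.random.default_rng(0)
exposure = rng.standard_normal(200)
returns = 0.01 * exposure + 0.05 * rng.standard_normal(200)

factor_return, t_value = ols_through_origin(exposure, returns)
ic = np.corrcoef(exposure, returns)[0, 1]        # Normal IC of the same cross-section
```

Unlike this OLS version, a robust regression down-weights outliers iteratively, so the two slopes can differ noticeably on real data with extreme returns.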
# Extract factor returns and Normal IC
import time
start = time.perf_counter()
print('Extracting Normal IC metrics (excluding monotonicity)...')
# Get three tables: factor return (regression coefficient), its t-value series, and IC
normal_ic_og = get_normal_ic_original_factor_testing_data(stk_return, test_timing, big_data)
# Derive the scoring metrics from the raw Normal IC data
pre_scoring_data_for_normal_ic = get_pre_scoring_data_for_normal_ic(normal_ic_og)
# Build the scoring table modeled on Everbright Securities' table (monotonicity excluded)
normal_ic_prescoring = get_scoring_data_for_normal_ic(pre_scoring_data_for_normal_ic)
# Save
normal_ic_prescoring.to_csv('normal_ic_pre.csv')
elapsed = time.perf_counter() - start
print('Elapsed time:', elapsed)
# [Load] the Normal IC scoring table
normal_ic_pre = pd.read_csv('normal_ic_pre.csv')
normal_ic_pre.index = data.columns
del normal_ic_pre['factor']
scoring_df = normal_ic_pre
scoring_df.T
3. Factor testing under the Rank IC framework
The next cell takes a while to run; you can use the data I saved instead.
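By the definition in section 0.1, Rank IC correlates the rank of the factor exposure with the rank of the next-period return, so both sides are ranked before the correlation is taken. Here is a minimal sketch on made-up data, assuming a hypothetical helper name `rank_ic`; it relies only on pandas ranking and Pearson correlation.

```python
import pandas as pd

def rank_ic(factor, ret):
    """Spearman-style Rank IC: Pearson correlation of the two rank series.
    Stocks missing either value are dropped first."""
    df = pd.concat([factor, ret], axis=1, keys=['factor', 'ret']).dropna()
    return df['factor'].rank().corr(df['ret'].rank())

# Made-up cross-section of five stocks
factor = pd.Series({'A': 0.5, 'B': 1.2, 'C': -0.3, 'D': 2.0, 'E': 0.9})
ret = pd.Series({'A': 0.01, 'B': 0.03, 'C': -0.02, 'D': 0.02, 'E': 0.04})

ic_value = rank_ic(factor, ret)
```

Because only ranks matter, any order-preserving transform of the factor leaves the result unchanged. The loop in the notebook appears to rank only the factor values and to leave the returns unranked, which gives a hybrid between Normal IC and Rank IC.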
# 提取除單調性之外的打分表格
# 收益率數據準備
stk_return1 = stk_return.T
# 構建字典,待存放因子Rank_IC值
factor_for_ic = {}
# 構建字典,待存放因子t_value
factor_for_t = {}
# 構建字典,待存放因子parameter
factor_for_parameter = {}
# 遍曆所有的T0時間點
for num in range(len(test_timing['T0'])):
print('正在進行第'+str(num+1)+'周期因子測試原始數據獲取……')
# 獲取單周期的因子值
data = big_data_t0[test_timing['T0'][num]]
data1 = big_data_t1[test_timing['T1'][num]]
# 對於T0有因子值,T1沒有的情況,提取T0和T1兩個index的交集
data_adj = data.loc[data.index.intersection(data1.index),:]
# list1,待存放ic
list1 = []
# list2, 待存放t—value
list2 = []
# list3, 待存放parameter
list3 = []
# 求相關系數和T值,對於第i個因子
for i in range(len(data_adj.columns)):
# 提取單個因子
factor_test = pd.DataFrame(data_adj[data_adj.columns[i]])
# 提取 X
factor_test['X_Rank'] = factor_test.rank(ascending = True, method = 'dense')
X = list(factor_test['X_Rank'])
# 提取 Y
# 提取收益率
stk_group_return = get_single_period_group_return(list(factor_test.index), test_timing)
Y = list(stk_group_return[test_timing['T0'][num]])
# 將單個周期所有因子ic值放到cor中
cor = (list(corrcoef(X, Y)[0])[-1])
# 回歸
rlm_model = sm.RLM(Y , X , M=sm.robust.norms.HuberT()).fit()
# 存放相關系數cor
list1.append(cor)
# 存放t_value
list2.append(float(rlm_model.tvalues))
# 存放parameter
list3.append(float(rlm_model.params))
# 相關系數,放到字典里
factor_for_ic[num] = list1
# t_value, 放到字典里
factor_for_t[num] = list2
# parameter, 放到字典里
factor_for_parameter[num] =list3
# 整合
# Normal ICdf整合
rank_ic_df = pd.DataFrame(factor_for_ic)
rank_ic_df.index = data.columns
rank_ic_df.index.name = 'factor'
rank_ic_df.columns = test_timing['T0']
# t值整合
rank_t_df = pd.DataFrame(factor_for_t)
rank_t_df.index = data.columns
rank_t_df.index.name ='factor'
rank_t_df.columns = test_timing['T0']
# Barra體系下的因子收益率(回歸系數)整合
rank_param_df = pd.DataFrame(factor_for_parameter)
rank_param_df.index = data.columns
rank_param_df.index.name = 'factor'
rank_param_df.columns = test_timing['T0']
print('所有周期Rank_IC框架下,因子測試原始數據獲取完畢!')
# [Load] the raw Rank IC data from the pickle file
with open('rank_ic_og.pickle', 'rb') as df_file:
    rank_ic_og = pickle.load(df_file)
# [Score] derive the scoring metrics from the raw Rank IC data
pre_scoring_data_for_rank_ic = get_pre_scoring_data_for_rank_ic(rank_ic_og)
# Build the scoring table modeled on Everbright Securities' table (monotonicity excluded)
rank_ic_prescoring = get_scoring_data_for_rank_ic(pre_scoring_data_for_rank_ic)
# Scoring: factor return and t-value come from the Normal IC table, IC and IR from the Rank IC table
scoring_rank_df = pd.DataFrame()
scoring_rank_df['Factor_Mean_Return'] = normal_ic_pre['Factor_Mean_Return']
scoring_rank_df['Factor_Return_tstat'] = normal_ic_pre['Factor_Return_tstat']
scoring_rank_df['Rank_IC'] = rank_ic_prescoring['Rank_IC']
scoring_rank_df['Rank_IR'] = rank_ic_prescoring['Rank_IR']
score_result = get_score_data_ir(scoring_rank_df)
score_result.head()
# Collect the factors whose total score is at least 3 (out of 4)
features_list = []
for i in range(len(score_result)):
    if score_result.iloc[i, 4] > 2:
        features_list.append(score_result.index[i])
print(str(len(features_list)) + ' factors scored 3 or more')
print(features_list)
4. Layered monotonicity backtest of individual factors
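Before the full backtest class, the layering idea can be sketched for one factor and one period: sort stocks by factor value, split them into equal-sized groups, average each group's return, and compare the outer and inner spreads with the notebook's `Monotony` ratio, (R1 - R5) / (R2 - R4). The sketch below runs on made-up data with five groups. `group_mean_returns` is a hypothetical helper, and per-period rather than annualized returns are used for brevity.

```python
import numpy as np
import pandas as pd

def group_mean_returns(factor, ret, n_groups=5):
    """Mean return of each factor-sorted group; group 1 holds the highest values."""
    # Rank descending, breaking ties by order so every rank is unique
    order = factor.rank(ascending=False, method='first')
    # Integer group labels 1..n_groups with (nearly) equal group sizes
    group = pd.qcut(order, n_groups, labels=False) + 1
    return ret.groupby(group).mean()

# Made-up data: ten stocks whose return rises linearly with the factor
factor = pd.Series(np.arange(10.0), index=['s%d' % k for k in range(10)])
ret = 0.01 * factor

g = group_mean_returns(factor, ret)
monotony = (g.loc[1] - g.loc[5]) / (g.loc[2] - g.loc[4])
```

For a perfectly linear relationship the ratio is 2 with five groups. The ratio becomes unstable when R2 - R4 is close to zero, so it may be worth inspecting the group means themselves alongside it.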
# 回測框架
class parameter_analysis(object):
# 定義函數中不同的變量
def __init__(self, algorithm_id=None):
self.algorithm_id = algorithm_id # 回測id
self.params_df = pd.DataFrame() # 回測中所有調參備選值的內容,列名字為對應修改面兩名稱,對應回測中的 g.XXXX
self.results = {} # 回測結果的回報率,key 為 params_df 的行序號,value 為
self.evaluations = {} # 回測結果的各項指標,key 為 params_df 的行序號,value 為一個 dataframe
self.backtest_ids = {} # 回測結果的 id
# 新加入的基準的回測結果 id,可以默認為空 '',則使用回測中設定的基準
self.benchmark_id = ''
self.benchmark_returns = [] # 新加入的基準的回測回報率
self.returns = {} # 記錄所有回報率
self.excess_returns = {} # 記錄超額收益率
self.log_returns = {} # 記錄收益率的 log 值
self.log_excess_returns = {} # 記錄超額收益的 log 值
self.dates = [] # 回測對應的所有日期
self.excess_max_drawdown = {} # 計算超額收益的最大回撤
self.excess_annual_return = {} # 計算超額收益率的年化指標
self.evaluations_df = pd.DataFrame() # 記錄各項回測指標,除日回報率外
# 定義排隊運行多參數回測函數
def run_backtest(self, #
algorithm_id=None, # 回測策略id
running_max=10, # 回測中同時巡行最大回測數量
start_date='2016-01-01', # 回測的起始日期
end_date='2016-04-30', # 回測的結束日期
frequency='day', # 回測的運行頻率
initial_cash='1000000', # 回測的初始持倉金額
param_names=[], # 回測中調整參數涉及的變量
param_values=[] # 回測中每個變量的備選參數值
):
# 當此處回測策略的 id 沒有給出時,調用類輸入的策略 id
if algorithm_id == None: algorithm_id=self.algorithm_id
# 生成所有參數組合並加載到 df 中
# 包含了不同參數具體備選值的排列組合中一組參數的 tuple 的 list
param_combinations = list(itertools.product(*param_values))
# 生成一個 dataframe, 對應的列為每個調參的變量,每個值為調參對應的備選值
to_run_df = pd.DataFrame(param_combinations)
# 修改列名稱為調參變量的名字
to_run_df.columns = param_names
to_run_df['backtestID']=''
to_run_df['state']='waiting'
to_run_df['times']=0
# 設定運行起始時間和保存格式
start = time.time()
# 記錄結束的運行回測
finished_backtests = {}
# 記錄運行中的回測
running_backtests = {}
failed_backtests={}
running_count=0
# 總運行回測數目,等於排列組合中的元素個數
total_backtest_num = len(param_combinations)
# 記錄回測結果的回報率
all_results = {}
# 記錄回測結果的各項指標
all_evaluations = {}
# 在運行開始時顯示
print('【已完成|運行中|待運行||失敗】:' )
# 當運行回測開始後,如果沒有全部運行完全的話:
while len(to_run_df[(to_run_df.state=='waiting') | (to_run_df.state=='running')].index)>0:
# 顯示運行、完成和待運行的回測個數
print('[%s|%s|%s||%s].' % (len(finished_backtests),
len(running_backtests),
(total_backtest_num-len(finished_backtests)-len(running_backtests)- len(failed_backtests)),
len(failed_backtests)
)),
# 把可用的空位進行跑回測
for index in (to_run_df[to_run_df.state=='waiting'].index):
# 備選的參數排列組合的 df 中第 i 行變成 dict,每個 key 為列名字,value 為 df 中對應的值
if running_count>=running_max:
continue
params = to_run_df.loc[index,param_names].to_dict()
# 記錄策略回測結果的 id,調整參數 extras 使用 params 的內容
backtest = create_backtest(algorithm_id = algorithm_id,
start_date = start_date,
end_date = end_date,
frequency = frequency,
initial_cash = initial_cash,
extras = params,
# 再回測中把改參數的結果起一個名字,包含了所有涉及的變量參數值
name =str( params).replace('{','').replace('}','').replace('\'','')
)
# 記錄運行中 i 回測的回測 id
to_run_df.at[index,'backtestID'] = backtest
to_run_df.at[index,'state']='running'
to_run_df.at[index,'times']=to_run_df.at[index,'times']+1
running_count=running_count+1
# 獲取回測結果
failed = []
finished = []
# 對於運行中的回測,key 為 to_run_df 中所有排列組合中的序數
for index in to_run_df[to_run_df.state=='running'].index:
# 研究調用回測的結果,running_backtests[key] 為運行中保存的結果 id
bt = get_backtest(to_run_df.at[index,'backtestID'])
# 獲得運行回測結果的狀態,成功和失敗都需要運行結束後返回,如果沒有返回則運行沒有結束
status = bt.get_status()
# 當運行回測失敗
if status in [ 'failed','canceled','deleted']:
# 失敗 list 中記錄對應的回測結果 id
failed.append(index)
# 當運行回測成功時
elif status == 'done':
# 成功 list 記錄對應的回測結果 id,finish 僅記錄運行成功的
finished.append(index)
# 回測回報率記錄對應回測的回報率 dict, key to_run_df 中所有排列組合中的序數, value 為回報率的 dict
# 每個 value 一個 list 每個對象為一個包含時間、日回報率和基準回報率的 dict
all_results[index] = bt.get_results()
# 回測回報率記錄對應回測結果指標 dict, key to_run_df 中所有排列組合中的序數, value 為回測結果指標的 dataframe
all_evaluations[index] = bt.get_risk()
# 記錄運行中回測結果 id 的 list 中刪除失敗的運行
for index in failed:
if to_run_df.at[index,'times']<3:
to_run_df.at[index,'state']='waiting'
else:
to_run_df.at[index,'state']='failed'
# 在結束回測結果 dict 中記錄運行成功的回測結果 id,同時在運行中的記錄中刪除該回測
for index in finished:
to_run_df.at[index,'state']='done'
running_count=len(to_run_df[to_run_df.state=='running'].index)
running_backtests=to_run_df[to_run_df.state=='running']['backtestID'].to_dict()
finished_backtests=to_run_df[to_run_df.state=='done']['backtestID'].to_dict()
failed_backtests=to_run_df[to_run_df.state=='failed']['backtestID'].to_dict()
# 當一組同時運行的回測結束時報告時間
if len(finished_backtests) != 0 and len(finished_backtests) % running_max == 0 :
# 記錄當時時間
middle = time.time()
# 計算剩餘時間,假設沒工作量時間相等的話
remain_time = (middle - start) * (total_backtest_num - len(finished_backtests)) / len(finished_backtests)
# print 當前運行時間
print('[已用%s時,尚餘%s時,請不要關閉瀏覽器].' % (str(round((middle - start) / 60.0 / 60.0,3)),
str(round(remain_time / 60.0 / 60.0,3)))),
# 5秒鍾後再跑一下
time.sleep(5)
# 記錄結束時間
end = time.time()
print('')
print('【回測完成】總用時:%s秒(即%s小時)。' % (str(int(end-start)),
str(round((end-start)/60.0/60.0,2)))),
# 對應修改類內部對應
self.params_df = to_run_df.loc[:,param_names]
self.results = all_results
self.evaluations = all_evaluations
self.backtest_ids = finished_backtests
#7 最大回撤計算方法
def find_max_drawdown(self, returns):
# 定義最大回撤的變量
result = 0
# 記錄最高的回報率點
historical_return = 0
# 遍曆所有日期
for i in range(len(returns)):
# 最高回報率記錄
historical_return = max(historical_return, returns[i])
# 最大回撤記錄
drawdown = 1-(returns[i] + 1) / (historical_return + 1)
# 記錄最大回撤
result = max(drawdown, result)
# 返回最大回撤值
return result
# log 收益、新基準下超額收益和相對與新基準的最大回撤
def organize_backtest_results(self, benchmark_id=None):
# 若新基準的回測結果 id 沒給出
if benchmark_id==None:
# 使用默認的基準回報率,默認的基準在回測策略中設定
self.benchmark_returns = [x['benchmark_returns'] for x in self.results[0]]
# 當新基準指標給出後
else:
# 基準使用新加入的基準回測結果
self.benchmark_returns = [x['returns'] for x in get_backtest(benchmark_id).get_results()]
# 回測日期為結果中記錄的第一項對應的日期
self.dates = [x['time'] for x in self.results[0]]
# 對應每個回測在所有備選回測中的順序 (key),生成新數據
# 由 {key:{u'benchmark_returns': 0.022480100091729405,
# u'returns': 0.03184566700000002,
# u'time': u'2006-02-14'}} 格式轉化為:
# {key: []} 格式,其中 list 為對應 date 的一個回報率 list
for key in self.results.keys():
self.returns[key] = [x['returns'] for x in self.results[key]]
# 生成對於基準(或新基準)的超額收益率
for key in self.results.keys():
self.excess_returns[key] = [(x+1)/(y+1)-1 for (x,y) in zip(self.returns[key], self.benchmark_returns)]
# 生成 log 形式的收益率
for key in self.results.keys():
self.log_returns[key] = [log(x+1) for x in self.returns[key]]
# 生成超額收益率的 log 形式
for key in self.results.keys():
self.log_excess_returns[key] = [log(x+1) for x in self.excess_returns[key]]
# 生成超額收益率的最大回撤
for key in self.results.keys():
self.excess_max_drawdown[key] = self.find_max_drawdown(self.excess_returns[key])
# 生成年化超額收益率
for key in self.results.keys():
self.excess_annual_return[key] = (self.excess_returns[key][-1]+1)**(252./float(len(self.dates)))-1
# 把調參數據中的參數組合 df 與對應結果的 df 進行合並
self.evaluations_df = pd.concat([self.params_df, pd.DataFrame(self.evaluations).T], axis=1)
# self.evaluations_df =
    # Get the final analysis data: calls the queued-backtest function and the result-organizing function
    def get_backtest_data(self,
                          algorithm_id=None,         # strategy id used for the backtests
                          benchmark_id=None,         # backtest id of the new benchmark
                          file_name='results1.pkl',  # name of the pickle file the results are saved to
                          running_max=10,            # maximum number of backtests running at the same time
                          start_date='2006-01-01',   # backtest start date
                          end_date='2016-11-30',     # backtest end date
                          frequency='day',           # backtest run frequency
                          initial_cash='1000000',    # initial cash of the backtest
                          param_names=[],            # variables to be tested in the backtest
                          param_values=[]            # candidate values for each variable
                          ):
        # Call the queued-backtest function and pass the parameters through
        self.run_backtest(algorithm_id=algorithm_id,
                          running_max=running_max,
                          start_date=start_date,
                          end_date=end_date,
                          frequency=frequency,
                          initial_cash=initial_cash,
                          param_names=param_names,
                          param_values=param_values
                          )
        # Add log returns, excess returns and related metrics to the backtest results
        self.organize_backtest_results(benchmark_id)
        # Collect all results in a dict
        results = {'returns':self.returns,
                   'excess_returns':self.excess_returns,
                   'log_returns':self.log_returns,
                   'log_excess_returns':self.log_excess_returns,
                   'dates':self.dates,
                   'benchmark_returns':self.benchmark_returns,
                   'evaluations':self.evaluations,
                   'params_df':self.params_df,
                   'backtest_ids':self.backtest_ids,
                   'excess_max_drawdown':self.excess_max_drawdown,
                   'excess_annual_return':self.excess_annual_return,
                   'evaluations_df':self.evaluations_df}
        # Save to a pickle file; the with-block closes the file even if dump fails
        with open(file_name, 'wb') as pickle_file:
            pickle.dump(results, pickle_file)
    # Read a saved pickle file and assign its contents to the matching attributes of the class
    def read_backtest_data(self, file_name='results.pkl'):
        with open(file_name, 'rb') as pickle_file:
            results = pickle.load(pickle_file)
        self.returns = results['returns']
        self.excess_returns = results['excess_returns']
        self.log_returns = results['log_returns']
        self.log_excess_returns = results['log_excess_returns']
        self.dates = results['dates']
        self.benchmark_returns = results['benchmark_returns']
        self.evaluations = results['evaluations']
        self.params_df = results['params_df']
        self.backtest_ids = results['backtest_ids']
        self.excess_max_drawdown = results['excess_max_drawdown']
        self.excess_annual_return = results['excess_annual_return']
        self.evaluations_df = results['evaluations_df']
    # Line chart of returns
    def plot_returns(self):
        # figsize sets the width and height of the figure, in inches
        fig = plt.figure(figsize=(20,8))
        ax = fig.add_subplot(111)
        # Plot one line per backtest
        for key in self.returns.keys():
            ax.plot(range(len(self.returns[key])), self.returns[key], label=key)
        # Plot and label the benchmark curve
        ax.plot(range(len(self.benchmark_returns)), self.benchmark_returns, label='benchmark', c='k', linestyle='--')
        # Show 11 evenly spaced dates on the x axis
        ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
        plt.xticks(ticks, [self.dates[i] for i in ticks])
        # Legend style
        ax.legend(loc = 2, fontsize = 10)
        # y-axis label style
        ax.set_ylabel('returns',fontsize=20)
        # Show the y tick labels as percentages
        ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
        # Figure title style
        ax.set_title("Strategy's performances with different parameters", fontsize=21)
        plt.xlim(0, len(self.returns[0]))
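    # Long-short portfolio chart
    def plot_long_short(self):
        # figsize sets the width and height of the figure, in inches
        fig = plt.figure(figsize=(20,8))
        ax = fig.add_subplot(111)
        # Net values of the two extreme groups. The last group is looked up by key
        # rather than a fixed index, so this works for 5 groups as well as for 10.
        last_key = max(self.returns.keys())
        a1 = [i+1 for i in self.returns[0]]
        a2 = [i+1 for i in self.returns[last_key]]
        # Both net-value curves start at 1
        a1.insert(0,1)
        a2.insert(0,1)
        # Per-period long-short return: half the spread between the two groups
        b = []
        for i in range(len(a1)-1):
            b.append((a1[i+1]/a1[i]-a2[i+1]/a2[i])/2)
        # Compound the per-period returns into a net-value curve
        c = []
        c.append(1)
        for i in range(len(b)):
            c.append(c[i]*(1+b[i]))
        ax.plot(range(len(c)), c, label='long_short')
        ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
        plt.xticks(ticks, [self.dates[i] for i in ticks])
        # Legend style
        ax.legend(loc = 2, fontsize = 10)
        # Figure title style
        ax.set_title("Strategy's long_short performances",fontsize=20)
        plt.xlim(0, len(c))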
    # Returns by calendar year, for ranking the groups year by year
    def get_profit_year(self):
        profit_year = {}
        for key in self.returns.keys():
            temp = []
            date_year = []
            # Record the cumulative return on the last trading day of each year
            for i in range(len(self.dates)-1):
                if self.dates[i][:4] != self.dates[i+1][:4]:
                    temp.append(self.returns[key][i])
                    date_year.append(self.dates[i][:4])
            # The final (possibly partial) year ends on the last date
            temp.append(self.returns[key][-1])
            date_year.append(self.dates[-1][:4])
            # Convert year-end cumulative returns into per-year returns
            temp1 = []
            temp1.append(temp[0])
            for i in range(len(temp)-1):
                temp1.append((temp[i+1]+1)/(temp[i]+1)-1)
            profit_year[key] = temp1
        # One row per backtest, one column per year
        result = pd.DataFrame(index = list(self.returns.keys()), columns = date_year)
        for key in self.returns.keys():
            result.loc[key,:] = profit_year[key]
        return result
    # Excess-return chart
    def plot_excess_returns(self):
        # figsize sets the width and height of the figure, in inches
        fig = plt.figure(figsize=(20,8))
        ax = fig.add_subplot(111)
        # Plot one line per backtest
        for key in self.returns.keys():
            ax.plot(range(len(self.excess_returns[key])), self.excess_returns[key], label=key)
        # The benchmark is the zero line here
        ax.plot(range(len(self.benchmark_returns)), [0]*len(self.benchmark_returns), label='benchmark', c='k', linestyle='--')
        ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
        plt.xticks(ticks, [self.dates[i] for i in ticks])
        # Legend style
        ax.legend(loc = 2, fontsize = 10)
        # y-axis label style
        ax.set_ylabel('excess returns',fontsize=20)
        # Show the y tick labels as percentages
        ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
        # Figure title style
        ax.set_title("Strategy's performances with different parameters", fontsize=21)
        plt.xlim(0, len(self.excess_returns[0]))
    # Log-return chart
    def plot_log_returns(self):
        # figsize sets the width and height of the figure, in inches
        fig = plt.figure(figsize=(20,8))
        ax = fig.add_subplot(111)
        # Plot one line per backtest
        for key in self.returns.keys():
            ax.plot(range(len(self.log_returns[key])), self.log_returns[key], label=key)
        # Plot and label the benchmark curve (log scale)
        ax.plot(range(len(self.benchmark_returns)), [log(x+1) for x in self.benchmark_returns], label='benchmark', c='k', linestyle='--')
        ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
        plt.xticks(ticks, [self.dates[i] for i in ticks])
        # Legend style
        ax.legend(loc = 2, fontsize = 10)
        # y-axis label style
        ax.set_ylabel('log returns',fontsize=20)
        # Figure title style
        ax.set_title("Strategy's performances with different parameters", fontsize=21)
        plt.xlim(0, len(self.log_returns[0]))
    # Log excess-return chart
    def plot_log_excess_returns(self):
        # figsize sets the width and height of the figure, in inches
        fig = plt.figure(figsize=(20,8))
        ax = fig.add_subplot(111)
        # Plot one line per backtest
        for key in self.returns.keys():
            ax.plot(range(len(self.log_excess_returns[key])), self.log_excess_returns[key], label=key)
        # The benchmark is the zero line here
        ax.plot(range(len(self.benchmark_returns)), [0]*len(self.benchmark_returns), label='benchmark', c='k', linestyle='--')
        ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
        plt.xticks(ticks, [self.dates[i] for i in ticks])
        # Legend style
        ax.legend(loc = 2, fontsize = 10)
        # y-axis label style
        ax.set_ylabel('log excess returns',fontsize=20)
        # Figure title style
        ax.set_title("Strategy's performances with different parameters", fontsize=21)
        plt.xlim(0, len(self.log_excess_returns[0]))
    # The four main backtest metrics: total return, max drawdown, Sharpe ratio and volatility
    def get_eval4_bar(self, sort_by=[]):
        sorted_params = self.params_df
        for by in sort_by:
            # DataFrame.sort was removed from pandas; sort_values is its replacement
            sorted_params = sorted_params.sort_values(by)
        indices = sorted_params.index
        fig = plt.figure(figsize=(20,7))
        # Top-left panel: total return
        ax1 = fig.add_subplot(221)
        # x axis is the quantile group, y axis is the metric
        ax1.bar(range(len(indices)),
                [self.evaluations[x]['algorithm_return'] for x in indices], 0.6, label = 'Algorithm_return')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # Legend style
        ax1.legend(loc='best',fontsize=15)
        # y-axis label style
        ax1.set_ylabel('Algorithm_return', fontsize=15)
        # Show the y tick labels as percentages
        ax1.set_yticklabels([str(x*100)+'% 'for x in ax1.get_yticks()])
        # Panel title style
        ax1.set_title("Algorithm_return of different quantiles", fontsize=15)
        # x-axis range
        plt.xlim(0, len(indices))
        # Bottom-right panel: max drawdown
        ax2 = fig.add_subplot(224)
        # x axis is the quantile group, y axis is the metric
        ax2.bar(range(len(indices)),
                [self.evaluations[x]['max_drawdown'] for x in indices], 0.6, label = 'Max_drawdown')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # Legend style
        ax2.legend(loc='best',fontsize=15)
        # y-axis label style
        ax2.set_ylabel('Max_drawdown', fontsize=15)
        # Show the y tick labels as percentages
        ax2.set_yticklabels([str(x*100)+'% 'for x in ax2.get_yticks()])
        # Panel title style
        ax2.set_title("Max_drawdown of different quantiles", fontsize=15)
        # x-axis range
        plt.xlim(0, len(indices))
        # Bottom-left panel: Sharpe ratio
        ax3 = fig.add_subplot(223)
        # x axis is the quantile group, y axis is the metric
        ax3.bar(range(len(indices)),
                [self.evaluations[x]['sharpe'] for x in indices], 0.6, label = 'Sharpe')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # Legend style
        ax3.legend(loc='best',fontsize=15)
        # y-axis label style; Sharpe is a ratio, so the tick labels stay plain numbers
        ax3.set_ylabel('Sharpe', fontsize=15)
        # Panel title style
        ax3.set_title("Sharpe of different quantiles", fontsize=15)
        # x-axis range
        plt.xlim(0, len(indices))
        # Top-right panel: volatility
        ax4 = fig.add_subplot(222)
        # x axis is the quantile group, y axis is the metric
        ax4.bar(range(len(indices)),
                [self.evaluations[x]['algorithm_volatility'] for x in indices], 0.6, label = 'Algorithm_volatility')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # Legend style
        ax4.legend(loc='best',fontsize=15)
        # y-axis label style
        ax4.set_ylabel('Algorithm_volatility', fontsize=15)
        # Show the y tick labels as percentages
        ax4.set_yticklabels([str(x*100)+'% 'for x in ax4.get_yticks()])
        # Panel title style
        ax4.set_title("Algorithm_volatility of different quantiles", fontsize=15)
        # x-axis range
        plt.xlim(0, len(indices))
    #14 Annualized return and max drawdown, shown in two colors (positive and negative)
    def get_eval(self, sort_by=[]):
        sorted_params = self.params_df
        for by in sort_by:
            # DataFrame.sort was removed from pandas; sort_values is its replacement
            sorted_params = sorted_params.sort_values(by)
        indices = sorted_params.index
        # Figure size
        fig = plt.figure(figsize = (20, 8))
        # Single panel
        ax = fig.add_subplot(111)
        # Max drawdown, drawn as negative bars
        ax.bar([x+0.3 for x in range(len(indices))],
               [-self.evaluations[x]['max_drawdown'] for x in indices], color = '#32CD32',
               width = 0.6, label = 'Max_drawdown', zorder=10)
        # Annualized return
        ax.bar([x for x in range(len(indices))],
               [self.evaluations[x]['annual_algo_return'] for x in indices], color = 'r',
               width = 0.6, label = 'Annual_return')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # Zero reference line
        plt.plot([0, len(indices)], [0, 0], c='k',
                 linestyle='--', label='zero')
        # Legend style (called once, after every labeled artist has been drawn)
        ax.legend(loc='best',fontsize=15)
        # y-axis label style
        ax.set_ylabel('Max_drawdown', fontsize=15)
        # Show the y tick labels as percentages
        ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
        # Figure title style
        ax.set_title("Strategy's performances of different quantile", fontsize=15)
        # x-axis range
        plt.xlim(0, len(indices))
    #14 Annualized excess return and max drawdown of the excess returns
    # (measured against the new benchmark when one has been supplied)
    def get_excess_eval(self, sort_by=[]):
        sorted_params = self.params_df
        for by in sort_by:
            # DataFrame.sort was removed from pandas; sort_values is its replacement
            sorted_params = sorted_params.sort_values(by)
        indices = sorted_params.index
        # Figure size
        fig = plt.figure(figsize = (20, 8))
        # Single panel
        ax = fig.add_subplot(111)
        # Max drawdown of the excess returns, drawn as negative bars
        ax.bar([x+0.3 for x in range(len(indices))],
               [-self.excess_max_drawdown[x] for x in indices], color = '#32CD32',
               width = 0.6, label = 'Excess_max_drawdown')
        # Annualized excess return
        ax.bar([x for x in range(len(indices))],
               [self.excess_annual_return[x] for x in indices], color = 'r',
               width = 0.6, label = 'Excess_annual_return')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # Zero reference line
        plt.plot([0, len(indices)], [0, 0], c='k',
                 linestyle='--', label='zero')
        # Legend style (called once, after every labeled artist has been drawn)
        ax.legend(loc='best',fontsize=15)
        # y-axis label style
        ax.set_ylabel('Max_drawdown', fontsize=15)
        # Show the y tick labels as percentages
        ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
        # Figure title style
        ax.set_title("Strategy's performances of different quantile", fontsize=15)
        # x-axis range
        plt.xlim(0, len(indices))
def group_backtest(start_date, end_date):
    warnings.filterwarnings("ignore")
    pa = parameter_analysis('78c119654ae4be5678a2e92ac6565b3f')
    pa.get_backtest_data(file_name = 'results_1.pkl',
                         running_max = 5,
                         start_date = start_date,
                         end_date = end_date,
                         frequency = 'day',
                         initial_cash = '10000000',
                         param_names = ['factor', 'quantile'],  # variable names, i.e. the g.xxxx variables in the strategy
                         param_values = [['svm'], tuple(zip(range(0,50,10), range(10,51,10)))]
                         )
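# Fill in your own backtest id
pa = parameter_analysis('f66b0e48aa76014f1b68a6823b3a123e')
# Start the backtest (set `factor` to whichever factor you want to test)
# PS: some factors can be printed in the research environment but not in the strategy environment,
# which is a bit awkward; the vast majority of factors work in both environments
factor = 'gross_profit_margin'
pa.get_backtest_data(file_name = 'results.pkl',
                     running_max = 10,
                     benchmark_id = None,
                     start_date = '2012-01-01',  # backtest start date, set it yourself
                     end_date = '2019-01-23',    # backtest end date, set it yourself
                     frequency = 'day',
                     initial_cash = '1000000',
                     param_names = ['factor', 'quantile'],
                     param_values = [[factor], tuple(zip(range(0,100,10), range(10,101,10)))]  # the factor under test and its ten decile ranges
                     )
# Layered (quantile) backtest results
pa.plot_returns()
# Long-short net value, to check monotonicity
pa.plot_long_short()
# Other results
pa.get_eval4_bar()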
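The long-short curve drawn by `plot_long_short` can be cross-checked with a vectorized version. The following is only a minimal sketch: the function name `long_short_nav` and the toy numbers are made up for illustration and are not part of the original class. It assumes the inputs are cumulative returns in the same convention as the lists in `self.returns`.

```python
import numpy as np


def long_short_nav(long_cum_returns, short_cum_returns):
    """Net value of a half-long, half-short portfolio.

    Inputs are cumulative returns (0.05 means +5% since the start),
    the same convention as the per-group lists in `self.returns`.
    """
    # Turn cumulative returns into net values that start at 1.0
    long_nav = np.concatenate(([1.0], 1.0 + np.asarray(long_cum_returns, dtype=float)))
    short_nav = np.concatenate(([1.0], 1.0 + np.asarray(short_cum_returns, dtype=float)))
    # Per-period gross return of each leg
    long_ratio = long_nav[1:] / long_nav[:-1]
    short_ratio = short_nav[1:] / short_nav[:-1]
    # Half the spread between the two legs, as in plot_long_short
    spread = (long_ratio - short_ratio) / 2.0
    # Compound the spread into a net-value curve starting at 1.0
    return np.concatenate(([1.0], np.cumprod(1.0 + spread)))


# Toy data (hypothetical): the long leg drifts up, the short leg drifts down
long_leg = [0.01, 0.03, 0.02, 0.06]
short_leg = [0.00, -0.01, -0.02, -0.01]
nav = long_short_nav(long_leg, short_leg)
print(nav)
```

If the factor is monotonic across groups, the curve built from the two extreme groups should trend steadily in one direction; a flat or erratic curve suggests weak separation between the groups.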
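Postscript:
Below are the open problems with this factor-screening framework and some thoughts on them:
- Persistence of a factor's predictive power: some factors perform well within the sample period but may not do so in other periods. How should this "generalization" problem be handled? Could a half-life be used (first compute the IC of the T0 factor exposure against asset returns over T0-T1, T1-T2, ... at successive lags, then record the time the IC takes to fall to half its initial value as the half-life), with summary statistics of the half-life, such as its mean, measuring how long the factor under test stays predictive?
- The fundamental question of screening: do the factors that were screened out (dropped) really contribute nothing to predicting future asset returns? Could dimensionality reduction achieve the goal of "screening" instead?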
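The half-life idea in the first bullet can be sketched as follows. This is a minimal illustration on synthetic data for a single cross-section, not a tested part of the framework: the names `rank_ic` and `ic_half_life`, the decay rate, and the toy data are all assumptions made for the example.

```python
import numpy as np
import pandas as pd


def rank_ic(exposure, forward_return):
    """Cross-sectional Rank IC: correlation between exposure ranks and return ranks."""
    return exposure.rank().corr(forward_return.rank())


def ic_half_life(ic_by_lag):
    """First lag at which |IC| is at most half of its value at the first lag.

    Returns None if the IC never decays that far within the lags supplied.
    """
    initial = abs(ic_by_lag.iloc[0])
    for lag, ic in ic_by_lag.items():
        if abs(ic) <= initial / 2.0:
            return lag
    return None


# Synthetic example: exposures at T0 and noisy returns for six successive periods
rng = np.random.default_rng(0)
n_stocks = 500
exposure = pd.Series(rng.standard_normal(n_stocks))
ic_values = {}
for lag in range(1, 7):
    # Assumed for the toy data only: the signal strength halves every two lags
    signal = 0.1 * 0.5 ** ((lag - 1) / 2.0)
    forward_return = signal * exposure + pd.Series(rng.standard_normal(n_stocks))
    ic_values[lag] = rank_ic(exposure, forward_return)
ic_by_lag = pd.Series(ic_values)
print(ic_by_lag)
print("half-life (lag):", ic_half_life(ic_by_lag))
```

A single cross-section gives a noisy IC, so in practice the IC at each lag would presumably be averaged over many dates before the half-life is read off.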
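References:
I searched Zhihu and found a column post on factor testing that explains it quite well (its job ad is slick too, lesson learned): https://zhuanlan.zhihu.com/p/31733061
Definitions and introductions of factor metrics under the Fama and Barra frameworks (the main difference is how the two understand factor exposure; the rest, in my view, is similar): https://henix.github.io/feeds/zhuanlan.factor-investing/2018-08-05-41339443.html
For an introduction to IC, see 《關於多因子模型的“IC信息系數”那些事》: https://zhuanlan.zhihu.com/p/24616859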