在上一篇文章《曾经,我找到了投资的圣杯》中,发表了一下对小市值策略的缅怀和感慨之后,不得不面临着一个问题:
策略渐渐失效的情况下,账户的里面的资金何去何从。
这时候,我想起了被我遗忘在脑海角落里面的多因子策略。市值大小是选股因子的其中一个,那么其他因子呢?是否也能为我们投资提供指引?
毫无疑问,肯定是可以的。无论是市值因子,还是前不久火爆全场的“雄安”因子,对投资都有明显的指引,只是适用时间的长短不一而已。
不管是什么因子,可能都存在失效的时间。假如我们能够动态的获悉因子收益的情况,知道哪些因子有效、最近收益好,然后使用这些按照这些因子进行选股投资,效果会怎么样?
激动了一个晚上,开始将想法进行实施。
1 对各个因子的历史收益进行回测
这一点本来让我纠结了很久,因为想要在回测中利用回测的结果,除了人工处理,好像没有比较好的自动化方式进行实现。
直到在网上看到了一篇文章《【研究】量化选股-因子检验和多因子模型的构建》。
文章通过计算一个股票组合总的流通市值,实现了不同日期下的对比,明确了组合相对的涨跌幅。
重复对比获得涨跌幅,并累积连乘,很容易就得到了一段时间内的回测结果。
虽然中间没有计算手续费,不过我需要的,是对比的是不同因子之间的差异,并不一定要接近实盘的回测结果。所以,这样的回测,也就能满足我动态获取历史数据的需要。
2 按照一定的规则对回测结果进行评分
因为目的是想要找到最近比较有效的因子,所以考虑的都是因子近期的收益情况。
将各个因子在最近一年、半年、一季度、一个月、一周的收益情况进行降序排序,并且按照一定的权重,计算排序得分,分值越小排名越前。目前,我的计分的权重如下:
最近一年:0.1
最近半年:0.2
最近一季度:0.25
最近一月:0.3
最近一周:0.15
如果你非要问,这个权重到底是怎么得来的,我只能说,鬼使神差,^_^。
这里列出来,仅供大家参考。
3 回测的结果
我对小市值策略的幻想时间,2015年3月23日到2017年3月23日:
我的二八轮动小市值策略实盘时间,2016年6月6日到2017年4月20日:
更长的周期,2007年1月1日到2017年4月20日:
通过和小市值策略那好得难以想象的收益相比,这个策略确实还略显不足,不过应该还有优化和提升的空间。
目前的回测仅仅是通过单个因子获得的结果,肯定会有更好的收益评分权重、更多的因子组合方式、以及多因子评分权重,我正在不断地测试,大家期待我的结果吧。
众人拾柴火焰高,如果您也有兴趣的话,一起研究下吧。
代码能力有限,改的也是别人的代码,比较乱,凑合着看吧。另外,计分的权重在1721行,这个改起来容易^_^。
为了能让回测和研究共用数据,也烦恼了一阵子,然后又就有了这个:打通回测与研究的文件通道
十二年的历史回测数据,大概需要运行16个小时左右,可以将百度云盘中的文件"values_return_dict_history_20170429.pkl"上传到研究,直接使用。
附录:
用于回测的因子:
成长类因子:
营业收入同比增长率
营业收入环比增长率
净利润同比增长率
净利润环比增长率
营业利润率
销售净利率
销售毛利率
每元利润总额
每元营业利润
每元营业收入
每元所有者净利润
每元未分配利润
每元资本公积
每股盈余
规模类因子:
总市值
流通市值
总股本
流通股本
收盘价
每元资产总额
每元所有者权益
价值类因子:
市净率
市销率
市现率
动态市盈率
静态市盈率
质量类因子:
净资产收益率
总资产收益率
资产负债率
流动比率
总资产周转率
文章来源于公众号:止一之路
注册账号链接:注册joinquant账号
# 导包
import pandas as pd
from pandas import Series, DataFrame
import numpy as np
import statsmodels.api as sm
import scipy.stats as scs
import matplotlib.pyplot as plt
import time
from datetime import timedelta
import pickle
# 得到因子数据
def get_factors(fdate,factors):
#stock_set = get_index_stocks('000300.XSHG',fdate)
q = query(
valuation.code, # 股票代码
valuation.circulating_market_cap, # CMC 流通市值
valuation.market_cap, # MC 总市值
valuation.circulating_market_cap/valuation.capitalization*10000, # CMC/C 流通市值(亿)/总股本(万) (收盘价)
balance.total_owner_equities/valuation.market_cap/100000000, # TOE/MC 每元所有者权益
valuation.pb_ratio, # PB 市净率
income.net_profit/valuation.market_cap/100000000, # NP/MC 每元所有者净利润
income.total_profit/valuation.market_cap/100000000, # TP/MC 每元利润总额
balance.total_assets/valuation.market_cap/100000000, # TA/MC 每元资产总额
income.operating_profit/valuation.market_cap/100000000, # OP/MC 每元营业利润
balance.capital_reserve_fund/valuation.market_cap/100000000, # CRF/MC 每元资本公积
valuation.ps_ratio, # PS 市销率
income.operating_revenue/valuation.market_cap/100000000, # OR/MC 每元营业收入
balance.retained_profit/valuation.market_cap/100000000, # RP/MC 每元未分配利润
balance.total_liability/balance.total_sheet_owner_equities,# TL/TA 资产负债率
balance.total_current_assets/balance.total_current_liability, # TCA/TCL 流动比率
valuation.pe_ratio, # PE 市盈率
income.operating_revenue*indicator.roa/income.net_profit, # OR*ROA/NP 总资产周转率
indicator.gross_profit_margin, # GPM 销售毛利率
indicator.inc_revenue_year_on_year, # IRYOY 营业收入同比增长率(%)
indicator.inc_revenue_annual, # IRA 营业收入环比增长率(%)
indicator.inc_net_profit_year_on_year, # INPYOY 净利润同比增长率(%)
indicator.inc_net_profit_annual, # INPA 净利润环比增长率(%)
indicator.net_profit_margin, # NPM 销售净利率(%)
indicator.operation_profit_to_total_revenue, # OPTTR 营业利润/营业总收入(%)
valuation.capitalization,# C 总股本
valuation.circulating_cap, # CC 流通股本(万股)
valuation.pcf_ratio, # PR 市现率
valuation.pe_ratio_lyr, # PRL 市盈率LYR
indicator.roe, # ROE 净资产收益率ROE(%)
indicator.roa, # ROA 总资产净利率ROA(%)
indicator.eps # EPS 每股盈余
).filter(
#valuation.code.in_(stock_set),
valuation.circulating_market_cap
)
fdf = get_fundamentals(q, date=fdate)
fdf.index = fdf['code']
fdf.columns = ['code'] + factors
# 行:选择全部,列,返回除了股票代码所有因子
return fdf.iloc[:,1:]
# 计算股票回报
# 这里的想法很妙,通过对比当前时间的流通市值和下一个时间的流通市值,计算得到涨幅
def caculate_port_return(port,startdate,enddate,CMC):
close1 = get_price(port, startdate, startdate, 'daily', ['close'])
close2 = get_price(port, enddate, enddate, 'daily',['close'])
# 个股涨跌幅*流通市值,得到流通市值涨跌额
# 所有股票流通市值涨跌额加总,得到流通市值总涨跌额
# 流通市值总涨跌额 和 之前的流通市值之比,得到流通市值涨跌幅度
weighted_m_return = ((close2['close'].ix[0,:]/close1['close'].ix[0,:]-1)*CMC).sum()/(CMC.ix[port].sum())
return weighted_m_return
# 计算基准回报
def caculate_benchmark_return(startdate,enddate):
close1 = get_price(['000001.XSHG'],startdate,startdate,'daily',['close'])['close']
close2 = get_price(['000001.XSHG'],enddate, enddate, 'daily',['close'])['close']
benchmark_return = (close2.ix[0,:]/close1.ix[0,:]-1).sum()
return benchmark_return
# 计算收益并形成dict
def get_return_values(date_list, factors, values_return_dict_history):
try:
date_history = values_return_dict_history.keys()
result = values_return_dict_history
except:
date_history = []
result = {}
date_temp_list = [ i for i in date_list[:-1] if i not in date_history ]
if len(date_temp_list) == 0 :
return values_return_dict_history
# 只要date_temp_list在date_list中有上一个,就增加
for date in date_temp_list:
if date_list.index(date) > 0 :
date_add = date_list[date_list.index(date)-1]
date_temp_list = [date_add] + date_temp_list
# 如果date_temp_list的最后一个在data_list中是倒数第二个,则去掉
if date_list.index(date_temp_list[-1]) == len(date_list) - 2 :
date_temp_list.remove(date_temp_list[-1])
print "date_temp_list: %s" %date_temp_list
for date in date_temp_list:
startdate = date_list[date_list.index(date)]
enddate = date_list[date_list.index(date) + 1]
# 如果已经存在,则不计算
if enddate in result.keys():
continue
print "回测日期: %s" % enddate
# 因子列表
fdf = get_factors(startdate,factors)
# 流通市值
CMC = fdf['CMC']
# 5个组合
df = DataFrame(np.zeros(6*len(factors)).reshape(6,len(factors)),index = ['port1','port2','port3','port4','port5','benchmark'],columns = factors)
for fac in factors:
# 根据因子升序排序
score = fdf[fac].order()
# 将股票划分为5部分
port1 = list(score.index)[: len(score)/5]
port2 = list(score.index)[ len(score)/5+1: 2*len(score)/5]
port3 = list(score.index)[ 2*len(score)/5+1: -2*len(score)/5]
port4 = list(score.index)[ -2*len(score)/5+1: -len(score)/5]
port5 = list(score.index)[ -len(score)/5+1: ]
# 获得每一部分的收益
df.ix['port1',fac] = caculate_port_return(port1,startdate,enddate,CMC)
df.ix['port2',fac] = caculate_port_return(port2,startdate,enddate,CMC)
df.ix['port3',fac] = caculate_port_return(port3,startdate,enddate,CMC)
df.ix['port4',fac] = caculate_port_return(port4,startdate,enddate,CMC)
df.ix['port5',fac] = caculate_port_return(port5,startdate,enddate,CMC)
# 获得指数的收益
df.ix['benchmark',fac] = caculate_benchmark_return(startdate,enddate)
# 赋值给字典
result[enddate]=df
return result
# 因子有效性检验
def effect_test(values_return, factors):
total_return = {} # 总回报
annual_return = {} # 年化符合收益
excess_return = {} # 超额收益
win_prob = {} # 胜率
loss_prob = {} # 负率
effect_test = {} # 有效性检验
MinCorr = 0.3 # 最小相关阀值
Minbottom = -0.05 # 最小超额亏损
Mintop = 0.05 # 最小超额收益
for fac in factors:
effect_test[fac] = {} # 每个因子的有效性检验建立字典
daily = values_return[:,:,fac] # 获得各个月回报
#print daily
#print "fac :%s" % fac
total_return[fac] = (daily+1).T.cumprod().iloc[-1,:]-1 # 总收益
annual_return[fac] = total_return[fac] # 计算一年
excess_return[fac] = annual_return[fac]- annual_return[fac][-1] # 减去指数收益,获得超额收益
#判断因子有效性
#1.年化收益与组合序列的相关性 大于 阀值
# 一找你的有效性检验字典-收益相关性
effect_test[fac]['corr'] = annual_return[fac][0:5].corr(Series([1,2,3,4,5],index = annual_return[fac][0:5].index))
#2.高收益组合跑赢概率
#因子小,收益小,port1是输家组合,port5是赢家组合
if total_return[fac][0] < total_return[fac][-2]:
loss_excess = daily.iloc[0,:]-daily.iloc[-1,:] # 相对指数,每个月的超额损失
loss_prob[fac] = loss_excess[loss_excess<0].count()/float(len(loss_excess)) # 出现超额损失的概率
win_excess = daily.iloc[-2,:]-daily.iloc[-1,:] # 相对指数,每个月的超额收益
win_prob[fac] = win_excess[win_excess>0].count()/float(len(win_excess)) # 出现超额收益的概率
# 因子的有效性检验字典-胜负率
effect_test[fac]['prob_win'] = win_prob[fac]
effect_test[fac]['prob_lose'] = loss_prob[fac]
# 超额收益
# 因子的有效性检验字典-赢家组合的年化收益,输家组合的年化收益
effect_test[fac]["excess_return_win"] = excess_return[fac][-2]*100
effect_test[fac]["excess_return_lose"] = excess_return[fac][0]*100
#因子小,收益大,port1是赢家组合,port5是输家组合
else:
loss_excess = daily.iloc[-2,:]-daily.iloc[-1,:] # 相对指数,每个月的超额损失
loss_prob[fac] = loss_excess[loss_excess<0].count()/float(len(loss_excess)) # 出现超额损失的概率
win_excess = daily.iloc[0,:]-daily.iloc[-1,:] # 相对指数,每个月的超额收益
win_prob[fac] = win_excess[win_excess>0].count()/float(len(win_excess)) # 出现超额收益的概率
# 因子的有效性检验字典-胜负率
effect_test[fac]['prob_win'] = win_prob[fac]
effect_test[fac]['prob_lose'] = loss_prob[fac]
#超额收益
# 因子的有效性检验字典-赢家组合的年化收益,输家组合的年化收益
effect_test[fac]["excess_return_win"] = excess_return[fac][0]*100
effect_test[fac]["excess_return_lose"] = excess_return[fac][-2]*100
#effect_test[1]记录因子相关性,>0.5或<-0.5合格
#effect_test[2]记录【赢家组合超额收益,输家组合超额收益】
#effect_test[3]记录赢家组合跑赢概率和输家组合跑输概率。【>0.5,>0.4】合格(因实际情况,跑输概率暂时不考虑)
effect_test_df = DataFrame(effect_test)
effect_test_df_T = effect_test_df.T
# 条件1 相关性绝对值大于0.5,值越大越有效
#effect_test_df_T = effect_test_df_T[abs(effect_test_df_T['corr']) > 0.5]
# 条件3 胜率大于0.5,胜率越大效果越好
#effect_test_df_T = effect_test_df_T[effect_test_df_T['prob_win'] > 0.4]
return effect_test_df_T
# 计算得分
def caculate_score(scores_return_panel, date_score, effect_test_df_T, effective_factors):
# 有效因子年,半年,季,月,周 回报结果
timesOfReturn = ['year','halfyear','season','month','week','day']
score_df = DataFrame(np.zeros(len(effective_factors)*len(timesOfReturn)).reshape(len(effective_factors),len(timesOfReturn)),index = effective_factors,columns = timesOfReturn)
for fac in effective_factors:
# 相关度为负数,因子小收益大;相关度为正数,因子大收益大
if effect_test_df_T.ix[fac,"corr"] < 0:
strCorr = "port1"
else:
strCorr = "port5"
returns = []
date_list = scores_return_panel.items.tolist()
# 从需要计分的开始日期,一直到最后一天,累积连乘得到这一段时间的收益
for date in date_score[:-1]:
i = date_list.index(date)+1
returns.append((scores_return_panel[date_list[i:],strCorr,fac].T+1).cumprod()[scores_return_panel.items.tolist()[-1]]-1)
score_df.ix[fac] = returns
# 因子回报排名
for column in timesOfReturn:
score_df = score_df.sort(columns=[column], ascending=[False])
score_df.reset_index(range(1,len(score_df) + 1), inplace = True)
score_df.index = score_df.index + 1
score_df[column + '_rank'] = score_df.index
if column == 'year':
score_df.rename(columns={"index":"Factors"}, inplace = True)
else:
score_df.drop(['index'], axis=1, inplace=True)
score_df['year_rank'] = score_df.index
# 计分
score_df['score'] = score_df['year_rank']*0.1 + score_df['halfyear_rank']*0.2 + score_df['season_rank']*0.25 + score_df['month_rank']*0.3 + score_df['week_rank']*0.15 #+ score_df['day_rank']*0.1
score_df = score_df.sort(columns=['score'], ascending=[True])
return score_df
# 运行的内容
print "开始时间 %s " % time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
startTotal = time.time()
# 今天和一年前的今天
today = pd.datetime.today()
yearBefore = today-timedelta(days=365*12) # 前面12年
start = yearBefore.strftime('%Y-%m-%d') # 开始日期
end = today.strftime('%Y-%m-%d') # 截止日期
print "start: %s" % start
print "end: %s" % end
# 获得一年的交易日列表
date_df = get_price('000001.XSHG', start_date=start, end_date=end, frequency='daily', fields=['close'], fq=None)
date_list = date_df.index.tolist()
print date_list[-5:]
# 用于计分的日期
i = -2
date_score = [date_list[-240 + i]]# 1年前
date_score.append(date_list[-120 + i]) # 半年度前
date_score.append(date_list[-60 + i]) # 1季度前
date_score.append(date_list[-20 + i]) # 1月前
date_score.append(date_list[-5 + i]) # 1周前
date_score.append(date_list[-1 + i]) # 1天前
date_score.append(date_list[i]) # 当日
# 因子
factors = ['CMC','MC','CMC/C','TOE/MC','PB','NP/MC','TP/MC','TA/MC','OP/MC','CRF/MC','PS','OR/MC','RP/MC','TL/TA','TCA/TCL','PE','OR*ROA/NP','GPM','IRYOY','IRA','INPYOY','INPA','NPM','OPTTR','C','CC','PR','PRL','ROE','ROA','EPS']
# 获得每月回报
try:
# values_return_dict_history 不为空的情况
values_return_dict = get_return_values(date_list, factors, values_return_dict_history)
except:
# 读取文件
pkl_file = open('values_return_dict_history_20170421.pkl', 'rb')
values_return_dict_history = pickle.load(pkl_file)
pkl_file.close()
values_return_dict = get_return_values(date_list, factors, values_return_dict_history)
# dict格式转换成panel
values_return_panel = pd.Panel(values_return_dict)[date_list[1:],:,:]
# 记录历史,方便回测
values_return_dict_history = values_return_dict
output = open('values_return_dict_history_20170421.pkl', 'wb')
pickle.dump(values_return_dict_history, output)
output.close()
print "序列化保存对象:values_return_dict_history"
# 得到年、季、月、周、日收益
indexStart = date_list.index(date_score[0]) # 第一个日期
indexEnd = date_list.index(date_score[-1]) # 最后一个日期
# 得到date_score对应的日期,并筛选对应用于计分的数据
scores_return_panel = pd.Panel(values_return_dict_history)[date_list[indexStart:indexEnd+1],:,:]
# 因子检验 (添加了有效性条件后,发现回测效果更差。目前的作用是,计算因子的正负相关性)
effect_test_df_T = effect_test(values_return_panel, factors)
# 有效因子
effective_factors = effect_test_df_T.index.tolist()
# 计算得分
score_df = caculate_score(scores_return_panel, date_score, effect_test_df_T, effective_factors)
# 排名前25%的因子
fac_num = int(len(factors)*0.25)
print "fac_num %s"%fac_num
list3 = score_df['Factors'].tolist()[:fac_num]
effect_test_df_T['ascending'] = effect_test_df_T['corr'] < 0 # 相关性小于0,为True,大于0为False
result3 = effect_test_df_T['ascending'].T[list3]
print "第一个因子:%s" % result3.index[0]
print "第一个因子的排序:%s" % result3.ix[0]
print "排名前25%的因子情况:"
print result3
# 根据得到的因子进行选股
fac = result3.index[0]
ascending = result3.ix[fac]
fdf = get_factors(end,factors)
score_stock = fdf[fac].order(ascending = ascending).index.tolist()[:100]
endTotal = time.time()
print "结束时间 %s " % time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
secondUseTotal = endTotal - startTotal
print "所用时间(秒) %s" % secondUseTotal
print "所用时间(分) %s" % (secondUseTotal / 60 )
print "所用时间(时) %s" % (secondUseTotal / 60 /60 )
# 结果转换为中文名称
# 行名
correspondenceRow = {'CMC':'流通市值',
'MC':'总市值',
'CMC/C':'收盘价',
'TOE/MC':'每元所有者权益',
'PB':'市净率',
'NP/MC':'每元所有者净利润',
'TP/MC':'每元利润总额',
'TA/MC':'每元资产总额',
'OP/MC':'每元营业利润',
'CRF/MC':'每元资本公积',
'PS':'市销率',
'OR/MC':'每元营业收入',
'RP/MC':'每元未分配利润',
'TL/TA':'资产负债率',
'TCA/TCL':'流动比率',
'PE':'市盈率',
'OR*ROA/NP':'总资产周转率',
'GPM':'销售毛利率',
'IRYOY':'营业收入同比增长率',
'IRA':'营业收入环比增长率',
'INPYOY':'净利润同比增长率',
'INPA':'净利润环比增长率',
'NPM':'销售净利率',
'OPTTR':'营业利润/营业总收入',
'C':'总股本',
'CC':'流通股本(万股)',
'PR':'市现率',
'PRL':'市盈率LYR',
'ROE':'净资产收益率ROE',
'ROA':'总资产净利率ROA',
'EPS':'每股盈余'}
# 列名
correspondenceColumn = {'year':"一年收益(%)",
'halfyear':"半年收益(%)",
'season':"一季收益(%)",
'month':"一月收益(%)",
'week':"一周收益(%)",
'day':"一日收益(%)",
'year_rank':"年排序",
'halfyear_rank':"半年排序",
'season_rank':"季度排序",
'month_rank':"月排序",
'week_rank':"周排序",
'day_rank':"日排序",
'score':"排序计分",}
result = score_df
result = result.set_index('Factors')
result[['year','halfyear','season','month','week','day']] = result[['year','halfyear','season','month','week','day']]*100
# 列标题修改
for k,v in correspondenceColumn.items():
result.rename(columns={k:v}, inplace = True)
result_T = result.T
# 行标题修改
for k,v in correspondenceRow.items():
result_T.rename(columns={k:v}, inplace = True)
result = result_T.T
print "日期:%s" % today.strftime('%Y-%m-%d')
result
# 导包
import pandas as pd
from pandas import Series, DataFrame
import numpy as np
import statsmodels.api as sm
import scipy.stats as scs
import matplotlib.pyplot as plt
import time
from datetime import timedelta
import pickle
time1 = datetime.datetime(2017,4,25)
time2 = datetime.datetime(2017,4,26)
time3 = datetime.datetime(2017,4,21)
# 读取文件
pkl_file = open('values_return_dict_history_20170421.pkl', 'rb')
values_return_dict_history = pickle.load(pkl_file)
pkl_file.close()
values_return_dict_history[time3]
'''
values_return_dict_history.pop(time1)
values_return_dict_history.pop(time2)
values_return_dict_history.pop(time3)
output = open('values_return_dict_history_20170421.pkl', 'wb')
pickle.dump(values_return_dict_history, output)
output.close()
'''