本文参考中国银河证券黎鹏研报《多因子系列:基于多因子框架的收益预测模型》,再次感谢分析师 黎鹏 在研报中提供的思路和方法。
研究目的:
上一篇报告《多因子系列:多因子模型体系之因子组合的确定》中,介绍了多因子模型建模的第一步:单因子的效果测试以及确定纳入多因子回归公式中的因子组合。本文将对多因子模型的第二步收益预测模型进行研究,分别对收益移动均值模型、收益加权移动均值模型以及残差模型进行分析,从中筛选出最佳模型用于对收益进行预测,给研究人员提供了一定思路进行因子组合。
研究内容:
(1)首先,上篇报告《多因子系列:多因子模型体系之因子组合的确定》中按照因子收益能力、因子暴露以及相关性几个维度确定了三组因子组合:换手率和EPS,ROE 和换手率,股本和换手率,其中最佳的为股本和换手率组别。本文基于该因子组合进行相应回测以及预测分析。
(2)分析收益移动均值模型对未来收益的预测能力,分别通过分层回测以及预测周期参数两方面,对模型的预测能力进行分析;
(3)采用与收益移动均值模型类似的方式,通过分层回测和预测周期参数调整这两方面,分析收益加权移动均值模型的预测能力;
(4)分析残差模型的收益能力;
(5)根据模型分析结果,选择出最佳预测模型,根据该模型,当选择股票数量下降时,分析模型的收益能力。
研究结论:
基于回归模型对日频因子收益率进行计算,然后根据收益移动均值模型,收益加权移动均值模型,以及残差模型这三种模型分析模型对股票未来收益的预测能力。
(1)收益移动均值模型:通过回归模型计算日频的因子收益率,然后向前滚动 N(N=20)个交易日,计算因子收益率均值,根据实时因子数据预测个股未来收益,根据 10 层分层回测结果来看,该方法分层能力较强,通过分析向前滚动交易日参数,结果表明该模型对短期数据较有效。
(2)收益加权移动均值模型:通过回归模型计算日频的因子收益率,然后向前滚动 N(N=20)个交易日,计算因子收益率加权均值,通过加入衰减系数来改变数据权重。从测试情况看,该方法分层能力比收益移动均值模型更强,且通过分析向前滚动交易日参数,结果表明该模型仍然对短期数据较有效。
(3)残差模型:通过回归模型计算所得的残差收益,然后向前滚动 N(N=20)个交易日,取其均值作为个股收益预测值。从测试情况看,该方法更适合去除风险个股。
1 收益移动均值模型¶
本文基于收益移动均值模型的构建原理对此模型进行复现,复现过程如下所示。
策略步骤:
(1)按照日频进行因子的截面数据回归,从而获得因子的日频因子收益;
(2)向前滚动 N 个交易日,计算 N 个交易日的因子收益平均值;
(3)用因子收益平均值作为因子收益的预测值,代入因子模型从而得出个股的收益预测值;
(4)个股按照预测值排序打分。
1.1 分层回测¶
为了测试一下收益移动均值模型的优劣,通过分层回测的方式对改模型进行分析。本文进行 10 层分层回测。
评价方法: 回测年化收益率、夏普比率、最大回撤、胜率等。
回测年化收益率: 年化收益率通常指投资一年后能够获得的收益率,由于回测时间的长短,往往会由于复利的影响导致长时间的总收益率更大,此时可通过年化收益率衡量模型的收益能力。
夏普比率: 目的是计算投资组合每承受一单位总风险,会产生多少的超额报酬。
最大回撤: 最大回撤是指模型在过去的某一段时间可能出现的最大亏损程度,通常用来衡量模型的风险。在实际投资中,若是出现最大回撤较大的情况,往往会导致投资者对模型丧失信心,因此合理控制模型的最大回撤显得尤为重要。
#1 先导入所需要的程序包
import datetime
import numpy as np
import pandas as pd
import time
from jqdata import *
from pandas import Series, DataFrame
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
import copy
import pickle
# 定义类'参数分析'
class parameter_analysis(object):
# 定义函数中不同的变量
def __init__(self, algorithm_id=None):
self.algorithm_id = algorithm_id # 回测id
self.params_df = pd.DataFrame() # 回测中所有调参备选值的内容,列名字为对应修改面两名称,对应回测中的 g.XXXX
self.results = {} # 回测结果的回报率,key 为 params_df 的行序号,value 为
self.evaluations = {} # 回测结果的各项指标,key 为 params_df 的行序号,value 为一个 dataframe
self.backtest_ids = {} # 回测结果的 id
# 新加入的基准的回测结果 id,可以默认为空 '',则使用回测中设定的基准
self.benchmark_id = 'ae0684d86e9e7128b1ab9c7d77893029'
self.benchmark_returns = [] # 新加入的基准的回测回报率
self.returns = {} # 记录所有回报率
self.excess_returns = {} # 记录超额收益率
self.log_returns = {} # 记录收益率的 log 值
self.log_excess_returns = {} # 记录超额收益的 log 值
self.dates = [] # 回测对应的所有日期
self.excess_max_drawdown = {} # 计算超额收益的最大回撤
self.excess_annual_return = {} # 计算超额收益率的年化指标
self.evaluations_df = pd.DataFrame() # 记录各项回测指标,除日回报率外
# 定义排队运行多参数回测函数
def run_backtest(self, #
algorithm_id=None, # 回测策略id
running_max=10, # 回测中同时巡行最大回测数量
start_date='2006-01-01', # 回测的起始日期
end_date='2016-11-30', # 回测的结束日期
frequency='day', # 回测的运行频率
initial_cash='1000000', # 回测的初始持仓金额
param_names=[], # 回测中调整参数涉及的变量
param_values=[] # 回测中每个变量的备选参数值
):
# 当此处回测策略的 id 没有给出时,调用类输入的策略 id
if algorithm_id == None: algorithm_id=self.algorithm_id
# 生成所有参数组合并加载到 df 中
# 包含了不同参数具体备选值的排列组合中一组参数的 tuple 的 list
param_combinations = list(itertools.product(*param_values))
# 生成一个 dataframe, 对应的列为每个调参的变量,每个值为调参对应的备选值
to_run_df = pd.DataFrame(param_combinations)
# 修改列名称为调参变量的名字
to_run_df.columns = param_names
# 设定运行起始时间和保存格式
start = time.time()
# 记录结束的运行回测
finished_backtests = {}
# 记录运行中的回测
running_backtests = {}
# 计数器
pointer = 0
# 总运行回测数目,等于排列组合中的元素个数
total_backtest_num = len(param_combinations)
# 记录回测结果的回报率
all_results = {}
# 记录回测结果的各项指标
all_evaluations = {}
# 在运行开始时显示
print '【已完成|运行中|待运行】:',
# 当运行回测开始后,如果没有全部运行完全的话:
while len(finished_backtests)<total_backtest_num:
# 显示运行、完成和待运行的回测个数
print('[%s|%s|%s].' % (len(finished_backtests),
len(running_backtests),
(total_backtest_num-len(finished_backtests)-len(running_backtests)) )),
# 记录当前运行中的空位数量
to_run = min(running_max-len(running_backtests), total_backtest_num-len(running_backtests)-len(finished_backtests))
# 把可用的空位进行跑回测
for i in range(pointer, pointer+to_run):
# 备选的参数排列组合的 df 中第 i 行变成 dict,每个 key 为列名字,value 为 df 中对应的值
params = to_run_df.ix[i].to_dict()
# 记录策略回测结果的 id,调整参数 extras 使用 params 的内容
backtest = create_backtest(algorithm_id = algorithm_id,
start_date = start_date,
end_date = end_date,
frequency = frequency,
initial_cash = initial_cash,
extras = params,
# 再回测中把改参数的结果起一个名字,包含了所有涉及的变量参数值
name = str(params)
)
# 记录运行中 i 回测的回测 id
running_backtests[i] = backtest
# 计数器计数运行完的数量
pointer = pointer+to_run
# 获取回测结果
failed = []
finished = []
# 对于运行中的回测,key 为 to_run_df 中所有排列组合中的序数
for key in running_backtests.keys():
# 研究调用回测的结果,running_backtests[key] 为运行中保存的结果 id
bt = get_backtest(running_backtests[key])
# 获得运行回测结果的状态,成功和失败都需要运行结束后返回,如果没有返回则运行没有结束
status = bt.get_status()
# 当运行回测失败
if status == 'failed':
# 失败 list 中记录对应的回测结果 id
failed.append(key)
# 当运行回测成功时
elif status == 'done':
# 成功 list 记录对应的回测结果 id,finish 仅记录运行成功的
finished.append(key)
# 回测回报率记录对应回测的回报率 dict, key to_run_df 中所有排列组合中的序数, value 为回报率的 dict
# 每个 value 一个 list 每个对象为一个包含时间、日回报率和基准回报率的 dict
all_results[key] = bt.get_results()
# 回测回报率记录对应回测结果指标 dict, key to_run_df 中所有排列组合中的序数, value 为回测结果指标的 dataframe
all_evaluations[key] = bt.get_risk()
# 记录运行中回测结果 id 的 list 中删除失败的运行
for key in failed:
running_backtests.pop(key)
# 在结束回测结果 dict 中记录运行成功的回测结果 id,同时在运行中的记录中删除该回测
for key in finished:
finished_backtests[key] = running_backtests.pop(key)
# 当一组同时运行的回测结束时报告时间
if len(finished_backtests) != 0 and len(finished_backtests) % running_max == 0 and to_run !=0:
# 记录当时时间
middle = time.time()
# 计算剩余时间,假设没工作量时间相等的话
remain_time = (middle - start) * (total_backtest_num - len(finished_backtests)) / len(finished_backtests)
# print 当前运行时间
print('[已用%s时,尚余%s时,请不要关闭浏览器].' % (str(round((middle - start) / 60.0 / 60.0,3)),
str(round(remain_time / 60.0 / 60.0,3)))),
# 5秒钟后再跑一下
time.sleep(5)
# 记录结束时间
end = time.time()
print ''
print('【回测完成】总用时:%s秒(即%s小时)。' % (str(int(end-start)),
str(round((end-start)/60.0/60.0,2)))),
# 对应修改类内部对应
self.params_df = to_run_df
self.results = all_results
self.evaluations = all_evaluations
self.backtest_ids = finished_backtests
#7 最大回撤计算方法
def find_max_drawdown(self, returns):
# 定义最大回撤的变量
result = 0
# 记录最高的回报率点
historical_return = 0
# 遍历所有日期
for i in range(len(returns)):
# 最高回报率记录
historical_return = max(historical_return, returns[i])
# 最大回撤记录
drawdown = 1-(returns[i] + 1) / (historical_return + 1)
# 记录最大回撤
result = max(drawdown, result)
# 返回最大回撤值
return result
# log 收益、新基准下超额收益和相对与新基准的最大回撤
def organize_backtest_results(self, benchmark_id=None):
# 若新基准的回测结果 id 没给出
if benchmark_id==None:
# 使用默认的基准回报率,默认的基准在回测策略中设定
self.benchmark_returns = [x['benchmark_returns'] for x in self.results[0]]
# 当新基准指标给出后
else:
# 基准使用新加入的基准回测结果
self.benchmark_returns = [x['returns'] for x in get_backtest(benchmark_id).get_results()]
# 回测日期为结果中记录的第一项对应的日期
self.dates = [x['time'] for x in self.results[0]]
# 对应每个回测在所有备选回测中的顺序 (key),生成新数据
# 由 {key:{u'benchmark_returns': 0.022480100091729405,
# u'returns': 0.03184566700000002,
# u'time': u'2006-02-14'}} 格式转化为:
# {key: []} 格式,其中 list 为对应 date 的一个回报率 list
for key in self.results.keys():
self.returns[key] = [x['returns'] for x in self.results[key]]
# 生成对于基准(或新基准)的超额收益率
for key in self.results.keys():
self.excess_returns[key] = [(x+1)/(y+1)-1 for (x,y) in zip(self.returns[key], self.benchmark_returns)]
# 生成 log 形式的收益率
for key in self.results.keys():
self.log_returns[key] = [log(x+1) for x in self.returns[key]]
# 生成超额收益率的 log 形式
for key in self.results.keys():
self.log_excess_returns[key] = [log(x+1) for x in self.excess_returns[key]]
# 生成超额收益率的最大回撤
for key in self.results.keys():
self.excess_max_drawdown[key] = self.find_max_drawdown(self.excess_returns[key])
# 生成年化超额收益率
for key in self.results.keys():
self.excess_annual_return[key] = (self.excess_returns[key][-1]+1)**(252./float(len(self.dates)))-1
# 把调参数据中的参数组合 df 与对应结果的 df 进行合并
self.evaluations_df = pd.concat([self.params_df, pd.DataFrame(self.evaluations).T], axis=1)
# self.evaluations_df =
# 获取最总分析数据,调用排队回测函数和数据整理的函数
def get_backtest_data(self,
algorithm_id=None, # 回测策略id
benchmark_id=None, # 新基准回测结果id
file_name='results.pkl', # 保存结果的 pickle 文件名字
running_max=10, # 最大同时运行回测数量
start_date='2006-01-01', # 回测开始时间
end_date='2016-11-30', # 回测结束日期
frequency='day', # 回测的运行频率
initial_cash='1000000', # 回测初始持仓资金
param_names=[], # 回测需要测试的变量
param_values=[] # 对应每个变量的备选参数
):
# 调运排队回测函数,传递对应参数
self.run_backtest(algorithm_id=algorithm_id,
running_max=running_max,
start_date=start_date,
end_date=end_date,
frequency=frequency,
initial_cash=initial_cash,
param_names=param_names,
param_values=param_values
)
# 回测结果指标中加入 log 收益率和超额收益率等指标
self.organize_backtest_results(benchmark_id)
# 生成 dict 保存所有结果。
results = {'returns':self.returns,
'excess_returns':self.excess_returns,
'log_returns':self.log_returns,
'log_excess_returns':self.log_excess_returns,
'dates':self.dates,
'benchmark_returns':self.benchmark_returns,
'evaluations':self.evaluations,
'params_df':self.params_df,
'backtest_ids':self.backtest_ids,
'excess_max_drawdown':self.excess_max_drawdown,
'excess_annual_return':self.excess_annual_return,
'evaluations_df':self.evaluations_df}
# 保存 pickle 文件
pickle_file = open(file_name, 'wb')
pickle.dump(results, pickle_file)
pickle_file.close()
# 读取保存的 pickle 文件,赋予类中的对象名对应的保存内容
def read_backtest_data(self, file_name='results.pkl'):
pickle_file = open(file_name, 'rb')
results = pickle.load(pickle_file)
self.returns = results['returns']
self.excess_returns = results['excess_returns']
self.log_returns = results['log_returns']
self.log_excess_returns = results['log_excess_returns']
self.dates = results['dates']
self.benchmark_returns = results['benchmark_returns']
self.evaluations = results['evaluations']
self.params_df = results['params_df']
self.backtest_ids = results['backtest_ids']
self.excess_max_drawdown = results['excess_max_drawdown']
self.excess_annual_return = results['excess_annual_return']
self.evaluations_df = results['evaluations_df']
# 回报率折线图
def plot_returns(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
for key in self.returns.keys():
ax.plot(range(len(self.returns[key])), self.returns[key], label=key)
# 设定benchmark曲线并标记
ax.plot(range(len(self.benchmark_returns)), self.benchmark_returns, label='benchmark', c='k', linestyle='--')
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
# 设置y标签样式
ax.set_ylabel('returns',fontsize=20)
# 设置x标签样式
ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
# 设置图片标题样式
ax.set_title("Strategy's performances with different parameters", fontsize=21)
plt.xlim(0, len(self.returns[0]))
# 多空组合图
def plot_long_short(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
a1 = [i+1 for i in self.returns[0]]
a2 = [i+1 for i in self.returns[4]]
a1.insert(0,1)
a2.insert(0,1)
b = []
for i in range(len(a1)-1):
b.append((a1[i+1]/a1[i]-a2[i+1]/a2[i])/2)
c = []
c.append(1)
for i in range(len(b)):
c.append(c[i]*(1+b[i]))
ax.plot(range(len(c)), c)
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
ax.set_title("Strategy's long_short performances",fontsize=20)
# 设置图片标题样式
plt.xlim(0, len(c))
return c
# 多空组合图
def plot_long_short_bench(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
a1 = [i+1 for i in self.returns[0]]
a2 = [i+1 for i in self.benchmark_returns]
a1.insert(0,1)
a2.insert(0,1)
b = []
for i in range(len(a1)-1):
b.append((a1[i+1]/a1[i]-a2[i+1]/a2[i])/2)
c = []
c.append(1)
for i in range(len(b)):
c.append(c[i]*(1+b[i]))
ax.plot(range(len(c)), c)
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
ax.set_title("Strategy's long_short_bench performances",fontsize=20)
# 设置图片标题样式
plt.xlim(0, len(c))
return c
# 获取不同年份的收益及排名分析
def get_profit_year(self):
profit_year = {}
for key in self.returns.keys():
temp = []
date_year = []
for i in range(len(self.dates)-1):
if self.dates[i][:4] != self.dates[i+1][:4]:
temp.append(self.returns[key][i])
date_year.append(self.dates[i][:4])
temp.append(self.returns[key][-1])
date_year.append(self.dates[-1][:4])
temp1 = []
temp1.append(temp[0])
for i in range(len(temp)-1):
temp1.append((temp[i+1]+1)/(temp[i]+1)-1)
profit_year[key] = temp1
result = pd.DataFrame(index = list(self.returns.keys()), columns = date_year)
for key in self.returns.keys():
result.loc[key,:] = profit_year[key]
return result
# 超额收益率图
def plot_excess_returns(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
for key in self.returns.keys():
ax.plot(range(len(self.excess_returns[key])), self.excess_returns[key], label=key)
# 设定benchmark曲线并标记
ax.plot(range(len(self.benchmark_returns)), [0]*len(self.benchmark_returns), label='benchmark', c='k', linestyle='--')
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
# 设置y标签样式
ax.set_ylabel('excess returns',fontsize=20)
# 设置x标签样式
ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
# 设置图片标题样式
ax.set_title("Strategy's performances with different parameters", fontsize=21)
plt.xlim(0, len(self.excess_returns[0]))
# log回报率图
def plot_log_returns(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
for key in self.returns.keys():
ax.plot(range(len(self.log_returns[key])), self.log_returns[key], label=key)
# 设定benchmark曲线并标记
ax.plot(range(len(self.benchmark_returns)), [log(x+1) for x in self.benchmark_returns], label='benchmark', c='k', linestyle='--')
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
# 设置y标签样式
ax.set_ylabel('log returns',fontsize=20)
# 设置图片标题样式
ax.set_title("Strategy's performances with different parameters", fontsize=21)
plt.xlim(0, len(self.log_returns[0]))
# 超额收益率的 log 图
def plot_log_excess_returns(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
for key in self.returns.keys():
ax.plot(range(len(self.log_excess_returns[key])), self.log_excess_returns[key], label=key)
# 设定benchmark曲线并标记
ax.plot(range(len(self.benchmark_returns)), [0]*len(self.benchmark_returns), label='benchmark', c='k', linestyle='--')
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
# 设置y标签样式
ax.set_ylabel('log excess returns',fontsize=20)
# 设置图片标题样式
ax.set_title("Strategy's performances with different parameters", fontsize=21)
plt.xlim(0, len(self.log_excess_returns[0]))
# 回测的4个主要指标,包括总回报率、最大回撤夏普率和波动
def get_eval4_bar(self, sort_by=[]):
sorted_params = self.params_df
for by in sort_by:
sorted_params = sorted_params.sort(by)
indices = sorted_params.index
fig = plt.figure(figsize=(20,7))
# 定义位置
ax1 = fig.add_subplot(221)
# 设定横轴为对应分位,纵轴为对应指标
ax1.bar(range(len(indices)),
[self.evaluations[x]['algorithm_return'] for x in indices], 0.6, label = 'Algorithm_return')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax1.legend(loc='best',fontsize=15)
# 设置y标签样式
ax1.set_ylabel('Algorithm_return', fontsize=15)
# 设置y标签样式
ax1.set_yticklabels([str(x*100)+'% 'for x in ax1.get_yticks()])
# 设置图片标题样式
ax1.set_title("Strategy's of Algorithm_return performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
# 定义位置
ax2 = fig.add_subplot(224)
# 设定横轴为对应分位,纵轴为对应指标
ax2.bar(range(len(indices)),
[self.evaluations[x]['max_drawdown'] for x in indices], 0.6, label = 'Max_drawdown')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax2.legend(loc='best',fontsize=15)
# 设置y标签样式
ax2.set_ylabel('Max_drawdown', fontsize=15)
# 设置x标签样式
ax2.set_yticklabels([str(x*100)+'% 'for x in ax2.get_yticks()])
# 设置图片标题样式
ax2.set_title("Strategy's of Max_drawdown performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
# 定义位置
ax3 = fig.add_subplot(223)
# 设定横轴为对应分位,纵轴为对应指标
ax3.bar(range(len(indices)),
[self.evaluations[x]['sharpe'] for x in indices], 0.6, label = 'Sharpe')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax3.legend(loc='best',fontsize=15)
# 设置y标签样式
ax3.set_ylabel('Sharpe', fontsize=15)
# 设置x标签样式
ax3.set_yticklabels([str(x*100)+'% 'for x in ax3.get_yticks()])
# 设置图片标题样式
ax3.set_title("Strategy's of Sharpe performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
# 定义位置
ax4 = fig.add_subplot(222)
# 设定横轴为对应分位,纵轴为对应指标
ax4.bar(range(len(indices)),
[self.evaluations[x]['algorithm_volatility'] for x in indices], 0.6, label = 'Algorithm_volatility')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax4.legend(loc='best',fontsize=15)
# 设置y标签样式
ax4.set_ylabel('Algorithm_volatility', fontsize=15)
# 设置x标签样式
ax4.set_yticklabels([str(x*100)+'% 'for x in ax4.get_yticks()])
# 设置图片标题样式
ax4.set_title("Strategy's of Algorithm_volatility performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
#14 年化回报和最大回撤,正负双色表示
def get_eval(self, sort_by=[]):
sorted_params = self.params_df
for by in sort_by:
sorted_params = sorted_params.sort(by)
indices = sorted_params.index
# 大小
fig = plt.figure(figsize = (20, 8))
# 图1位置
ax = fig.add_subplot(111)
# 生成图超额收益率的最大回撤
ax.bar([x+0.3 for x in range(len(indices))],
[-self.evaluations[x]['max_drawdown'] for x in indices], color = '#32CD32',
width = 0.6, label = 'Max_drawdown', zorder=10)
# 图年化超额收益
ax.bar([x for x in range(len(indices))],
[self.evaluations[x]['annual_algo_return'] for x in indices], color = 'r',
width = 0.6, label = 'Annual_return')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax.legend(loc='best',fontsize=15)
# 基准线
plt.plot([0, len(indices)], [0, 0], c='k',
linestyle='--', label='zero')
# 设置图例样式
ax.legend(loc='best',fontsize=15)
# 设置y标签样式
ax.set_ylabel('Max_drawdown', fontsize=15)
# 设置x标签样式
ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
# 设置图片标题样式
ax.set_title("Strategy's performances of different quantile", fontsize=15)
# 设定x轴长度
plt.xlim(0, len(indices))
#14 超额收益的年化回报和最大回撤
# 加入新的benchmark后超额收益和
def get_excess_eval(self, sort_by=[]):
sorted_params = self.params_df
for by in sort_by:
sorted_params = sorted_params.sort(by)
indices = sorted_params.index
# 大小
fig = plt.figure(figsize = (20, 8))
# 图1位置
ax = fig.add_subplot(111)
# 生成图超额收益率的最大回撤
ax.bar([x+0.3 for x in range(len(indices))],
[-self.excess_max_drawdown[x] for x in indices], color = '#32CD32',
width = 0.6, label = 'Excess_max_drawdown')
# 图年化超额收益
ax.bar([x for x in range(len(indices))],
[self.excess_annual_return[x] for x in indices], color = 'r',
width = 0.6, label = 'Excess_annual_return')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax.legend(loc='best',fontsize=15)
# 基准线
plt.plot([0, len(indices)], [0, 0], c='k',
linestyle='--', label='zero')
# 设置图例样式
ax.legend(loc='best',fontsize=15)
# 设置y标签样式
ax.set_ylabel('Max_drawdown', fontsize=15)
# 设置x标签样式
ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
# 设置图片标题样式
ax.set_title("Strategy's performances of different quantile", fontsize=15)
# 设定x轴长度
plt.xlim(0, len(indices))
def group_backtest(start_date,end_date,num):
warnings.filterwarnings("ignore")
pa = parameter_analysis()
pa.get_backtest_data(file_name = 'results1.pkl',
running_max = 10,
algorithm_id = '6e89f5f55c0d8578ad14b2e10007c90e', # 收益移动均值模型策略ID
start_date=start_date,
end_date=end_date,
frequency = 'day',
initial_cash = '10000000',
param_names = ['num'],
param_values = [num]
)
start_date = '2014-01-01'
end_date = '2019-01-01'
num = range(1,11)
group_backtest(start_date,end_date,num)
1.1.1 分层回测策略模型收益指标¶
pa = parameter_analysis()
pa.read_backtest_data('results1.pkl')
pa.evaluations_df
1.1.2 分层回测净值¶
为了进一步更直观的对 10 个组合进行分析,绘制了 10 个组合及 ZZ500 基准的净值收益曲线,具体下图所示。
pa.plot_returns()
由图可以看出,组合 1 能够明显跑赢组合 10,且组合 1 能够跑赢 ZZ500 指数,组合 1 能够明显获得更高的收益。可见符合单因子有效性的检验,即证明模型用于选股是有效的。
1.1.3 模型策略组合回测分析表¶
pa.get_eval4_bar()
上面几张表分析了每个投资组合的评价指标,根据年化收益、年化波动率、夏普比率及最大回撤分析,组合 1 的效果要远远好于组合 10,且基本上满足随着组合数的递增,收益能力下降且风险控制能力下降的趋势,由此符合单因子有效性的检验。
综上所述,排序最高的组别收益最高,并且分层效果明显。排序最低的组别收益最低,几乎跑输基准。从年度收益情况图也可知,排序较大的组别的年度收益较高且胜率也较高。由此可见,移动均值模型是有效的并且具有较好的分层能力。
1.2 不同预测周期¶
设定股票池为预测收益最高的组别,即预测前 10% 的股票池,分别测算了向前滚动 2-5 日的收益情况。
具体每个模型的收益情况如下所示。
def group_backtest(start_date,end_date,Day):
warnings.filterwarnings("ignore")
pa = parameter_analysis()
pa.get_backtest_data(file_name = 'results2.pkl',
running_max = 10,
algorithm_id = '6e89f5f55c0d8578ad14b2e10007c90e', # 收益移动均值模型策略ID
start_date=start_date,
end_date=end_date,
frequency = 'day',
initial_cash = '10000000',
param_names = ['Day'],
param_values = [Day]
)
start_date = '2014-01-01'
end_date = '2019-01-01'
Day = range(2,6)
group_backtest(start_date,end_date,Day)
1.2.1 不同预测期模型收益指标¶
pa = parameter_analysis()
pa.read_backtest_data('results2.pkl')
pa.evaluations_df
1.2.2 不同预测期模型回测净值¶
pa.plot_returns()
1.2.3 不同预测期模型策略组合回测分析表¶
pa.get_eval4_bar()
分别测算了向前滚动 2-5 日的收益情况。由图可见,收益最高的时间窗口为 3 个交易日。由图可见,时间区间较短的组别收益较好,收益最高的时间窗口为3 个交易日。反之,超过了 3 个交易日时间窗口的收益急速下降。由此可见,日预测的时间有效性较强。
综上结果来看,收益移动均值模型中向前滚动较短数据期且预测收益最高的组别为最佳组别。
1.2.4 最佳预测期模型不同年份收益指标¶
# 获取具体模型不同年份的收益概述
def get_profit_num_year(pa, num):
profit_year = {}
returns = pa.returns[num]
benchmark_returns = pa.benchmark_returns
temp = []
temp_bench = []
date_year = []
pchgYear = []
pchg = []
for i in range(len(pa.dates)-1):
tempPchg = (returns[i+1] + 1) / (returns[i] + 1) - 1
pchg.append(tempPchg)
if pa.dates[i][:4] != pa.dates[i+1][:4]:
pchgYear.append(pchg)
pchg = []
if i == len(pa.dates)-2:
pchgYear.append(pchg)
temp_vola = []
for i in range(len(pchgYear)):
a = pchgYear[i]
a = [np.power(j - np.mean(a), 2) for j in a]
vola = np.sqrt(np.sum(a)*250/(len(a) - 1))
temp_vola.append(vola)
for i in range(len(pa.dates)-1):
if pa.dates[i][:4] != pa.dates[i+1][:4]:
temp.append(returns[i])
temp_bench.append(benchmark_returns[i])
date_year.append(pa.dates[i][:4])
temp.append(returns[-1])
temp_bench.append(benchmark_returns[-1])
date_year.append(pa.dates[-1][:4])
temp1 = []
temp1.append(temp[0])
temp2 = []
temp2.append(temp_bench[0])
for i in range(len(temp)-1):
temp1.append((temp[i+1]+1)/(temp[i]+1)-1)
profit_year['model'] = temp1
temp2.append((temp_bench[i+1]+1)/(temp_bench[i]+1)-1)
profit_year['bench'] = temp2
result = pd.DataFrame(columns = date_year)
result.loc['model',:] = profit_year['model']
result.loc['bench',:] = profit_year['bench']
result.loc['excess',:] = result.loc['model',:] - result.loc['bench',:]
result.loc['vola',:] = temp_vola
result.loc['sharp',:] = (result.loc['model',:] - 0.04)/result.loc['vola',:]
result = result.T
return result
result = get_profit_num_year(pa, 2)
result
上表为根据 3 个交易日计算因子收益率,选择预测收益最高的组别时,对应模型的不同年份的收益概述,根据表中内容可知,模型在 5 年内有 3 年能够跑赢指数,且在 2015 年能够获得最高的超额收益,即为 63.98%。
2 收益加权移动均值模型¶
根据研报内容,收益加权移动均值模型的介绍如下:不同于收益移动平均值策略,收益加权移动平均值策略认为越近期的数据越能放映当前市场状况,从而预测效果越好。所以越新的数据权重越高,反之越低。计算公式如下所示:
前值的影响由 $\alpha$ 权重值决定,$\alpha$ 越大,近期值的占比越高。$\alpha$ 的取值方法有以下几种:
其中 $\lambda$ 为衰减的时间跨度,$\lambda$ 越小衰减的越快。本文 $\lambda$ 取 19。
本文基于该内容对此模型进行复现,复现过程如下所示。
策略步骤:
(1)按照日频进行因子的截面数据回归,从而获得因子的日频因子收益;
(2)向前滚动N 个交易日,计算N 个交易日的因子收益的加权平均值;
(3)用因子收益加权平均值作为因子收益的预测值,代入因子模型从而得出个股的收益预测值;
(4)个股按照预测值排序打分。
2.1 分层回测¶
为了测试一下收益加权移动均值模型的优劣,通过分层回测的方式对改模型进行分析。本文进行 10 层分层回测。
评价方法: 回测年化收益率、夏普比率、最大回撤、胜率等。
def group_backtest(start_date,end_date,num):
warnings.filterwarnings("ignore")
pa = parameter_analysis()
pa.get_backtest_data(file_name = 'results3.pkl',
running_max = 10,
algorithm_id = '92afb8ccceaecec3d0d947044190dc8f', # 收益加权移动均值模型策略ID
start_date=start_date,
end_date=end_date,
frequency = 'day',
initial_cash = '10000000',
param_names = ['num'],
param_values = [num]
)
start_date = '2014-01-01'
end_date = '2019-01-01'
num = range(1,11)
group_backtest(start_date,end_date,num)
2.1.1 分层回测策略模型收益指标¶
pa = parameter_analysis()
pa.read_backtest_data('results3.pkl')
pa.evaluations_df
2.1.2 分层回测净值¶
为了进一步更直观的对 10 个组合进行分析,绘制了 10 个组合及 ZZ500 基准的净值收益曲线,具体下图所示。
pa.plot_returns()
由图可以看出,组合 1 能够明显跑赢组合 10,且组合 1 能够跑赢 ZZ500 指数,组合 1 能够明显获得更高的收益。可见符合单因子有效性的检验,即证明模型用于选股是有效的。
2.1.3 模型策略组合回测分析表¶
pa.get_eval4_bar()
上面几张表分析了每个投资组合的评价指标,根据年化收益、年化波动率、夏普比率及最大回撤分析,组合 1 的效果要远远好于组合 10,且基本上满足随着组合数的递增,收益能力下降且风险控制能力下降的趋势,由此符合单因子有效性的检验。
综上所述,排序最高的组别收益最高,并且分层效果明显。排序最低的组别收益最低,几乎跑输基准。从年度收益情况图也可知,排序较大的组别的年度收益较高且胜率也较高。由此可见,收益加权移动均值模型是有效的并且具有较好的分层能力。
2.2 不同预测周期¶
设定股票池为预测收益最高的组别,即预测前 10% 的股票池,分别测算了向前滚动 2-5 日的收益情况。
具体每个模型的收益情况如下所示。
def group_backtest(start_date,end_date,num):
warnings.filterwarnings("ignore")
pa = parameter_analysis()
pa.get_backtest_data(file_name = 'results4.pkl',
running_max = 10,
algorithm_id = '92afb8ccceaecec3d0d947044190dc8f', # 收益加权移动均值模型策略ID
start_date=start_date,
end_date=end_date,
frequency = 'day',
initial_cash = '10000000',
param_names = ['Day'],
param_values = [Day]
)
start_date = '2014-01-01'
end_date = '2019-01-01'
Day = range(2,6)
group_backtest(start_date,end_date,Day)
2.2.1 不同预测期模型收益指标¶
pa = parameter_analysis()
pa.read_backtest_data('results4.pkl')
pa.evaluations_df
2.2.2 不同预测期模型回测净值¶
pa.plot_returns()
2.2.3 不同预测期模型策略组合回测分析表¶
pa.get_eval4_bar()
分别测算了向前滚动 2-5 日的收益情况。由图可见,收益最高的时间窗口为 3 个交易日。由图可见,时间区间较短的组别收益较好,收益最高的时间窗口为3 个交易日。反之,超过了 3 个交易日时间窗口的收益急速下降。由此可见,日预测的时间有效性较强。 综上结果来看,收益移动均值模型中向前滚动较短数据期且预测收益最高的组别为最佳组别。
2.2.4 最佳预测期模型不同年份收益指标¶
result = get_profit_num_year(pa, 2)
result
上表为根据 3 个交易日计算因子收益率,选择预测收益最高的组别时,对应模型的不同年份的收益概述,根据表中内容可知,模型在 5 年内有 3 年能够跑赢指数,且在 2015 年能够获得最高的超额收益,即为 63.30%。
3 残差模型¶
根据研报内容,残差模型的介绍如下:在多因子模型中残差收益为个股的特质收益,是不能用共同因子解释的那部分收益。特质收益越高也说明了该股票自身形势越好。所以从残差收益率的均值,或者是特质波动率角度来进行收益预测也是多因子预测模型的变体方法之一。
本文基于该内容对此模型进行复现,复现过程如下所示。
策略步骤:
(1)按照日频进行因子的截面数据回归,从而获得因子的日频因子收益,从而计算每只个股的日频残差收益;
(2)向前滚动 N 个交易日,计算每只个股 N 个交易日的残差收益平均值;
(3)个股按照残差收益平均值排序打分。
为了测试一下残差模型的优劣,通过分层回测的方式对改模型进行分析。本文进行 10 层分层回测。
评价方法: 回测年化收益率、夏普比率、最大回撤、胜率等。
def group_backtest(start_date,end_date,num):
warnings.filterwarnings("ignore")
pa = parameter_analysis()
pa.get_backtest_data(file_name = 'results5.pkl',
running_max = 10,
algorithm_id = '23e1655cfa99512807f77b5a3f7570f4', # 残差模型策略ID
start_date=start_date,
end_date=end_date,
frequency = 'day',
initial_cash = '10000000',
param_names = ['num'],
param_values = [num]
)
start_date = '2014-01-01'
end_date = '2019-01-01'
num = range(1,11)
group_backtest(start_date,end_date,num)
3.1 分层回测策略模型收益指标¶
pa = parameter_analysis()
pa.read_backtest_data('results5.pkl')
pa.evaluations_df
3.2 分层回测净值¶
为了进一步更直观的对 10 个组合进行分析,绘制了 10 个组合及 ZZ500 基准的净值收益曲线,具体下图所示。
pa.plot_returns()
从净值图中可见,分层效果并不明显,由此可知残差模型用于对收益进行预测效果较差,但最差组别收益明显较差可用于去除风险个股。
因此本文后续不再对残差模型的预测周期参数进行分析。
4 策略模型¶
综合以上分析结果,收益移动加权平均模型显得效果更好,因此接下来继续采用收益移动加权平均模型,根据预测结果并进行排序,然后选择排名靠前的 100 只股票进行买入,对应的模型的净值收益如下所示。
def group_backtest(start_date,end_date,num):
warnings.filterwarnings("ignore")
pa = parameter_analysis()
pa.get_backtest_data(file_name = 'results6.pkl',
running_max = 10,
algorithm_id = 'c765d00805d7db22c9cae4e4e7bb69af', # 收益移动加权均值 - 前 100 只股票 策略ID
start_date=start_date,
end_date=end_date,
frequency = 'day',
initial_cash = '10000000',
param_names = ['num'],
param_values = [num]
)
start_date = '2014-01-01'
end_date = '2019-01-01'
num = [100]
group_backtest(start_date,end_date,num)
pa = parameter_analysis()
pa.read_backtest_data('results6.pkl')
pa.evaluations_df
pa.plot_returns()
由上可知,当选择排名靠前的 100 只股票进行买入时,模型能够获得 110.65% 的收益,高于选择前 10% 股票所对应的的 75.62%,可见在股票数量减少的情况下,模型收益能够得到进一步增加。
接下来对模型相对于 ZZ500 指数的多空收益进行分析,结果如下图所示。
long_short = pa.plot_long_short_bench()
def MaxDrawdown(return_list):
'''最大回撤率'''
i = np.argmax((np.maximum.accumulate(return_list) - return_list) / np.maximum.accumulate(return_list)) # 结束位置
if i == 0:
return 0
j = np.argmax(return_list[:i]) # 开始位置
return (return_list[j] - return_list[i]) / (return_list[j])
def cal_indictor(long_short):
total_return = long_short[-1] / long_short[0] - 1
ann_return = pow((1+total_return), 250/float(len(long_short)))-1
pchg = []
#计算收益率
for i in range(1, len(long_short)):
pchg.append(long_short[i]/long_short[i-1] - 1)
temp = 0
for i in pchg:
temp += pow(i-mean(pchg), 2)
annualVolatility = sqrt(250/float((len(pchg)-1))*temp)
sharpe_ratio = (ann_return - 0.04)/annualVolatility
print "总收益: ", total_return
print "年化收益: ", ann_return
print "年化收益波动率: ", annualVolatility
print "夏普比率: ",sharpe_ratio
print "最大回撤: ",MaxDrawdown(long_short)
cal_indictor(long_short)
如上图所示,模型相对 ZZ500 指数的多空收益在 5 年间总收益为 31.07%,年化波动率为 9.56%,最大回撤为 14.61%,可见多空组合的波动性得到明显下降。
接下来对模型的每年收益指标进行分析。
result = get_profit_num_year(pa, 0)
result
上表为模型的不同年份的收益概述,根据表中内容可知,模型在 5 年内有 3 年能够跑赢指数,且在 2015 年能够获得最高的超额收益,即为 66.81%,跑输指数的幅度不高,均在 4% - 5% 左右,可见模型稳定性较高。
结论¶
通过对因子暴露度、因子相关强度和因子选股能力的判定,最终确定因子组合里包括换手率以及股本。为了研究不同模型的收益能力,本文基于回归模型的日频因子收益率,测算了三种收益预测模型:收益移动均值模型,收益加权移动均值模型,以及残差模型。
本文通过上述分析,得到了以下结论:
(1)收益移动均值模型:通过回归模型计算日频的因子收益率,然后向前滚动 N(N=20)个交易日,计算因子收益率均值,根据实时因子数据预测个股未来收益,根据 10 层分层回测结果来看,该方法分层能力较强,通过分析向前滚动交易日参数,结果表明该模型对短期数据较有效。
(2)收益加权移动均值模型:通过回归模型计算日频的因子收益率,然后向前滚动 N(N=20)个交易日,计算因子收益率加权均值,通过加入衰减系数来改变数据权重。从测试情况看,该方法分层能力比收益移动均值模型更强,且通过分析向前滚动交易日参数,结果表明该模型仍然对短期数据较有效。
(3)残差模型:通过回归模型计算所得的残差收益,然后向前滚动 N(N=20)个交易日,取其均值作为个股收益预测值。从测试情况看,该方法更适合去除风险个股。