研究目的:
本文参考东吴证券研报《“订单簿的温度”系列研究(一):反转因子的精细结构》,根据研报分析,A股市场是订单驱动型市场。从动力学的角度讲,股票行情的所有演化过程,都能由订单簿(orderbook)自下而上精确决定。逐笔成交与逐笔委托数据的信息量非常丰富。本篇报告我们从最简单的数据入手,考察了“成交笔数”这个指标。所谓成交笔数,即撮合交易的次数,是从逐笔成交数据中汇总出来的统计量借助成交笔数的信息,对传统反转因子进行切割,首次提出一个理想反转因子, 实现对未来收益的预测,为订单簿因子挖掘提供了一定思路。
研究内容:
(1)研究在订单簿数据中挖掘 alpha 因子,考虑到传统反转因子在稳定性上的困难,本文认为传统反转因子存在动量效应与反转效应,因此借助单笔成交金额信息用于实现 W 切割,切割后形成的新因子称为理想反转因子。
(2)针对全 A 股数据,对理想反转因子进行单因子有效性测试,分别从因子有效性显著性检验、因子 IC 分析以及分层回测这三个角度分析因子有效性。
(3)进一步分析理想反转因子,分别就参数 N 的取值、样本空间的选择、因子收益的累积以及分组比例这四个角度对理想反转因子进行分析。
研究结论:
(1)对理想反转因子进行单因子有效性分析,根据因子收益率显著性检验结果,t 值绝对值序列的均值为 3.96,因子 IC 分析结果为 IC 序列均值为 0.0492,IR 值为 0.54,分层回测结果如下:组合 1 能够明显跑赢组合 5,且每个组合都能够跑赢 HS300 指数,且组合 1 能够吗明显获得更高的收益。
(2)对理想反转因子进行深入分析,当 N 分别取 20、40、60 时,IC 分析结果同样效果出色,但是相对而言,随着 N 取值越大,因子有效性不断降低,可见合理选择 N 的取值有利于获取对预测未来收益更为有效性的因子。
(3)针对 HS300 股票池,理想反转因子的五分组多空对冲总收益为 47.74%,年化波动 31.38%,夏普比率为 0.314,最大回撤
为 9.69%;原始反转因子 Ret20 的五分组多空对冲总收益为 16.22%,年化波动 53.88%,夏普比率为 -0.016,最大回撤为 20.88%。
(4)由于多空组合收益累积过程比较均匀,因此可以尝试做周频调仓或半月调仓。针对 N=60 的情况,当高 D 组的分组比例 X 取值在 25 - 50 之间时,均能够取得较好的结果,当 X 为 42 时,IC 值能够获得最大值。
注:个股每日成交的逐笔数据见 trades.csv
1 数据准备¶
1.1 日期列表获取¶
在每个月的月末对因子数据进行提取,因此需要对每个月的月末日期进行统计。
输入参数分别为 peroid、start_date 和 end_date,其中 peroid 进行周期选择,可选周期为周(W)、月(M)和季(Q),start_date和end_date 分别为开始日期和结束日期。
函数返回值为对应的月末日期。本文选取开始日期为 2013.1.1,结束日期为 2018.1.1。
from jqdata import *
import datetime
import pandas as pd
import numpy as np
from six import StringIO
import warnings
import time
import pickle
from jqfactor import winsorize_med
from jqfactor import neutralize
from jqfactor import standardlize
import statsmodels.api as sm
warnings.filterwarnings("ignore")
#获取指定周期的日期列表 'W、M、Q'
def get_period_date(peroid,start_date, end_date):
#设定转换周期period_type 转换为周是'W',月'M',季度线'Q',五分钟'5min',12天'12D'
stock_data = get_price('000001.XSHE',start_date,end_date,'daily',fields=['close'])
#记录每个周期中最后一个交易日
stock_data['date']=stock_data.index
#进行转换,周线的每个变量都等于那一周中最后一个交易日的变量值
period_stock_data=stock_data.resample(peroid,how='last')
date=period_stock_data.index
pydate_array = date.to_pydatetime()
date_only_array = np.vectorize(lambda s: s.strftime('%Y-%m-%d'))(pydate_array )
date_only_series = pd.Series(date_only_array)
start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d")
start_date=start_date-datetime.timedelta(days=1)
start_date = start_date.strftime("%Y-%m-%d")
date_list=date_only_series.values.tolist()
date_list.insert(0,start_date)
return date_list
get_period_date('M','2017-01-01', '2018-01-01')
1.2 股票列表获取¶
股票池: 全 A 股
股票筛选:剔除 ST 股票,剔除上市 3 个月内的股票,每只股票视作一个样本
取 2016-08-31 当天的股票成分股
#去除上市距beginDate不足2个月的股票
def delect_stop(stocks,beginDate,n=30*2):
stockList = []
beginDate = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
for stock in stocks:
start_date = get_security_info(stock).start_date
if start_date < (beginDate-datetime.timedelta(days = n)).date():
stockList.append(stock)
return stockList
#获取股票池
def get_stock_A(begin_date):
begin_date = str(begin_date)
stockList = get_index_stocks('000002.XSHG',begin_date)+get_index_stocks('399107.XSHE',begin_date)
#剔除ST股
st_data = get_extras('is_st', stockList, count = 1, end_date=begin_date)
stockList = [stock for stock in stockList if not st_data[stock][0]]
#剔除停牌、新股及退市股票
stockList = delect_stop(stockList, begin_date)
return stockList
get_stock_A("2018-12-31")
1.3 数据获取¶
具体因子的计算步骤如下所示:
(1)在每个月底,对于股票 s 回溯其过去 N 个交易日的数据(为方便处理, N 取偶数);
(2)对于股票 s 逐日计算平均单笔成交金额 D(D 当日成交金额 当日成交笔数),将 N 个交易日按 D 值从大到小排序,前 N/2 个交易日称为高 D 组,后 N/2 个交易日称为低 D组;
(3)对于股票 s ,将高 D 组交易日的涨跌幅加总,得到因子 M_high;将低 D 组交易日的涨跌幅加总,得到因子 M_low;
(4)对于所有股票,分别按照上述流程计算因子值。
反转因子的计算公式如下所示:
begin_date = '2013-01-01'
end_date = '2018-01-01'
dateList = get_period_date('M',begin_date, end_date)
Trades = pd.read_csv("trades.csv", index_col = 0)
Trades.columns = [normalize_code(code) for code in Trades.columns]
Trades.index = [datetime.datetime.strptime(str(i), "%Y%m%d") for i in Trades.index]
factorData = {}
for date in dateList:
stockList = get_stock_A(date)
df_data = get_price(stockList, count = 21, end_date=date, frequency='1d', fields=['money','close'])
Amount = df_data["money"]
Amount = Amount.iloc[1:]
Pchg = df_data["close"].pct_change()
Pchg = Pchg.iloc[1:]
trade = Trades.loc[Pchg.index,Pchg.columns]
SingleAmount = Amount / trade
result = pd.DataFrame(index = SingleAmount.columns)
M_high = []
M_low = []
for i in SingleAmount.columns:
temp = SingleAmount.sort([i], ascending = False)
M_high.append((1+Pchg.loc[temp.index[:10], i]).cumprod()[-1] - 1)
M_low.append((1+Pchg.loc[temp.index[10:], i]).cumprod()[-1] - 1)
result["M_high"] = M_high
result["M_low"] = M_low
result["reverse"] = -1 *(result["M_high"] - result["M_low"])
result["ret20"] = df_data["close"].iloc[-1] / df_data["close"].iloc[0] - 1
factorData[date] = result
content = pickle.dumps(factorData)
write_file('factorData.pkl', content, append=False)
factorData['2017-12-31'].head()
2 单因子有效性分析¶
2.1 因子收益率显著性检验¶
主要通过 T 检验分析,根据APT模型,对历史数据进行进行多元线性回归,从而得到需要分析的因子收益率的 t 值,然后进行以下两个方面的分析:
(1)t 值绝对值序列的均值: 之所以要取绝对值,是因为只要 t 值显著不等于 0 即可以认为在当期,因子和收益率存在明显的相关性。但是这种相关性有的时候为正,有的时候为负,如果不取绝对值,则很多正负抵消,会低估因子的有效性;
(2)t 值绝对值序列大于2的比例: 检验 |t| > 2 的比例主要是为了保证 |t| 平均值的稳定性, 避免出现少数数值特别大的样本值拉高均值。
def factor_t_test(factorData, begin_date, end_date):
dateList = get_period_date('M', begin_date, end_date)
WLS_params = {}
WLS_t_test = {}
for date in dateList[:-1]:
R_T = pd.DataFrame()
#取股票池
stockList = list(factorData[date].index)
#获取横截面收益率
df_close = get_price(stockList, date, dateList[dateList.index(date)+1], 'daily', ['close'])
if df_close.empty:
continue
df_pchg=df_close['close'].iloc[-1,:]/df_close['close'].iloc[0,:]-1
R_T['pchg'] = df_pchg
#获取因子数据
factor_data = -1*factorData[date]["reverse"]
#factor_data = winsorize_med(factor_data, scale=1, inclusive=True, inf2nan=True, axis=0)
# 行业市值中性化
#factor_data = neutralize(factor_data, how=['sw_l1', 'market_cap'], date=dateList[0], axis=0)
#数据标准化
factor_data = standardlize(factor_data, inf2nan=True, axis=0)
R_T['factor'] = factor_data
R_T = R_T.dropna()
X = R_T['factor']
y = R_T['pchg']
# WLS回归
wls = sm.OLS(y, X)
result = wls.fit()
WLS_params[date] = result.params[-1]
WLS_t_test[date] = result.tvalues[-1]
t_test = pd.Series(WLS_t_test).dropna()
print 't值序列绝对值平均值: ',np.sum(np.abs(t_test.values))/len(t_test)
n = [x for x in t_test.values if np.abs(x)>2]
print 't值序列均值的绝对值除以t值序列的标准差: ',np.abs(t_test.mean())/t_test.std()
return WLS_t_test
WLS_t_test = factor_t_test(factorData, begin_date, end_date)
根据上面结果分析,t 值绝对值序列的均值为 3.96,符合大于 2 的特征,且 t 值绝对值序列大于 2 的比例为 59.27%,根据因子收益率显著性检验的标准,该因子为有效因子。
2.2 因子 IC 分析¶
因子 k 的 IC 值一般是指个股第T期在因子k上的暴露度与 T + 1期的收益率的相关系数。当得到因子 IC 值序列后,我们可以仿照上一小节 t 检验的分析方法进行计算:
(1)IC 值序列的均值及绝对值均值: 判断因子有效性;
(2)IC 值序列的标准差:判断因子稳定性;
(3)IC 值系列的均值与标准差比值(IR):分析分析有效性
(4)IC 值序列大于零(或小于零)的占比:判断因子效果的一致性。
import scipy.stats as st
def factor_IC_analysis(factorData, begin_date, end_date, rule='normal'):
dateList = get_period_date('M', begin_date, end_date)
IC = {}
R_T = pd.DataFrame()
for date in dateList[:-1]:
#取股票池
stockList = list(factorData[date].index)
#获取横截面收益率
df_close=get_price(stockList, date, dateList[dateList.index(date)+1], 'daily', ['close'])
if df_close.empty:
continue
df_pchg=df_close['close'].iloc[-1,:]/df_close['close'].iloc[0,:]-1
R_T['pchg']=df_pchg
#获取因子数据
factor_data = factorData[date]["reverse"]
#factor_data = winsorize_med(factor_data, scale=1, inclusive=True, inf2nan=True, axis=0)
# 行业市值中性化
#factor_data = neutralize(factor_data, how=['sw_l1', 'market_cap'], date=dateList[0], axis=0)
#数据标准化
factor_data = standardlize(factor_data, inf2nan=True, axis=0)
R_T['factor'] = factor_data
R_T = R_T.dropna()
if rule=='normal':
IC[date]=st.pearsonr(R_T.pchg, R_T['factor'])[0]
elif rule=='rank':
IC[date]=st.pearsonr(R_T.pchg.rank(), R_T['factor'].rank())[0]
IC = pd.Series(IC).dropna()
print 'IC 值序列的均值大小',IC.mean()
print 'IC值序列绝对值的均值大小',np.mean(np.abs(IC))
print 'IC 值序列的标准差',IC.std()
print 'IR 比率(IC值序列均值与标准差的比值)',IC.mean()/IC.std()
n = [x for x in IC.values if x>0]
print 'IC 值序列大于零的占比',len(n)/float(len(IC))
factor_IC_analysis(factorData, begin_date, end_date)
由上可知,IC 序列均值为 0.0492,IR 值为 0.54,IC 值序列大于 0 占比为 65%,由这几个指标可以看出,该因子收益预测稳定性较高,符合因子 IC 分析的筛选条件,判断该因子为有效因子。
2.3 分层回测¶
策略步骤:
(1)在每个月最后一个交易日,统计全 A 股反转因子值(M)的值;
(2)根据反转因子(M)值按照从小到大的顺序排序,并将其等分为 5 组
(3)每个调仓日对每组股票池进行调仓交易,从而获得 5 组股票组合的收益曲线
评价方法: 回测年化收益率、夏普比率、最大回撤、胜率等。
#1 先导入所需要的程序包
import datetime
import numpy as np
import pandas as pd
import time
from jqdata import *
from pandas import Series, DataFrame
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
import copy
import pickle
# 定义类'参数分析'
class parameter_analysis(object):
# 定义函数中不同的变量
def __init__(self, algorithm_id=None):
self.algorithm_id = algorithm_id # 回测id
self.params_df = pd.DataFrame() # 回测中所有调参备选值的内容,列名字为对应修改面两名称,对应回测中的 g.XXXX
self.results = {} # 回测结果的回报率,key 为 params_df 的行序号,value 为
self.evaluations = {} # 回测结果的各项指标,key 为 params_df 的行序号,value 为一个 dataframe
self.backtest_ids = {} # 回测结果的 id
# 新加入的基准的回测结果 id,可以默认为空 '',则使用回测中设定的基准
self.benchmark_id = 'ae0684d86e9e7128b1ab9c7d77893029'
self.benchmark_returns = [] # 新加入的基准的回测回报率
self.returns = {} # 记录所有回报率
self.excess_returns = {} # 记录超额收益率
self.log_returns = {} # 记录收益率的 log 值
self.log_excess_returns = {} # 记录超额收益的 log 值
self.dates = [] # 回测对应的所有日期
self.excess_max_drawdown = {} # 计算超额收益的最大回撤
self.excess_annual_return = {} # 计算超额收益率的年化指标
self.evaluations_df = pd.DataFrame() # 记录各项回测指标,除日回报率外
# 定义排队运行多参数回测函数
def run_backtest(self, #
algorithm_id=None, # 回测策略id
running_max=10, # 回测中同时巡行最大回测数量
start_date='2006-01-01', # 回测的起始日期
end_date='2016-11-30', # 回测的结束日期
frequency='day', # 回测的运行频率
initial_cash='1000000', # 回测的初始持仓金额
param_names=[], # 回测中调整参数涉及的变量
param_values=[] # 回测中每个变量的备选参数值
):
# 当此处回测策略的 id 没有给出时,调用类输入的策略 id
if algorithm_id == None: algorithm_id=self.algorithm_id
# 生成所有参数组合并加载到 df 中
# 包含了不同参数具体备选值的排列组合中一组参数的 tuple 的 list
param_combinations = list(itertools.product(*param_values))
# 生成一个 dataframe, 对应的列为每个调参的变量,每个值为调参对应的备选值
to_run_df = pd.DataFrame(param_combinations)
# 修改列名称为调参变量的名字
to_run_df.columns = param_names
# 设定运行起始时间和保存格式
start = time.time()
# 记录结束的运行回测
finished_backtests = {}
# 记录运行中的回测
running_backtests = {}
# 计数器
pointer = 0
# 总运行回测数目,等于排列组合中的元素个数
total_backtest_num = len(param_combinations)
# 记录回测结果的回报率
all_results = {}
# 记录回测结果的各项指标
all_evaluations = {}
# 在运行开始时显示
print '【已完成|运行中|待运行】:',
# 当运行回测开始后,如果没有全部运行完全的话:
while len(finished_backtests)<total_backtest_num:
# 显示运行、完成和待运行的回测个数
print('[%s|%s|%s].' % (len(finished_backtests),
len(running_backtests),
(total_backtest_num-len(finished_backtests)-len(running_backtests)) )),
# 记录当前运行中的空位数量
to_run = min(running_max-len(running_backtests), total_backtest_num-len(running_backtests)-len(finished_backtests))
# 把可用的空位进行跑回测
for i in range(pointer, pointer+to_run):
# 备选的参数排列组合的 df 中第 i 行变成 dict,每个 key 为列名字,value 为 df 中对应的值
params = to_run_df.ix[i].to_dict()
# 记录策略回测结果的 id,调整参数 extras 使用 params 的内容
backtest = create_backtest(algorithm_id = algorithm_id,
start_date = start_date,
end_date = end_date,
frequency = frequency,
initial_cash = initial_cash,
extras = params,
# 再回测中把改参数的结果起一个名字,包含了所有涉及的变量参数值
name = str(params)
)
# 记录运行中 i 回测的回测 id
running_backtests[i] = backtest
# 计数器计数运行完的数量
pointer = pointer+to_run
# 获取回测结果
failed = []
finished = []
# 对于运行中的回测,key 为 to_run_df 中所有排列组合中的序数
for key in running_backtests.keys():
# 研究调用回测的结果,running_backtests[key] 为运行中保存的结果 id
bt = get_backtest(running_backtests[key])
# 获得运行回测结果的状态,成功和失败都需要运行结束后返回,如果没有返回则运行没有结束
status = bt.get_status()
# 当运行回测失败
if status == 'failed':
# 失败 list 中记录对应的回测结果 id
failed.append(key)
# 当运行回测成功时
elif status == 'done':
# 成功 list 记录对应的回测结果 id,finish 仅记录运行成功的
finished.append(key)
# 回测回报率记录对应回测的回报率 dict, key to_run_df 中所有排列组合中的序数, value 为回报率的 dict
# 每个 value 一个 list 每个对象为一个包含时间、日回报率和基准回报率的 dict
all_results[key] = bt.get_results()
# 回测回报率记录对应回测结果指标 dict, key to_run_df 中所有排列组合中的序数, value 为回测结果指标的 dataframe
all_evaluations[key] = bt.get_risk()
# 记录运行中回测结果 id 的 list 中删除失败的运行
for key in failed:
running_backtests.pop(key)
# 在结束回测结果 dict 中记录运行成功的回测结果 id,同时在运行中的记录中删除该回测
for key in finished:
finished_backtests[key] = running_backtests.pop(key)
# 当一组同时运行的回测结束时报告时间
if len(finished_backtests) != 0 and len(finished_backtests) % running_max == 0 and to_run !=0:
# 记录当时时间
middle = time.time()
# 计算剩余时间,假设没工作量时间相等的话
remain_time = (middle - start) * (total_backtest_num - len(finished_backtests)) / len(finished_backtests)
# print 当前运行时间
print('[已用%s时,尚余%s时,请不要关闭浏览器].' % (str(round((middle - start) / 60.0 / 60.0,3)),
str(round(remain_time / 60.0 / 60.0,3)))),
# 5秒钟后再跑一下
time.sleep(5)
# 记录结束时间
end = time.time()
print ''
print('【回测完成】总用时:%s秒(即%s小时)。' % (str(int(end-start)),
str(round((end-start)/60.0/60.0,2)))),
# 对应修改类内部对应
self.params_df = to_run_df
self.results = all_results
self.evaluations = all_evaluations
self.backtest_ids = finished_backtests
#7 最大回撤计算方法
def find_max_drawdown(self, returns):
# 定义最大回撤的变量
result = 0
# 记录最高的回报率点
historical_return = 0
# 遍历所有日期
for i in range(len(returns)):
# 最高回报率记录
historical_return = max(historical_return, returns[i])
# 最大回撤记录
drawdown = 1-(returns[i] + 1) / (historical_return + 1)
# 记录最大回撤
result = max(drawdown, result)
# 返回最大回撤值
return result
# log 收益、新基准下超额收益和相对与新基准的最大回撤
def organize_backtest_results(self, benchmark_id=None):
# 若新基准的回测结果 id 没给出
if benchmark_id==None:
# 使用默认的基准回报率,默认的基准在回测策略中设定
self.benchmark_returns = [x['benchmark_returns'] for x in self.results[0]]
# 当新基准指标给出后
else:
# 基准使用新加入的基准回测结果
self.benchmark_returns = [x['returns'] for x in get_backtest(benchmark_id).get_results()]
# 回测日期为结果中记录的第一项对应的日期
self.dates = [x['time'] for x in self.results[0]]
# 对应每个回测在所有备选回测中的顺序 (key),生成新数据
# 由 {key:{u'benchmark_returns': 0.022480100091729405,
# u'returns': 0.03184566700000002,
# u'time': u'2006-02-14'}} 格式转化为:
# {key: []} 格式,其中 list 为对应 date 的一个回报率 list
for key in self.results.keys():
self.returns[key] = [x['returns'] for x in self.results[key]]
# 生成对于基准(或新基准)的超额收益率
for key in self.results.keys():
self.excess_returns[key] = [(x+1)/(y+1)-1 for (x,y) in zip(self.returns[key], self.benchmark_returns)]
# 生成 log 形式的收益率
for key in self.results.keys():
self.log_returns[key] = [log(x+1) for x in self.returns[key]]
# 生成超额收益率的 log 形式
for key in self.results.keys():
self.log_excess_returns[key] = [log(x+1) for x in self.excess_returns[key]]
# 生成超额收益率的最大回撤
for key in self.results.keys():
self.excess_max_drawdown[key] = self.find_max_drawdown(self.excess_returns[key])
# 生成年化超额收益率
for key in self.results.keys():
self.excess_annual_return[key] = (self.excess_returns[key][-1]+1)**(252./float(len(self.dates)))-1
# 把调参数据中的参数组合 df 与对应结果的 df 进行合并
self.evaluations_df = pd.concat([self.params_df, pd.DataFrame(self.evaluations).T], axis=1)
# self.evaluations_df =
# 获取最总分析数据,调用排队回测函数和数据整理的函数
def get_backtest_data(self,
algorithm_id=None, # 回测策略id
benchmark_id=None, # 新基准回测结果id
file_name='results.pkl', # 保存结果的 pickle 文件名字
running_max=10, # 最大同时运行回测数量
start_date='2006-01-01', # 回测开始时间
end_date='2016-11-30', # 回测结束日期
frequency='day', # 回测的运行频率
initial_cash='1000000', # 回测初始持仓资金
param_names=[], # 回测需要测试的变量
param_values=[] # 对应每个变量的备选参数
):
# 调运排队回测函数,传递对应参数
self.run_backtest(algorithm_id=algorithm_id,
running_max=running_max,
start_date=start_date,
end_date=end_date,
frequency=frequency,
initial_cash=initial_cash,
param_names=param_names,
param_values=param_values
)
# 回测结果指标中加入 log 收益率和超额收益率等指标
self.organize_backtest_results(benchmark_id)
# 生成 dict 保存所有结果。
results = {'returns':self.returns,
'excess_returns':self.excess_returns,
'log_returns':self.log_returns,
'log_excess_returns':self.log_excess_returns,
'dates':self.dates,
'benchmark_returns':self.benchmark_returns,
'evaluations':self.evaluations,
'params_df':self.params_df,
'backtest_ids':self.backtest_ids,
'excess_max_drawdown':self.excess_max_drawdown,
'excess_annual_return':self.excess_annual_return,
'evaluations_df':self.evaluations_df}
# 保存 pickle 文件
pickle_file = open(file_name, 'wb')
pickle.dump(results, pickle_file)
pickle_file.close()
# 读取保存的 pickle 文件,赋予类中的对象名对应的保存内容
def read_backtest_data(self, file_name='results.pkl'):
pickle_file = open(file_name, 'rb')
results = pickle.load(pickle_file)
self.returns = results['returns']
self.excess_returns = results['excess_returns']
self.log_returns = results['log_returns']
self.log_excess_returns = results['log_excess_returns']
self.dates = results['dates']
self.benchmark_returns = results['benchmark_returns']
self.evaluations = results['evaluations']
self.params_df = results['params_df']
self.backtest_ids = results['backtest_ids']
self.excess_max_drawdown = results['excess_max_drawdown']
self.excess_annual_return = results['excess_annual_return']
self.evaluations_df = results['evaluations_df']
# 回报率折线图
def plot_returns(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
for key in self.returns.keys():
ax.plot(range(len(self.returns[key])), self.returns[key], label=key)
# 设定benchmark曲线并标记
ax.plot(range(len(self.benchmark_returns)), self.benchmark_returns, label='benchmark', c='k', linestyle='--')
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
# 设置y标签样式
ax.set_ylabel('returns',fontsize=20)
# 设置x标签样式
ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
# 设置图片标题样式
ax.set_title("Strategy's performances with different parameters", fontsize=21)
plt.xlim(0, len(self.returns[0]))
# 多空组合图
def plot_long_short(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
a1 = [i+1 for i in self.returns[0]]
a2 = [i+1 for i in self.returns[4]]
a1.insert(0,1)
a2.insert(0,1)
b = []
for i in range(len(a1)-1):
b.append((a1[i+1]/a1[i]-a2[i+1]/a2[i])/2)
c = []
c.append(1)
for i in range(len(b)):
c.append(c[i]*(1+b[i]))
ax.plot(range(len(c)), c)
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
ax.set_title("Strategy's long_short performances",fontsize=20)
# 设置图片标题样式
plt.xlim(0, len(c))
return c
# 获取不同年份的收益及排名分析
def get_profit_year(self):
profit_year = {}
for key in self.returns.keys():
temp = []
date_year = []
for i in range(len(self.dates)-1):
if self.dates[i][:4] != self.dates[i+1][:4]:
temp.append(self.returns[key][i])
date_year.append(self.dates[i][:4])
temp.append(self.returns[key][-1])
date_year.append(self.dates[-1][:4])
temp1 = []
temp1.append(temp[0])
for i in range(len(temp)-1):
temp1.append((temp[i+1]+1)/(temp[i]+1)-1)
profit_year[key] = temp1
result = pd.DataFrame(index = list(self.returns.keys()), columns = date_year)
for key in self.returns.keys():
result.loc[key,:] = profit_year[key]
return result
# 超额收益率图
def plot_excess_returns(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
for key in self.returns.keys():
ax.plot(range(len(self.excess_returns[key])), self.excess_returns[key], label=key)
# 设定benchmark曲线并标记
ax.plot(range(len(self.benchmark_returns)), [0]*len(self.benchmark_returns), label='benchmark', c='k', linestyle='--')
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
# 设置y标签样式
ax.set_ylabel('excess returns',fontsize=20)
# 设置x标签样式
ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
# 设置图片标题样式
ax.set_title("Strategy's performances with different parameters", fontsize=21)
plt.xlim(0, len(self.excess_returns[0]))
# log回报率图
def plot_log_returns(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
for key in self.returns.keys():
ax.plot(range(len(self.log_returns[key])), self.log_returns[key], label=key)
# 设定benchmark曲线并标记
ax.plot(range(len(self.benchmark_returns)), [log(x+1) for x in self.benchmark_returns], label='benchmark', c='k', linestyle='--')
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
# 设置y标签样式
ax.set_ylabel('log returns',fontsize=20)
# 设置图片标题样式
ax.set_title("Strategy's performances with different parameters", fontsize=21)
plt.xlim(0, len(self.log_returns[0]))
# 超额收益率的 log 图
def plot_log_excess_returns(self):
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
# 作图
for key in self.returns.keys():
ax.plot(range(len(self.log_excess_returns[key])), self.log_excess_returns[key], label=key)
# 设定benchmark曲线并标记
ax.plot(range(len(self.benchmark_returns)), [0]*len(self.benchmark_returns), label='benchmark', c='k', linestyle='--')
ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
plt.xticks(ticks, [self.dates[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
# 设置y标签样式
ax.set_ylabel('log excess returns',fontsize=20)
# 设置图片标题样式
ax.set_title("Strategy's performances with different parameters", fontsize=21)
plt.xlim(0, len(self.log_excess_returns[0]))
# 回测的4个主要指标,包括总回报率、最大回撤夏普率和波动
def get_eval4_bar(self, sort_by=[]):
sorted_params = self.params_df
for by in sort_by:
sorted_params = sorted_params.sort(by)
indices = sorted_params.index
fig = plt.figure(figsize=(20,7))
# 定义位置
ax1 = fig.add_subplot(221)
# 设定横轴为对应分位,纵轴为对应指标
ax1.bar(range(len(indices)),
[self.evaluations[x]['algorithm_return'] for x in indices], 0.6, label = 'Algorithm_return')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax1.legend(loc='best',fontsize=15)
# 设置y标签样式
ax1.set_ylabel('Algorithm_return', fontsize=15)
# 设置y标签样式
ax1.set_yticklabels([str(x*100)+'% 'for x in ax1.get_yticks()])
# 设置图片标题样式
ax1.set_title("Strategy's of Algorithm_return performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
# 定义位置
ax2 = fig.add_subplot(224)
# 设定横轴为对应分位,纵轴为对应指标
ax2.bar(range(len(indices)),
[self.evaluations[x]['max_drawdown'] for x in indices], 0.6, label = 'Max_drawdown')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax2.legend(loc='best',fontsize=15)
# 设置y标签样式
ax2.set_ylabel('Max_drawdown', fontsize=15)
# 设置x标签样式
ax2.set_yticklabels([str(x*100)+'% 'for x in ax2.get_yticks()])
# 设置图片标题样式
ax2.set_title("Strategy's of Max_drawdown performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
# 定义位置
ax3 = fig.add_subplot(223)
# 设定横轴为对应分位,纵轴为对应指标
ax3.bar(range(len(indices)),
[self.evaluations[x]['sharpe'] for x in indices], 0.6, label = 'Sharpe')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax3.legend(loc='best',fontsize=15)
# 设置y标签样式
ax3.set_ylabel('Sharpe', fontsize=15)
# 设置x标签样式
ax3.set_yticklabels([str(x*100)+'% 'for x in ax3.get_yticks()])
# 设置图片标题样式
ax3.set_title("Strategy's of Sharpe performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
# 定义位置
ax4 = fig.add_subplot(222)
# 设定横轴为对应分位,纵轴为对应指标
ax4.bar(range(len(indices)),
[self.evaluations[x]['algorithm_volatility'] for x in indices], 0.6, label = 'Algorithm_volatility')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax4.legend(loc='best',fontsize=15)
# 设置y标签样式
ax4.set_ylabel('Algorithm_volatility', fontsize=15)
# 设置x标签样式
ax4.set_yticklabels([str(x*100)+'% 'for x in ax4.get_yticks()])
# 设置图片标题样式
ax4.set_title("Strategy's of Algorithm_volatility performances of different quantile", fontsize=15)
# x轴范围
plt.xlim(0, len(indices))
#14 年化回报和最大回撤,正负双色表示
def get_eval(self, sort_by=[]):
sorted_params = self.params_df
for by in sort_by:
sorted_params = sorted_params.sort(by)
indices = sorted_params.index
# 大小
fig = plt.figure(figsize = (20, 8))
# 图1位置
ax = fig.add_subplot(111)
# 生成图超额收益率的最大回撤
ax.bar([x+0.3 for x in range(len(indices))],
[-self.evaluations[x]['max_drawdown'] for x in indices], color = '#32CD32',
width = 0.6, label = 'Max_drawdown', zorder=10)
# 图年化超额收益
ax.bar([x for x in range(len(indices))],
[self.evaluations[x]['annual_algo_return'] for x in indices], color = 'r',
width = 0.6, label = 'Annual_return')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax.legend(loc='best',fontsize=15)
# 基准线
plt.plot([0, len(indices)], [0, 0], c='k',
linestyle='--', label='zero')
# 设置图例样式
ax.legend(loc='best',fontsize=15)
# 设置y标签样式
ax.set_ylabel('Max_drawdown', fontsize=15)
# 设置x标签样式
ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
# 设置图片标题样式
ax.set_title("Strategy's performances of different quantile", fontsize=15)
# 设定x轴长度
plt.xlim(0, len(indices))
#14 超额收益的年化回报和最大回撤
# 加入新的benchmark后超额收益和
def get_excess_eval(self, sort_by=[]):
sorted_params = self.params_df
for by in sort_by:
sorted_params = sorted_params.sort(by)
indices = sorted_params.index
# 大小
fig = plt.figure(figsize = (20, 8))
# 图1位置
ax = fig.add_subplot(111)
# 生成图超额收益率的最大回撤
ax.bar([x+0.3 for x in range(len(indices))],
[-self.excess_max_drawdown[x] for x in indices], color = '#32CD32',
width = 0.6, label = 'Excess_max_drawdown')
# 图年化超额收益
ax.bar([x for x in range(len(indices))],
[self.excess_annual_return[x] for x in indices], color = 'r',
width = 0.6, label = 'Excess_annual_return')
plt.xticks([x+0.3 for x in range(len(indices))], indices)
# 设置图例样式
ax.legend(loc='best',fontsize=15)
# 基准线
plt.plot([0, len(indices)], [0, 0], c='k',
linestyle='--', label='zero')
# 设置图例样式
ax.legend(loc='best',fontsize=15)
# 设置y标签样式
ax.set_ylabel('Max_drawdown', fontsize=15)
# 设置x标签样式
ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
# 设置图片标题样式
ax.set_title("Strategy's performances of different quantile", fontsize=15)
# 设定x轴长度
plt.xlim(0, len(indices))
def group_backtest(start_date,end_date,num):
warnings.filterwarnings("ignore")
pa = parameter_analysis()
pa.get_backtest_data(file_name = 'results.pkl',
running_max = 10,
algorithm_id = 'df3c8774e33e3f94ad068574276d94a3',
start_date=start_date,
end_date=end_date,
frequency = 'day',
initial_cash = '10000000',
param_names = ['num'],
param_values = [num]
)
start_date = '2013-01-01'
end_date = '2018-01-01'
num = range(1,6)
group_backtest(start_date,end_date,num)
2.3.1 分层回测策略模型收益指标¶
pa = parameter_analysis()
pa.read_backtest_data('results.pkl')
pa.evaluations_df
2.3.2 分层回测净值¶
为了进一步更直观的对 5 个组合进行分析,绘制了 5 个组合及 HS300 基准的净值收益曲线,具体下图所示。
pa.plot_returns()
由图可以看出,组合 1 能够明显跑赢组合 5,且每个组合都能够跑赢 HS300 指数,且组合 1 能够吗明显获得更高的收益。可见符合单因子有效性的检验,即证明反转因子是有效的。
2.3.3 模型策略组合回测分析表¶
pa.get_eval4_bar()
pa.get_profit_year()
从 5 组的具体绩效分析来看,年化收益率以及夏普比率基本呈现出单调的走势,组合 1 的效果远远优于组合 5,且最大回撤也体现出组合 1 的风险控制能力更强。从各年的收益来看,组合 1 至组合 5 每一年基本上也呈现出单调的走势,可见因子在每一年都具有较好的选股效果,体现出因子有效性的稳定性。
2.3.4 多空组合净值¶
从分层组合回测净值曲线图来看,每个组合波动性较大,策略存在较大的风险,因此考虑建立多空组合。多空组合是买入组合 1、卖空组合 5 (月度调仓)的一个资产组合,为了方便统计,多空组合每日收益率为(组合 1 每日收益率 - 组合 5 每日收益率)/2,然后获得多空组合的净值收益曲线。
long_short = pa.plot_long_short()
def MaxDrawdown(return_list):
'''最大回撤率'''
i = np.argmax((np.maximum.accumulate(return_list) - return_list) / np.maximum.accumulate(return_list)) # 结束位置
if i == 0:
return 0
j = np.argmax(return_list[:i]) # 开始位置
return (return_list[j] - return_list[i]) / (return_list[j])
def cal_indictor(long_short):
total_return = long_short[-1] / long_short[0] - 1
ann_return = pow((1+total_return), 250/float(len(long_short)))-1
pchg = []
#计算收益率
for i in range(1, len(long_short)):
pchg.append(long_short[i]/long_short[i-1] - 1)
temp = 0
for i in pchg:
temp += pow(i-mean(pchg), 2)
annualVolatility = sqrt(250/float((len(pchg)-1))*temp)
sharpe_ratio = (ann_return - 0.04)/annualVolatility
print "总收益: ", total_return
print "年化收益: ", ann_return
print "年化收益波动率: ", annualVolatility
print "夏普比率: ",sharpe_ratio
print "最大回撤: ",MaxDrawdown(long_short)
cal_indictor(long_short)
如图所示,多空组合净值收益曲线明显比任何一个组合的波动性更低,能够获得更为稳定的收益,风险控制效果较好。
综上所述,从分层回测的分析来看,反转因子(M)有效性较强。
3 深入分析¶
3.1 参数 N 的敏感度¶
本文选择过去 N(N=20) 天的数据用于计算反转因子,但是参数 N 的选择对因子有效性的影响仍然不是非常清晰,因此接下来针对参数 N 的不同选择,对因子有效性进行分析。具体分析过程如下所示。
def GetData(N):
factordata = {}
for date in dateList:
stockList = get_stock_A(date)
df_data = get_price(stockList, count = N+1, end_date=date, frequency='1d', fields=['money','close'])
Amount = df_data["money"]
Amount = Amount.iloc[1:]
Pchg = df_data["close"].pct_change()
Pchg = Pchg.iloc[1:]
trade = Trades.loc[Pchg.index,Pchg.columns]
SingleAmount = Amount / trade
result = pd.DataFrame(index = SingleAmount.columns)
M_high = []
M_low = []
for i in SingleAmount.columns:
temp = SingleAmount.sort([i], ascending = False)
M_high.append((1+Pchg.loc[temp.index[:10], i]).cumprod()[-1] - 1)
M_low.append((1+Pchg.loc[temp.index[10:], i]).cumprod()[-1] - 1)
result["M_high"] = M_high
result["M_low"] = M_low
result["reverse"] = -1 *(result["M_high"] - result["M_low"])
result["ret"] = df_data["close"].iloc[-1] / df_data["close"].iloc[0] - 1
factordata[date] = result
return factordata
begin_date = '2013-01-01'
end_date = '2018-01-01'
dateList = get_period_date('M',begin_date, end_date)
Trades = pd.read_csv("trades.csv", index_col = 0)
Trades.columns = [normalize_code(code) for code in Trades.columns]
Trades.index = [datetime.datetime.strptime(str(i), "%Y%m%d") for i in Trades.index]
factorData_40 = GetData(40)
factorData_60 = GetData(60)
import scipy.stats as st
def factor_IC(factorData, begin_date, end_date, rule='normal'):
dateList = get_period_date('M', begin_date, end_date)
IC = {}
R_T = pd.DataFrame()
for date in dateList[:-1]:
#取股票池
stockList = list(factorData[date].index)
#获取横截面收益率
df_close=get_price(stockList, date, dateList[dateList.index(date)+1], 'daily', ['close'])
if df_close.empty:
continue
df_pchg=df_close['close'].iloc[-1,:]/df_close['close'].iloc[0,:]-1
R_T['pchg']=df_pchg
#获取因子数据
factor_data = factorData[date]["reverse"]
#factor_data = winsorize_med(factor_data, scale=1, inclusive=True, inf2nan=True, axis=0)
# 行业市值中性化
#factor_data = neutralize(factor_data, how=['sw_l1', 'market_cap'], date=dateList[0], axis=0)
#数据标准化
factor_data = standardlize(factor_data, inf2nan=True, axis=0)
R_T['factor'] = factor_data
R_T = R_T.dropna()
if rule=='normal':
IC[date]=st.pearsonr(R_T.pchg, R_T['factor'])[0]
elif rule=='rank':
IC[date]=st.pearsonr(R_T.pchg.rank(), R_T['factor'].rank())[0]
IC = pd.Series(IC).dropna()
return IC.mean()
IC_20 = factor_IC(factorData, begin_date, end_date)
IC_40 = factor_IC(factorData_40, begin_date, end_date)
IC_60 = factor_IC(factorData_60, begin_date, end_date)
print "N = 20 IC 均值: ", IC_20
print "N = 40 IC 均值: ", IC_40
print "N = 60 IC 均值: ", IC_60
当 N 分别取 20、40、60 时,IC 分析结果同样效果出色,但是相对而言,随着 N 取值越大,因子有效性不断降低,可见合理选择 N 的取值有利于获取对预测未来收益更为有效性的因子。
根据上述分析结果,当 N = 20 时,因子有效性最高。
3.2 其他样本空间的测试¶
本文进行了理想反转因子在全 A 股的测试,为了进一步证明因子的有效性,在其他样本空间,对该因子进行分析,针对五分组的情况构建多空组合,分析理想反转因子与原始反转因子 Ret20 之间的区别。
begin_date = '2013-01-01'
end_date = '2018-01-01'
dateList = get_period_date('M',begin_date, end_date)
HS300Data = {}
for date in dateList:
stockList = get_index_stocks('000300.XSHG',date)
df_data = get_price(stockList, count = 21, end_date=date, frequency='1d', fields=['money','close'])
Amount = df_data["money"]
Amount = Amount.iloc[1:]
Pchg = df_data["close"].pct_change()
Pchg = Pchg.iloc[1:]
trade = Trades.loc[Pchg.index,Pchg.columns]
SingleAmount = Amount / trade
result = pd.DataFrame(index = SingleAmount.columns)
M_high = []
M_low = []
for i in SingleAmount.columns:
temp = SingleAmount.sort([i], ascending = False)
M_high.append((1+Pchg.loc[temp.index[:10], i]).cumprod()[-1] - 1)
M_low.append((1+Pchg.loc[temp.index[10:], i]).cumprod()[-1] - 1)
result["M_high"] = M_high
result["M_low"] = M_low
result["reverse"] = -1 *(result["M_high"] - result["M_low"])
result["ret20"] = -1*(df_data["close"].iloc[-1] / df_data["close"].iloc[0] - 1)
HS300Data[date] = result
HS300Data[date].head()
def GetPchg(Filed):
pchg = []
for date in dateList[:-1]:
tempData = HS300Data[date].sort([Filed], ascending = False)
top5 = list(tempData.index[:60])
last5 = list(tempData.index[-60:])
df_close = get_price(top5, date, dateList[dateList.index(date)+1], 'daily', ['close'])['close']
top5_pchg = df_close.iloc[-1] / df_close.iloc[0] - 1
df_close = get_price(last5, date, dateList[dateList.index(date)+1], 'daily', ['close'])['close']
last5_pchg = df_close.iloc[-1] / df_close.iloc[0] - 1
pchg.append((mean(top5_pchg) - mean(last5_pchg)) / 2)
return pchg
pchgReverse = GetPchg('reverse')
pchRet20 = GetPchg('ret20')
# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;
fig = plt.figure(figsize=(20,8))
ax = fig.add_subplot(111)
netValue1 = []
netValue1.append(1)
netValue2 = []
netValue2.append(1)
for i in range(len(pchgReverse)):
netValue1.append(netValue1[i]*(1+pchgReverse[i]))
for i in range(len(pchRet20)):
netValue2.append(netValue2[i]*(1+pchRet20[i]))
ax.plot(range(len(netValue1)), netValue1)
ax.plot(range(len(netValue2)), netValue2)
ticks = [int(x) for x in np.linspace(0, len(dateList)-1, 11)]
plt.xticks(ticks, [dateList[i] for i in ticks])
# 设置图例样式
ax.legend(loc = 2, fontsize = 10)
ax.set_title("Strategy's long_short performances",fontsize=20)
原始反转因子 Ret20 与理想反转因子的五分组多空对冲净值走势如下所示。理想反转因子的五分组多空对冲总收益为 47.74%,年化波动 31.38%,夏普比率为 0.314,最大回撤为 9.69%;原始反转因子 Ret20 的五分组多空对冲总收益为 16.22%,年化波动 53.88%,夏普比率为 -0.016,最大回撤为 20.88%。
3.3 因子收益的累积过程¶
在本报告中,因子回测均采用月频调仓,但是更高频率的调仓可能也会有需求,针对该情况,本文进行因子收益的累积计算,分别计算从调仓日开始,第一个交易日至第二十个交易日多空组合的累积收益。
pchg = []
for i in range(0, len(long_short) - 20, 20):
tempPchg = []
for j in range(20):
tempPchg.append(long_short[i+j] / long_short[i] - 1)
pchg.append(tempPchg)
pchgTotal = []
for i in range(20):
pchgTotal.append(mean(np.array(pchg)[:,i]))
plt.bar(range(len(pchgTotal)), pchgTotal)
plt.show()
如上图所示,展示了 N=20 时理想反转因子在月初建仓后(全市场股票、分五组),多空对冲收益的累积过程。由于收益累积过程比较均匀,我们定性地判断,可以尝试做周频调仓或半月调仓。
3.4 分组比例的影响¶
在本文中,高 D 组与低 D 组的交易日各占回溯交易日的一半,也即 N/2 个。如果调整分组的比例,效果会有多大的区别呢?
接下来以 N=60 为例,将单笔成交金额大的 X 个交易日作为高 D 组,将剩余 60-X 个交易日作为低 D 组,遍历 X 的值,分别计算 M 因子的信息比率(IR)
begin_date = '2013-01-01'
end_date = '2018-01-01'
dateList = get_period_date('M',begin_date, end_date)
Trades = pd.read_csv("trades.csv", index_col = 0)
Trades.columns = [normalize_code(code) for code in Trades.columns]
Trades.index = [datetime.datetime.strptime(str(i), "%Y%m%d") for i in Trades.index]
factorData_60 = {}
for date in dateList:
stockList = get_stock_A(date)
df_data = get_price(stockList, count = 61, end_date=date, frequency='1d', fields=['money','close'])
Amount = df_data["money"]
Amount = Amount.iloc[1:]
Pchg = df_data["close"].pct_change()
Pchg = Pchg.iloc[1:]
trade = Trades.loc[Pchg.index,Pchg.columns]
SingleAmount = Amount / trade
result = pd.DataFrame(index = SingleAmount.columns)
for j in range(10,51,4):
M_high = []
M_low = []
for i in SingleAmount.columns:
temp = SingleAmount.sort([i], ascending = False)
M_high.append((1+Pchg.loc[temp.index[:j], i]).cumprod()[-1] - 1)
M_low.append((1+Pchg.loc[temp.index[j:], i]).cumprod()[-1] - 1)
result["M_high"] = M_high
result["M_low"] = M_low
result[j] = -1 *(result["M_high"] - result["M_low"])
factorData_60[date] = result
import scipy.stats as st
def factor_IC_analysis(factorData, begin_date, end_date):
dateList = get_period_date('M', begin_date, end_date)
R_T = pd.DataFrame()
result = []
for date in dateList[:-1]:
#取股票池
stockList = list(factorData[date].index)
#获取横截面收益率
df_close=get_price(stockList, date, dateList[dateList.index(date)+1], 'daily', ['close'])
if df_close.empty:
continue
df_pchg=df_close['close'].iloc[-1,:]/df_close['close'].iloc[0,:]-1
R_T['pchg']=df_pchg
IC = []
for j in range(10,51,4):
#获取因子数据
factor_data = factorData[date][j]
#数据标准化
factor_data = standardlize(factor_data, inf2nan=True, axis=0)
R_T['factor'] = factor_data
R_T = R_T.dropna()
IC.append(st.pearsonr(R_T.pchg, R_T['factor'])[0])
result.append(IC)
return result
result = factor_IC_analysis(factorData_60, begin_date, end_date)
xtick = range(10,51,4)
IC = []
for j in range(len(xtick)):
IC.append(np.mean(np.array(result)[:,j]))
plt.plot(xtick, IC)
结果如上图所示。从图中可以发现,当 X 取值在 25 - 50 之间时,均能够取得较好的结果,当 X 为 42 时,IC 值能够获得最大值。
总结¶
以上我们对理想反转因子进行了有效性分析的具体测试,初步得到以下几个结论:
(1)对理想反转因子进行单因子有效性分析,根据因子收益率显著性检验结果,t 值绝对值序列的均值为 3.96,因子 IC 分析结果为 IC 序列均值为 0.0492,IR 值为 0.54,分层回测结果如下:组合 1 能够明显跑赢组合 5,且每个组合都能够跑赢 HS300 指数,且组合 1 能够吗明显获得更高的收益。
(2)对理想反转因子进行深入分析,当 N 分别取 20、40、60 时,IC 分析结果同样效果出色,但是相对而言,随着 N 取值越大,因子有效性不断降低,可见合理选择 N 的取值有利于获取对预测未来收益更为有效性的因子。
(3)针对 HS300 股票池,理想反转因子的五分组多空对冲总收益为 47.74%,年化波动 31.38%,夏普比率为 0.314,最大回撤为 9.69%;原始反转因子 Ret20 的五分组多空对冲总收益为 16.22%,年化波动 53.88%,夏普比率为 -0.016,最大回撤为 20.88%。
(4)由于多空组合收益累积过程比较均匀,因此可以尝试做周频调仓或半月调仓。针对 N=60 的情况,当高 D 组的分组比例 X 取值在 25 - 50 之间时,均能够取得较好的结果,当 X 为 42 时,IC 值能够获得最大值。
(5)为挖掘订单簿信息提供了一定思路,给高频交易数据的挖掘提供一定参考价值。