The previous article on mining factors automatically with genetic programming explained how to generate factor expressions from market data, but it left one problem: the factor function for each expression still had to be written by hand. Building on that article, this one uses Python's exec and eval to generate the factor functions automatically from the expressions and to compute the corresponding factor values. The factor data is then fed into a parallel backtest to show the results for different parameters. To summarize the steps:
(1) Fetch the index market data;
(2) Use gplearn to generate factor expressions automatically from the index data;
(3) Use exec and eval to generate the factor function corresponding to each expression;
(4) Use the factor functions to compute factor values for different securities and store them in the research environment;
(5) Create a backtest strategy that reads the factor data from research;
(6) Use the parallel backtest framework to show the results for different parameters.
This article is only meant as a starting point: it walks the whole path from raw market data to factor generation and backtesting, without any optimization. Comments and discussion are welcome.
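To make the core idea concrete before the full notebook code, here is a minimal sketch (not part of the original notebook; the expression string and the close series are made up for illustration) of how a gplearn expression can be evaluated with eval once every function and feature it references exists in the current scope:

import numpy as np

def delta(data):
    # first difference, padded with a leading 0 (mirrors the notebook's delta)
    return np.append(0, np.diff(data.flatten()))

def gp_log(data):
    # protected log, as in the notebook: log(|x|) where |x| > 0.001, else 0
    with np.errstate(divide='ignore', invalid='ignore'):
        return np.where(np.abs(data) > 0.001, np.log(np.abs(data)), 0.)

# A hypothetical expression in the form gplearn prints them
expression = "gp_log(delta(close))"

# A toy 'close' feature; in the notebook this comes from get_price
close = np.array([10.0, 10.2, 10.1, 10.5, 10.4])

# eval looks up gp_log, delta and close in the surrounding scope,
# so the expression string is turned directly into factor values
factor_values = eval(expression)
print(factor_values[-1])   # latest factor value, as used per stock in the notebook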
# Generate expressions from index data, compute factor data from those expressions, then run the strategy on the computed factors
# The gplearn library has to be installed by yourself
# A data folder has to be created in the research environment beforehand
import numpy as np
import pandas as pd
import graphviz
from scipy.stats import rankdata
import pickle
from jqdata import *
import time
import datetime as dt
from gplearn import genetic
from gplearn.functions import make_function
from gplearn.genetic import SymbolicTransformer, SymbolicRegressor
from gplearn.fitness import make_fitness
import warnings
warnings.filterwarnings("ignore")
Custom make_function function set
# Rebuild the function set shipped with gplearn's make_function
def _protected_division(x1, x2):
"""Closure of division (x1/x2) for zero denominator."""
with np.errstate(divide='ignore', invalid='ignore'):
return np.where(np.abs(x2) > 0.001, np.divide(x1, x2), 1.)
def _protected_sqrt(x1):
"""Closure of square root for negative arguments."""
return np.sqrt(np.abs(x1))
def _protected_log(x1):
"""Closure of log for zero arguments."""
with np.errstate(divide='ignore', invalid='ignore'):
return np.where(np.abs(x1) > 0.001, np.log(np.abs(x1)), 0.)
def _protected_inverse(x1):
    """Closure of inverse (1/x1) for zero arguments."""
with np.errstate(divide='ignore', invalid='ignore'):
return np.where(np.abs(x1) > 0.001, 1. / x1, 0.)
def _sigmoid(x1):
"""Special case of logistic function to transform to probabilities."""
with np.errstate(over='ignore', under='ignore'):
return 1 / (1 + np.exp(-x1))
# def _gp_add(data1, data2):
# return np.add(data1, data2)
# def _gp_sub(data1, data2):
# return np.subtract(data1, data2)
# def _gp_mul(data1, data2):
# return np.multiply(data1, data2)
# def _gp_div(data1, data2):
# return _protected_division(data1, data2)
# def _gp_max(data1, data2):
# return np.maximum(data1, data2)
# def _gp_min(data1, data2):
# return np.minimum(data1, data2)
def gp_sqrt(data):
return _protected_sqrt(data)
def gp_log(data):
return _protected_log(data)
def gp_neg(data):
return np.negative(data)
def gp_inv(data):
return _protected_inverse(data)
def gp_abs(data):
return np.abs(data)
def gp_sin(data):
return np.sin(data)
def gp_cos(data):
return np.cos(data)
def gp_tan(data):
return np.tan(data)
def gp_sig(data):
return _sigmoid(data)
# Wrap the basic functions with make_function
# gp_add = make_function(function=_gp_add, name='gp_add', arity=2)
# gp_sub = make_function(function=_gp_sub, name='gp_sub', arity=2)
# gp_mul = make_function(function=_gp_mul, name='gp_mul', arity=2)
# gp_div = make_function(function=_gp_div, name='gp_div', arity=2)
# gp_max = make_function(function=_gp_max, name='gp_max', arity=2)
# gp_min = make_function(function=_gp_min, name='gp_min', arity=2)
gp_sqrt = make_function(function=gp_sqrt, name='gp_sqrt', arity=1)
gp_log = make_function(function=gp_log, name='gp_log', arity=1)
gp_neg = make_function(function=gp_neg, name='gp_neg', arity=1)
gp_inv = make_function(function=gp_inv, name='gp_inv', arity=1)
gp_abs = make_function(function=gp_abs, name='gp_abs', arity=1)
gp_sin = make_function(function=gp_sin, name='gp_sin', arity=1)
gp_cos = make_function(function=gp_cos, name='gp_cos', arity=1)
gp_tan = make_function(function=gp_tan, name='gp_tan', arity=1)
gp_sig = make_function(function=gp_sig, name='gp_sig', arity=1)
# Custom time-series functions, wrapped with make_function below
def _rolling_rank(data):
value = rankdata(data)[-1]
return value
def _rolling_prod(data):
return np.prod(data)
def ts_sum(data):
window=10
value = np.array(pd.Series(data.flatten()).rolling(window).sum().tolist())
value = np.nan_to_num(value)
return value
def sma(data):
window=10
value = np.array(pd.Series(data.flatten()).rolling(window).mean().tolist())
value = np.nan_to_num(value)
return value
def stddev(data):
window=10
value = np.array(pd.Series(data.flatten()).rolling(window).std().tolist())
value = np.nan_to_num(value)
return value
def ts_rank(data):
    window=10
    value = np.array(pd.Series(data.flatten()).rolling(window).apply(_rolling_rank).tolist())
    value = np.nan_to_num(value)
    return value
def product(data):
    window=10
    value = np.array(pd.Series(data.flatten()).rolling(window).apply(_rolling_prod).tolist())
    value = np.nan_to_num(value)
    return value
def ts_min(data):
window=10
value = np.array(pd.Series(data.flatten()).rolling(window).min().tolist())
value = np.nan_to_num(value)
return value
def ts_max(data):
window=10
value = np.array(pd.Series(data.flatten()).rolling(window).max().tolist())
value = np.nan_to_num(value)
return value
def delta(data):
value = np.diff(data.flatten())
value = np.append(0, value)
return value
def delay(data):
    period=1
    value = pd.Series(data.flatten()).shift(period)
    value = np.nan_to_num(value)
    return value
def rank(data):
value = np.array(pd.Series(data.flatten()).rank().tolist())
value = np.nan_to_num(value)
return value
def scale(data):
    k=1
    data = pd.Series(data.flatten())
    value = data.mul(k).div(np.abs(data).sum())
    value = np.nan_to_num(value)
    return value
def ts_argmax(data):
    window=10
    value = pd.Series(data.flatten()).rolling(window).apply(np.argmax) + 1
    value = np.nan_to_num(value)
    return value
def ts_argmin(data):
    window=10
    value = pd.Series(data.flatten()).rolling(window).apply(np.argmin) + 1
    value = np.nan_to_num(value)
    return value
# Wrap the custom functions with make_function
delta = make_function(function=delta, name='delta', arity=1)
delay = make_function(function=delay, name='delay', arity=1)
rank = make_function(function=rank, name='rank', arity=1)
scale = make_function(function=scale, name='scale', arity=1)
sma = make_function(function=sma, name='sma', arity=1)
stddev = make_function(function=stddev, name='stddev', arity=1)
product = make_function(function=product, name='product', arity=1)
ts_rank = make_function(function=ts_rank, name='ts_rank', arity=1)
ts_min = make_function(function=ts_min, name='ts_min', arity=1)
ts_max = make_function(function=ts_max, name='ts_max', arity=1)
ts_argmax = make_function(function=ts_argmax, name='ts_argmax', arity=1)
ts_argmin = make_function(function=ts_argmin, name='ts_argmin', arity=1)
ts_sum = make_function(function=ts_sum, name='ts_sum', arity=1)
# init_function = [gp_add, gp_sub, gp_mul, gp_div,
# gp_sqrt, gp_log, gp_neg, gp_inv,
# gp_abs, gp_max, gp_min, gp_sin,
# gp_cos, gp_tan, gp_sig]
init_function = [gp_sqrt, gp_log, gp_neg, gp_inv,]
user_function = [delta, delay, rank, scale,
sma, stddev, product, ts_rank,
ts_min, ts_max, ts_argmax, ts_argmin,
ts_sum]
function_set = init_function + user_function
def _my_metric(y, y_pred, w):
value = np.sum(y + y_pred)
return value
my_metric = make_fitness(function=_my_metric, greater_is_better=True)
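Note that _my_metric above is only a placeholder: summing y + y_pred says nothing about predictive power. If a more meaningful fitness is wanted, a rank-IC style metric is a common choice; the following is a sketch of my own (not from the original article), wrapped with make_fitness in the same way:

import numpy as np
from scipy.stats import spearmanr
from gplearn.fitness import make_fitness

def _rank_ic(y, y_pred, w):
    # Spearman rank correlation between the program's output and the target returns
    # (the sample weights w are ignored in this sketch)
    ic = spearmanr(y_pred, y)[0]
    # fall back to 0 when the correlation is undefined (e.g. constant output)
    return ic if np.isfinite(ic) else 0.

# greater_is_better=True so that gplearn favours programs with higher rank IC
rank_ic_metric = make_fitness(function=_rank_ic, greater_is_better=True)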
def get_data_for_model(index_code, end_date, pre_day_count, fields):
    """Fetch the index market data and build the feature matrix and target."""
    # start_date = get_trade_days(end_date=end_date, count=pre_day_count)[0].strftime('%Y-%m-%d')
    stock_price = get_price(index_code, end_date=end_date, count=pre_day_count, fq='post', fields=fields)
    stock_price['pct'] = stock_price['close'].pct_change(periods=1)
    # standardize
    stock_price = (stock_price - stock_price.mean())/stock_price.std()
    data = np.nan_to_num(stock_price[fields].values)
    target = np.nan_to_num(stock_price['pct'].values)
    return data, target
def save_best_model(data, target, function_set, my_metric, fields):
    """Generate the model's expressions from the index market data and save them."""
    generations = 5
    metric = my_metric
    population_size = 100
    random_state=0
    est_gp = SymbolicTransformer(
        feature_names=fields,
        function_set=function_set,
        generations=generations,
        metric=metric,
        population_size=population_size,
        tournament_size=20,
        random_state=random_state,
    )
    est_gp.fit(data, target)
    # Save the model locally
    # with open('gp_model.pkl', 'wb') as f:
    #     pickle.dump(est_gp, f)
    # Collect the best expressions
    best_programs = est_gp._best_programs
    best_programs_dict = {}
    for p in best_programs:
        factor_name = 'alpha_' + str(best_programs.index(p) + 1)
        best_programs_dict[factor_name] = {'fitness':p.fitness_, 'expression':str(p), 'depth':p.depth_, 'length':p.length_}
    best_programs_dict = pd.DataFrame(best_programs_dict).T
    best_programs_dict = best_programs_dict.sort_values(by='fitness')
    best_programs_dict.to_csv('best_programs_dict.csv')
    # print("Expressions saved")
# Test: compute and save the model
# # Set the date range
# pre_day_count = 100
# end_date = '2019-06-01'
# index_code = '000905.XSHG'
# fields = ['open', 'close', 'low', 'high', 'volume', 'factor', 'high_limit','low_limit', 'avg', 'pre_close', ]
# function_set = init_function + user_function
# data, target = get_data_for_model(index_code, end_date, pre_day_count, fields)
# save_best_model(data, target, function_set, my_metric, fields)
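After save_best_model has run, the expressions live in best_programs_dict.csv with the columns fitness, expression, depth and length; a quick way to inspect them in research (a usage sketch) is:

import pandas as pd

# Load the expressions saved by save_best_model; the index is alpha_1, alpha_2, ...
best_programs = pd.read_csv('best_programs_dict.csv', index_col=0)
# Show the expressions together with their fitness (they were saved already sorted by fitness)
print(best_programs[['fitness', 'expression']])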
Computing factors from expressions with exec
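The idea of this section is to keep all helper functions and the factor-evaluation driver as source strings, concatenate them, and run the result with exec against a globals dict that also carries the runtime inputs (stock_list, end_date, ...). A minimal, self-contained sketch of that pattern (the names source and values are illustrative only):

import numpy as np

# Function definitions kept as a source string, just like expr_init_func below
source = """
import numpy as np

def gp_neg(data):
    return np.negative(data)

def run(values):
    # 'values' is injected through the globals dict passed to exec
    print(gp_neg(np.array(values)))
"""

# A driver call appended at the end, mirroring gp_factor_main
source += "\nrun(values)\n"

# exec compiles and runs the string; the dict acts as the module globals,
# so the defined functions and the injected variables share one namespace
exec(source, {'values': [1.0, -2.0, 3.0]})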
# Test the auto-generated expressions
expr_init_func = """
import numpy as np
import pandas as pd
from scipy.stats import rankdata
def _protected_division(x1, x2):
'''Closure of division (x1/x2) for zero denominator.'''
with np.errstate(divide='ignore', invalid='ignore'):
return np.where(np.abs(x2) > 0.001, np.divide(x1, x2), 1.)
def _protected_sqrt(x1):
'''Closure of square root for negative arguments.'''
return np.sqrt(np.abs(x1))
def _protected_log(x1):
'''Closure of log for zero arguments.'''
with np.errstate(divide='ignore', invalid='ignore'):
return np.where(np.abs(x1) > 0.001, np.log(np.abs(x1)), 0.)
def _protected_inverse(x1):
    '''Closure of inverse (1/x1) for zero arguments.'''
with np.errstate(divide='ignore', invalid='ignore'):
return np.where(np.abs(x1) > 0.001, 1. / x1, 0.)
def _sigmoid(x1):
'''Special case of logistic function to transform to probabilities.'''
with np.errstate(over='ignore', under='ignore'):
return 1 / (1 + np.exp(-x1))
def gp_add(data1, data2):
    return np.add(data1, data2)
def gp_sub(data1, data2):
return np.subtract(data1, data2)
def gp_mul(data1, data2):
return np.multiply(data1, data2)
def gp_div(data1, data2):
return _protected_division(data1, data2)
def gp_sqrt(data):
return _protected_sqrt(data)
def gp_log(data):
return _protected_log(data)
def gp_neg(data):
return np.negative(data)
def gp_inv(data):
return _protected_inverse(data)
def gp_abs(data):
return np.abs(data)
def gp_max(data1, data2):
return np.maximum(data1, data2)
def gp_min(data1, data2):
return np.minimum(data1, data2)
def gp_sin(data):
return np.sin(data)
def gp_cos(data):
return np.cos(data)
def gp_tan(data):
return np.tan(data)
def gp_sig(data):
return _sigmoid(data)
"""
expr_user_func = """
def _rolling_rank(data):
value = rankdata(data)[-1]
return value
def _rolling_prod(data):
return np.prod(data)
def ts_sum(data):
window=10
value = np.array(pd.Series(data.flatten()).rolling(window).sum().tolist())
value = np.nan_to_num(value)
return value
def sma(data):
window=10
value = np.array(pd.Series(data.flatten()).rolling(window).mean().tolist())
value = np.nan_to_num(value)
return value
def stddev(data):
window=10
value = np.array(pd.Series(data.flatten()).rolling(window).std().tolist())
value = np.nan_to_num(value)
return value
def ts_rank(data):
    window=10
    value = np.array(pd.Series(data.flatten()).rolling(window).apply(_rolling_rank).tolist())
    value = np.nan_to_num(value)
    return value
def product(data):
    window=10
    value = np.array(pd.Series(data.flatten()).rolling(window).apply(_rolling_prod).tolist())
    value = np.nan_to_num(value)
    return value
def ts_min(data):
window=10
value = np.array(pd.Series(data.flatten()).rolling(window).min().tolist())
value = np.nan_to_num(value)
return value
def ts_max(data):
window=10
value = np.array(pd.Series(data.flatten()).rolling(window).max().tolist())
value = np.nan_to_num(value)
return value
def delta(data):
value = np.diff(data.flatten())
value = np.append(0, value)
return value
def delay(data):
    period=1
    value = pd.Series(data.flatten()).shift(period)
    value = np.nan_to_num(value)
    return value
def rank(data):
value = np.array(pd.Series(data.flatten()).rank().tolist())
value = np.nan_to_num(value)
return value
def scale(data):
    k=1
    data = pd.Series(data.flatten())
    value = data.mul(k).div(np.abs(data).sum())
    value = np.nan_to_num(value)
    return value
def ts_argmax(data):
    window=10
    value = pd.Series(data.flatten()).rolling(window).apply(np.argmax) + 1
    value = np.nan_to_num(value)
    return value
def ts_argmin(data):
    window=10
    value = pd.Series(data.flatten()).rolling(window).apply(np.argmin) + 1
    value = np.nan_to_num(value)
    return value
"""
expr_factor = """
def get_alpha(alphas, open, close, low, high, volume, factor, high_limit, low_limit, avg, pre_close):
stock_alphas = {}
for n in range(len(alphas)):
name = "alpha_" + str(n+1)
alpha = alphas['expression'][name]
try:
value = eval(alpha)
stock_alphas[name] = value[-1]
# print(name + " : {0}".format(alpha))
except Exception as e:
stock_alphas[name] = 'nan'
# print("{0}: {1} 运行失败, {2}".format(name, alpha, e))
return stock_alphas
def get_stocks_alphas(stock_list, end_date, pre_day_count):
    ''' Compute the value of every factor for each stock in the list '''
    if pre_day_count < 10:
        print("pre_day_count must be at least 10")
alphas = pd.read_csv('best_programs_dict.csv', index_col=0)
fields = ['open', 'close', 'low', 'high', 'volume', 'factor', 'high_limit','low_limit', 'avg', 'pre_close', ]
stocks_factors_data = {}
for stock in stock_list:
stock_price = get_price(stock, end_date=end_date, count=pre_day_count, fq='post', fields=fields)
        # stock_price['pct'] = stock_price['close'].pct_change(periods=1)
        # standardize
stock_price = (stock_price - stock_price.mean())/stock_price.std()
_open = np.nan_to_num(stock_price['open'].values)
_close = np.nan_to_num(stock_price['close'].values)
_low = np.nan_to_num(stock_price['low'].values)
_high = np.nan_to_num(stock_price['high'].values)
_volume = np.nan_to_num(stock_price['volume'].values)
_factor = np.nan_to_num(stock_price['factor'].values)
_high_limit = np.nan_to_num(stock_price['high_limit'].values)
_low_limit = np.nan_to_num(stock_price['low_limit'].values)
_avg = np.nan_to_num(stock_price['avg'].values)
_pre_close = np.nan_to_num(stock_price['pre_close'].values)
stock_alphas = get_alpha(alphas, _open, _close, _low, _high, _volume, _factor, _high_limit, _low_limit, _avg, _pre_close)
# print(stock_alphas)
stocks_factors_data[stock] = stock_alphas
stocks_factors_data = pd.DataFrame(stocks_factors_data).T
stocks_factors_data['alphas_sum'] = stocks_factors_data.sum(axis=1)
stocks_factors_data.to_csv('./data/' + end_date + '.csv')
"""
gp_factor_main = """
get_stocks_alphas(stock_list, end_date, pre_day_count)
"""
# The combined source string executed with exec
expr = expr_init_func + expr_user_func + expr_factor + gp_factor_main
# Test: compute factors for a single day
# import pandas as pd
# stock_list = get_index_stocks('000905.XSHG')[:50]
# pre_day_count = 100
# # fields = ['open', 'close', 'low', 'high', 'volume', 'factor', 'high_limit','low_limit', 'avg', 'pre_close', ]
# exec(expr, {'stock_list':stock_list, 'end_date':end_date, 'pre_day_count':pre_day_count,})
# print("完成计算alphas")
# date = './data/2019-06-01.csv'
# day_alphas = pd.read_csv(date, index_col=0)
# day_alphas['alphas_sum'] = day_alphas.sum(axis=1)
# day_alphas = day_alphas.sort_values(by=['alphas_sum'])
# print(day_alphas.index.tolist()[-5:])
# day_alphas.head()
Computing data for multiple days
def get_days_alphas_factors(date_list, index_code, end_date, pre_day_count, fields, expr, function_set):
    date_stocks_alphas = {}
    for date in date_list:
        date = date.strftime('%Y-%m-%d')
        stock_list = get_index_stocks(index_code, date=date)
        # Fit the model and save the expressions
        data, target = get_data_for_model(index_code, end_date, pre_day_count, fields)
        save_best_model(data, target, function_set, my_metric, fields)
        time.sleep(1)
        exec(expr, {'stock_list':stock_list, 'end_date':date, 'pre_day_count':pre_day_count,})
        # Load every alpha of every stock for this date
        date_file = './data/' + date + '.csv'
        date_alphas = pd.read_csv(date_file, index_col=0)
        date_stocks_alphas[date] = date_alphas
        print("{0} done".format(date))
    print("Factor computation finished for all backtest dates")
# date_stocks_alphas
# date_stocks_alphas['2019-08-12']['alphas_sum']
# # How many previous days of data to use when computing factors
# pre_day_count = 15
# # Length of the backtest window, used to compute start_date
# backtest_count = 100
# end_date = dt.datetime.now().date().strftime('%Y-%m-%d')
# # end_date = '2019-09-01'
# index_code = '000905.XSHG'
# fields = ['open', 'close', 'low', 'high', 'volume', 'factor', 'high_limit','low_limit', 'avg', 'pre_close', ]
# date_list = get_trade_days(end_date=end_date, count=backtest_count)
# get_days_alphas_factors(date_list, index_code, end_date, pre_day_count, fields, expr, function_set)
# How many previous days of data to use when computing factors
pre_day_count = 15
# Length of the backtest window, used to compute start_date
backtest_count = 50
# end_date = '2019-09-01'
end_date = dt.datetime.now().date().strftime('%Y-%m-%d')
start_date = get_trade_days(end_date=end_date, count=backtest_count)[0].strftime('%Y-%m-%d')
end_date = get_trade_days(end_date=end_date, count=backtest_count)[-2].strftime('%Y-%m-%d')
print('start_date {0}, end_date {1}'.format(start_date, end_date))
index_code = '000300.XSHG' # '000905.XSHG'
fields = ['open', 'close', 'low', 'high', 'volume', 'factor', 'high_limit','low_limit', 'avg', 'pre_close', ]
date_list = get_trade_days(end_date=end_date, count=backtest_count)
# Compute the GP model data and store it
get_days_alphas_factors(date_list, index_code, end_date, pre_day_count, fields, expr, function_set)
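The backtest strategy itself (step 5 above) is not included in this notebook; it lives under the algorithm_id passed to the framework below. As a rough sketch of what such a strategy could look like (my assumption, not the author's actual code), it would read the per-date factor files saved above from research with read_file and trade the top-ranked names; the sell_time / period handling and the top-5 rule are purely illustrative:

# Sketch of a strategy that consumes the factor files written by this notebook.
from six import BytesIO
import pandas as pd

def initialize(context):
    set_benchmark('000300.XSHG')
    # g.sell_time and g.period are injected by the parallel-backtest framework via extras
    if not hasattr(g, 'period'):
        g.period = 1
    if not hasattr(g, 'sell_time'):
        g.sell_time = '09:31'
    g.days = 0
    run_daily(rebalance, time=g.sell_time)

def rebalance(context):
    g.days += 1
    if g.days % g.period != 0:
        return
    date = context.previous_date.strftime('%Y-%m-%d')
    try:
        # factor values stored by get_days_alphas_factors under ./data/<date>.csv
        df = pd.read_csv(BytesIO(read_file('data/' + date + '.csv')), index_col=0)
    except Exception:
        return
    # illustrative rule: hold the 5 stocks with the largest alphas_sum
    targets = df.sort_values('alphas_sum').index.tolist()[-5:]
    for stock in list(context.portfolio.positions):
        if stock not in targets:
            order_target(stock, 0)
    for stock in targets:
        order_target_value(stock, context.portfolio.total_value / len(targets))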
Backtesting with the parallel backtest framework
#1 First import the required packages
import datetime
import numpy as np
import pandas as pd
import time
from jqdata import *
from pandas import Series, DataFrame
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
import copy
import pickle
# Define the parameter-analysis class
class parameter_analysis(object):
    # Variables used inside the class
    def __init__(self, algorithm_id=None):
        self.algorithm_id = algorithm_id   # backtest strategy id
        self.params_df = pd.DataFrame()    # all candidate parameter values; column names are the tuned variable names, matching the g.XXXX variables in the strategy
        self.results = {}                  # backtest returns; key is the row index of params_df, value is the result list returned by get_results()
        self.evaluations = {}              # backtest risk metrics; key is the row index of params_df, value is a dataframe
        self.backtest_ids = {}             # ids of the backtest results
        # id of an additional benchmark backtest; leave as '' to use the benchmark set inside the strategy
        self.benchmark_id = 'f16629492d6b6f4040b2546262782c78'
        self.benchmark_returns = []        # returns of the additional benchmark
        self.returns = {}                  # all return series
        self.excess_returns = {}           # excess return series
        self.log_returns = {}              # log of the returns
        self.log_excess_returns = {}       # log of the excess returns
        self.dates = []                    # all dates covered by the backtests
        self.excess_max_drawdown = {}      # max drawdown of the excess returns
        self.excess_annual_return = {}     # annualized excess return
        self.evaluations_df = pd.DataFrame()  # all risk metrics except the daily returns
        self.failed_list= []
    # Queue up and run backtests over multiple parameter combinations
    def run_backtest(self,
                     algorithm_id=None,        # strategy id to backtest
                     running_max=10,           # max number of backtests running at the same time
                     start_date='2006-01-01',  # backtest start date
                     end_date='2016-11-30',    # backtest end date
                     frequency='day',          # backtest frequency
                     initial_cash='1000000',   # initial capital
                     param_names=[],           # names of the tuned variables
                     param_values=[],          # candidate values for each variable
                     python_version = 2,       # python version used by the backtest
                     ):
        # if no strategy id is given here, fall back to the one passed to the class
        if algorithm_id == None: algorithm_id=self.algorithm_id
        # Generate all parameter combinations and load them into a dataframe
        # list of tuples, each tuple being one combination of the candidate values
        param_combinations = list(itertools.product(*param_values))
        # dataframe with one column per tuned variable and one row per combination
        to_run_df = pd.DataFrame(param_combinations,dtype='object')
        # rename the columns to the tuned variable names
        to_run_df.columns = param_names
        # record the start time
        start = time.time()
        # finished backtests
        finished_backtests = {}
        # running backtests
        running_backtests = {}
        # counter
        pointer = 0
        # total number of backtests, equal to the number of combinations
        total_backtest_num = len(param_combinations)
        # returns of every backtest
        all_results = {}
        # risk metrics of every backtest
        all_evaluations = {}
        # progress banner shown while the backtests run
        print(('[finished|running|queued]:'), end=' ')
        # keep polling until every backtest has finished
        while len(finished_backtests)<total_backtest_num:
            # show the number of finished, running and queued backtests
            print(('[%s|%s|%s].' % (len(finished_backtests),
                                    len(running_backtests),
                                    (total_backtest_num-len(finished_backtests)-len(running_backtests)) )), end=' ')
            # number of free slots at the moment
            to_run = min(running_max-len(running_backtests), total_backtest_num-len(running_backtests)-len(finished_backtests))
            # fill the free slots with new backtests
            for i in range(pointer, pointer+to_run):
                # row i of the combinations dataframe as a dict: key is the column name, value is the parameter value
                params = to_run_df.iloc[i].to_dict()
                # create the backtest and record its id; the params dict is passed through extras
                backtest = create_backtest(algorithm_id = algorithm_id,
                                           start_date = start_date,
                                           end_date = end_date,
                                           frequency = frequency,
                                           initial_cash = initial_cash,
                                           extras = params,
                                           # name this backtest after the parameter values it uses
                                           name = str(params),
                                           python_version = python_version
                                           )
                # remember the backtest id of combination i
                running_backtests[i] = backtest
            # advance the counter by the number of backtests just launched
            pointer = pointer+to_run
            # collect results
            failed = []
            finished = []
            # iterate over the running backtests; key is the combination index in to_run_df
            for key in list(running_backtests.keys()):
                # fetch the backtest saved under this id from research
                back_id = running_backtests[key]
                bt = get_backtest(back_id)
                # get the status; success and failure are only reported once the run has ended
                status = bt.get_status()
                # the backtest failed
                if status == 'failed':
                    # record the id of the failed backtest
                    print('')
                    print(('backtest failed : https://www.joinquant.com/algorithm/backtest/detail?backtestId='+back_id))
                    failed.append(key)
                # the backtest finished successfully
                elif status == 'done':
                    # finished only records successful runs
                    finished.append(key)
                    # returns dict: key is the combination index, value is a list of dicts with time, daily return and benchmark return
                    all_results[key] = bt.get_results()
                    # risk-metric dict: key is the combination index, value is a dataframe of metrics
                    all_evaluations[key] = bt.get_risk()
            # move failed runs out of the running dict
            for key in failed:
                finished_backtests[key] = running_backtests.pop(key)
            # record successful runs in the finished dict and remove them from the running dict
            for key in finished:
                finished_backtests[key] = running_backtests.pop(key)
            # print (finished_backtests)
            # report elapsed time whenever a batch of concurrent backtests completes
            if len(finished_backtests) != 0 and len(finished_backtests) % running_max == 0 and to_run !=0:
                # current time
                middle = time.time()
                # estimated remaining time, assuming each run takes roughly the same time
                remain_time = (middle - start) * (total_backtest_num - len(finished_backtests)) / len(finished_backtests)
                # print elapsed and remaining time
                print(('[%s hours used, about %s hours left, please keep the browser open].' % (str(round((middle - start) / 60.0 / 60.0,3)),
                                                                                                str(round(remain_time / 60.0 / 60.0,3)))), end=' ')
            self.failed_list += failed
            # wait 5 seconds before polling again
            time.sleep(5)
        # record the end time
        end = time.time()
        print('')
        print(('[all backtests finished] total time: %s seconds (%s hours).' % (str(int(end-start)),
                                                                                str(round((end-start)/60.0/60.0,2)))), end=' ')
        # print (to_run_df,all_results,all_evaluations,finished_backtests)
        # store the results on the instance
        # to_run_df = {key:value for key,value in returns.items() if key not in faild}
        self.params_df = to_run_df
        # all_results = {key:value for key,value in all_results.items() if key not in faild}
        self.results = all_results
        # all_evaluations = {key:value for key,value in all_evaluations.items() if key not in faild}
        self.evaluations = all_evaluations
        # finished_backtests = {key:value for key,value in finished_backtests.items() if key not in faild}
        self.backtest_ids = finished_backtests
    #7 Maximum drawdown calculation
    def find_max_drawdown(self, returns):
        # maximum drawdown found so far
        result = 0
        # highest return seen so far
        historical_return = 0
        # walk through all dates
        for i in range(len(returns)):
            # update the running maximum of the returns
            historical_return = max(historical_return, returns[i])
            # drawdown relative to the running maximum
            drawdown = 1-(returns[i] + 1) / (historical_return + 1)
            # keep the largest drawdown
            result = max(drawdown, result)
        # return the maximum drawdown
        return result
    # log returns, excess returns against the new benchmark, and max drawdown relative to it
    def organize_backtest_results(self, benchmark_id=None):
        # if no extra benchmark backtest id is given
        if benchmark_id==None:
            # use the default benchmark returns, i.e. the benchmark set inside the strategy
            self.benchmark_returns = [x['benchmark_returns'] for x in self.results[0]]
        # otherwise
        else:
            # use the returns of the extra benchmark backtest
            self.benchmark_returns = [x['returns'] for x in get_backtest(benchmark_id).get_results()]
        # the dates are taken from the first result set
        self.dates = [x['time'] for x in self.results[0]]
        # for each backtest (key), convert the raw results
        # from {key:{u'benchmark_returns': 0.022480100091729405,
        #            u'returns': 0.03184566700000002,
        #            u'time': u'2006-02-14'}} format into
        # {key: []} format, where the list holds one return per date
        for key in list(self.results.keys()):
            self.returns[key] = [x['returns'] for x in self.results[key]]
        # excess returns against the (new) benchmark
        for key in list(self.results.keys()):
            self.excess_returns[key] = [(x+1)/(y+1)-1 for (x,y) in zip(self.returns[key], self.benchmark_returns)]
        # log returns
        for key in list(self.results.keys()):
            self.log_returns[key] = [np.log(x+1) for x in self.returns[key]]
        # log excess returns
        for key in list(self.results.keys()):
            self.log_excess_returns[key] = [np.log(x+1) for x in self.excess_returns[key]]
        # max drawdown of the excess returns
        for key in list(self.results.keys()):
            self.excess_max_drawdown[key] = self.find_max_drawdown(self.excess_returns[key])
        # annualized excess return
        for key in list(self.results.keys()):
            self.excess_annual_return[key] = (self.excess_returns[key][-1]+1)**(252./float(len(self.dates)))-1
        # merge the parameter-combination dataframe with the risk-metric dataframe
        self.evaluations_df = pd.concat([self.params_df, pd.DataFrame(self.evaluations).T], axis=1)
        # self.evaluations_df =
    # Fetch the final analysis data: run the queued backtests and organize the results
    def get_backtest_data(self,
                          algorithm_id=None,        # strategy id
                          benchmark_id=None,        # id of the extra benchmark backtest
                          file_name='results.pkl',  # pickle file to save the results to
                          running_max=10,           # max number of concurrent backtests
                          start_date='2006-01-01',  # backtest start date
                          end_date='2016-11-30',    # backtest end date
                          frequency='day',          # backtest frequency
                          initial_cash='1000000',   # initial capital
                          param_names=[],           # names of the tuned variables
                          param_values=[],          # candidate values for each variable
                          python_version = 2
                          ):
        # run the queued backtests with the given parameters
        self.run_backtest(algorithm_id=algorithm_id,
                          running_max=running_max,
                          start_date=start_date,
                          end_date=end_date,
                          frequency=frequency,
                          initial_cash=initial_cash,
                          param_names=param_names,
                          param_values=param_values,
                          python_version = python_version
                          )
        # add log returns, excess returns and related metrics to the results
        self.organize_backtest_results(benchmark_id)
        # collect everything into one dict
        results = {'returns':self.returns,
                   'excess_returns':self.excess_returns,
                   'log_returns':self.log_returns,
                   'log_excess_returns':self.log_excess_returns,
                   'dates':self.dates,
                   'benchmark_returns':self.benchmark_returns,
                   'evaluations':self.evaluations,
                   'params_df':self.params_df,
                   'backtest_ids':self.backtest_ids,
                   'excess_max_drawdown':self.excess_max_drawdown,
                   'excess_annual_return':self.excess_annual_return,
                   'evaluations_df':self.evaluations_df,
                   "failed_list" : self.failed_list}
        # save everything to a pickle file
        pickle_file = open(file_name, 'wb')
        pickle.dump(results, pickle_file)
        pickle_file.close()
    # Read a saved pickle file and restore its contents onto the corresponding attributes of the instance
def read_backtest_data(self, file_name='results.pkl'):
pickle_file = open(file_name, 'rb')
results = pickle.load(pickle_file)
self.returns = results['returns']
self.excess_returns = results['excess_returns']
self.log_returns = results['log_returns']
self.log_excess_returns = results['log_excess_returns']
self.dates = results['dates']
self.benchmark_returns = results['benchmark_returns']
self.evaluations = results['evaluations']
self.params_df = results['params_df']
self.backtest_ids = results['backtest_ids']
self.excess_max_drawdown = results['excess_max_drawdown']
self.excess_annual_return = results['excess_annual_return']
self.evaluations_df = results['evaluations_df']
self.failed_list = results['failed_list']
    # Return curves
    def plot_returns(self):
        # figsize sets the width and height of the figure in inches
        fig = plt.figure(figsize=(20,8))
        ax = fig.add_subplot(111)
        # plot one curve per parameter combination
        for key in list(self.returns.keys()):
            ax.plot(list(range(len(self.returns[key]))), self.returns[key], label=key)
        # plot and label the benchmark curve
        ax.plot(list(range(len(self.benchmark_returns))), self.benchmark_returns, label='benchmark', c='k', linestyle='--')
        ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
        plt.xticks(ticks, [self.dates[i] for i in ticks])
        # legend style
        ax.legend(loc = 2, fontsize = 10)
        # y label style
        ax.set_ylabel('returns',fontsize=20)
        # y tick labels as percentages
        ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
        # title
        ax.set_title("Strategy's performances with different parameters", fontsize=21)
        plt.xlim(0, len(self.returns[0]))
    # Excess-return curves
    def plot_excess_returns(self):
        # figsize sets the width and height of the figure in inches
        fig = plt.figure(figsize=(20,8))
        ax = fig.add_subplot(111)
        # plot one curve per parameter combination
        for key in list(self.returns.keys()):
            ax.plot(list(range(len(self.excess_returns[key]))), self.excess_returns[key], label=key)
        # plot and label the benchmark (zero) line
        ax.plot(list(range(len(self.benchmark_returns))), [0]*len(self.benchmark_returns), label='benchmark', c='k', linestyle='--')
        ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
        plt.xticks(ticks, [self.dates[i] for i in ticks])
        # legend style
        ax.legend(loc = 2, fontsize = 10)
        # y label style
        ax.set_ylabel('excess returns',fontsize=20)
        # y tick labels as percentages
        ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
        # title
        ax.set_title("Strategy's performances with different parameters", fontsize=21)
        plt.xlim(0, len(self.excess_returns[0]))
    # Log-return curves
    def plot_log_returns(self):
        # figsize sets the width and height of the figure in inches
        fig = plt.figure(figsize=(20,8))
        ax = fig.add_subplot(111)
        # plot one curve per parameter combination
        for key in list(self.returns.keys()):
            ax.plot(list(range(len(self.log_returns[key]))), self.log_returns[key], label=key)
        # plot and label the benchmark curve
        ax.plot(list(range(len(self.benchmark_returns))), [np.log(x+1) for x in self.benchmark_returns], label='benchmark', c='k', linestyle='--')
        ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
        plt.xticks(ticks, [self.dates[i] for i in ticks])
        # legend style
        ax.legend(loc = 2, fontsize = 10)
        # y label style
        ax.set_ylabel('log returns',fontsize=20)
        # title
        ax.set_title("Strategy's performances with different parameters", fontsize=21)
        plt.xlim(0, len(self.log_returns[0]))
    # Log excess-return curves
    def plot_log_excess_returns(self):
        # figsize sets the width and height of the figure in inches
        fig = plt.figure(figsize=(20,8))
        ax = fig.add_subplot(111)
        # plot one curve per parameter combination
        for key in list(self.returns.keys()):
            ax.plot(list(range(len(self.log_excess_returns[key]))), self.log_excess_returns[key], label=key)
        # plot and label the benchmark (zero) line
        ax.plot(list(range(len(self.benchmark_returns))), [0]*len(self.benchmark_returns), label='benchmark', c='k', linestyle='--')
        ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]
        plt.xticks(ticks, [self.dates[i] for i in ticks])
        # legend style
        ax.legend(loc = 2, fontsize = 10)
        # y label style
        ax.set_ylabel('log excess returns',fontsize=20)
        # title
        ax.set_title("Strategy's performances with different parameters", fontsize=21)
        plt.xlim(0, len(self.log_excess_returns[0]))
    # Bar charts of the four main backtest metrics: total return, max drawdown, Sharpe ratio and volatility
    def get_eval4_bar(self, sort_by=[]):
        sorted_params = self.params_df
        for by in sort_by:
            sorted_params = sorted_params.sort_values(by)
        indices = sorted_params.index
        indices = set(sorted_params.index)-set(self.failed_list)
        fig = plt.figure(figsize=(20,7))
        # subplot position
        ax1 = fig.add_subplot(221)
        # x axis is the parameter combination, y axis is the metric
        ax1.bar(list(range(len(indices))),
                [self.evaluations[x]['algorithm_return'] for x in indices], 0.6, label = 'Algorithm_return')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # legend style
        ax1.legend(loc='best',fontsize=15)
        # y label style
        ax1.set_ylabel('Algorithm_return', fontsize=15)
        # y tick labels as percentages
        ax1.set_yticklabels([str(x*100)+'% 'for x in ax1.get_yticks()])
        # title
        ax1.set_title("Algorithm_return of strategies with different parameters", fontsize=15)
        # x axis range
        plt.xlim(0, len(indices))
        # subplot position
        ax2 = fig.add_subplot(224)
        # x axis is the parameter combination, y axis is the metric
        ax2.bar(list(range(len(indices))),
                [self.evaluations[x]['max_drawdown'] for x in indices], 0.6, label = 'Max_drawdown')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # legend style
        ax2.legend(loc='best',fontsize=15)
        # y label style
        ax2.set_ylabel('Max_drawdown', fontsize=15)
        # y tick labels as percentages
        ax2.set_yticklabels([str(x*100)+'% 'for x in ax2.get_yticks()])
        # title
        ax2.set_title("Max_drawdown of strategies with different parameters", fontsize=15)
        # x axis range
        plt.xlim(0, len(indices))
        # subplot position
        ax3 = fig.add_subplot(223)
        # x axis is the parameter combination, y axis is the metric
        ax3.bar(list(range(len(indices))),
                [self.evaluations[x]['sharpe'] for x in indices], 0.6, label = 'Sharpe')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # legend style
        ax3.legend(loc='best',fontsize=15)
        # y label style
        ax3.set_ylabel('Sharpe', fontsize=15)
        # y tick labels as percentages
        ax3.set_yticklabels([str(x*100)+'% 'for x in ax3.get_yticks()])
        # title
        ax3.set_title("Sharpe ratio of strategies with different parameters", fontsize=15)
        # x axis range
        plt.xlim(0, len(indices))
        # subplot position
        ax4 = fig.add_subplot(222)
        # x axis is the parameter combination, y axis is the metric
        ax4.bar(list(range(len(indices))),
                [self.evaluations[x]['algorithm_volatility'] for x in indices], 0.6, label = 'Algorithm_volatility')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # legend style
        ax4.legend(loc='best',fontsize=15)
        # y label style
        ax4.set_ylabel('Algorithm_volatility', fontsize=15)
        # y tick labels as percentages
        ax4.set_yticklabels([str(x*100)+'% 'for x in ax4.get_yticks()])
        # title
        ax4.set_title("Algorithm_volatility of strategies with different parameters", fontsize=15)
        # x axis range
        plt.xlim(0, len(indices))
    #14 Annualized return and max drawdown, shown in two colours (drawdown plotted as a negative bar)
    def get_eval(self, sort_by=[]):
        sorted_params = self.params_df
        for by in sort_by:
            sorted_params = sorted_params.sort_values(by)
        indices = sorted_params.index
        indices = set(sorted_params.index)-set(self.failed_list)
        # figure size
        fig = plt.figure(figsize = (20, 8))
        # subplot position
        ax = fig.add_subplot(111)
        # max drawdown bars (plotted as negative values)
        ax.bar([x+0.3 for x in range(len(indices))],
               [-self.evaluations[x]['max_drawdown'] for x in indices], color = '#32CD32',
               width = 0.6, label = 'Max_drawdown', zorder=10)
        # annualized return bars
        ax.bar([x for x in range(len(indices))],
               [self.evaluations[x]['annual_algo_return'] for x in indices], color = 'r',
               width = 0.6, label = 'Annual_return')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # legend style
        ax.legend(loc='best',fontsize=15)
        # zero reference line
        plt.plot([0, len(indices)], [0, 0], c='k',
                 linestyle='--', label='zero')
        # legend style
        ax.legend(loc='best',fontsize=15)
        # y label style
        ax.set_ylabel('Max_drawdown', fontsize=15)
        # y tick labels as percentages
        ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
        # title
        ax.set_title("Strategy's performances with different parameters", fontsize=15)
        # x axis range
        plt.xlim(0, len(indices))
    #14 Annualized excess return and max drawdown of the excess returns
    # (excess returns are measured against the newly added benchmark)
    def get_excess_eval(self, sort_by=[]):
        sorted_params = self.params_df
        for by in sort_by:
            sorted_params = sorted_params.sort_values(by)
        indices = sorted_params.index
        indices = set(sorted_params.index)-set(self.failed_list)
        # figure size
        fig = plt.figure(figsize = (20, 8))
        # subplot position
        ax = fig.add_subplot(111)
        # max drawdown of the excess returns (plotted as negative values)
        ax.bar([x+0.3 for x in range(len(indices))],
               [-self.excess_max_drawdown[x] for x in indices], color = '#32CD32',
               width = 0.6, label = 'Excess_max_drawdown')
        # annualized excess return bars
        ax.bar([x for x in range(len(indices))],
               [self.excess_annual_return[x] for x in indices], color = 'r',
               width = 0.6, label = 'Excess_annual_return')
        plt.xticks([x+0.3 for x in range(len(indices))], indices)
        # legend style
        ax.legend(loc='best',fontsize=15)
        # zero reference line
        plt.plot([0, len(indices)], [0, 0], c='k',
                 linestyle='--', label='zero')
        # legend style
        ax.legend(loc='best',fontsize=15)
        # y label style
        ax.set_ylabel('Max_drawdown', fontsize=15)
        # y tick labels as percentages
        ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])
        # title
        ax.set_title("Strategy's performances with different parameters", fontsize=15)
        # x axis range
        plt.xlim(0, len(indices))
#2 Set the strategy id to backtest
pa = parameter_analysis('9fddae058dc7edb14974319f077ba37d')
#3 Run the backtests
pa.get_backtest_data(file_name = 'results.pkl',      # pickle file to save the backtest results
                     running_max = 10,                # max number of concurrent backtests (can be increased via the points mall)
                     benchmark_id = None,             # benchmark backtest id (a backtest id, not a strategy id); None uses the strategy's own benchmark
                     start_date = start_date,         # backtest start date
                     end_date = end_date,             # backtest end date
                     frequency = 'day',               # backtest frequency: day, minute or tick
                     initial_cash = '2000000',        # initial capital
                     param_names = ['sell_time', 'period'],        # names of the tuned variables
                     param_values = [['09:31', '14:55'], [1, 5]],  # candidate values for each variable
                     python_version = 3               # python version used by the backtest
                     )
#4 Load the saved data
pa.read_backtest_data('results.pkl')
#6 Inspect the risk metrics of each backtest
pa.evaluations_df
#7 Return curves
pa.plot_returns()
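The other analysis methods of the class can be called the same way once the results are loaded, e.g.:

#8 Excess-return curves and the bar charts of the main risk metrics
pa.plot_excess_returns()
pa.get_eval4_bar()
pa.get_eval()
pa.get_excess_eval()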