请 [注册] 或 [登录]  | 返回主站

量化交易吧 /  量化平台 帖子:3365795 新帖:2

全市场策略单票回测引擎V1——《批量回测.ipynb》

今天你爆仓了吗发表于:5 月 9 日 21:02回复(1)

该模块用于快速对单票策略进行自动批量回测,具备断点续航能力。
另:聚宽单日回测次数上限是1000左右,需注意。

# coding: utf-8
'''全市场回测引擎 V1
   要求策略主体部分以 stock=g.security[0] 的形式获取标的代码
'''
import time
import json
import jqdata
import os
maxtasks = 18
stocks = get_all_securities()
taskids = dict()  # key(stock)-value(tid)
otaskids = dict()

# 用户需要在这里设置参数
showname_prefix = r'************'    # 回测名称前缀
algorithm_id = r'#############################'    # 策略ID
frequency = "minute"    # 回测周期
# 其它的参数请在 dotask() 里面修改代码

def dotask(stock):
    global showname_prefix, algorithm_id
    showname = showname_prefix + '-%s(%s)'%(stock.display_name[0],str(stock.index[0]))
    tid = create_backtest(algorithm_id = algorithm_id, \
                          start_date = stock.start_date[0], end_date = jqdata.get_trade_days(count=3)[-3], \
                          frequency = frequency, \
                          initial_cash = 100000, initial_positions = None, \
                          extras = {'security':[str(stock.index[0])]}, \
                          name = showname)
    print showname
    return tid


def reqtask(stock_code):
    global maxtasks,taskids,otaskids
    while True:
        if maxtasks - len(taskids) > 0:
            gt = get_all_securities()
            stock = gt[gt.index == stock_code]
            if stock.start_date[0] >= jqdata.get_trade_days(count=3)[-3]:
                return
            taskids[stock.index[0]] = dotask(stock)
            otaskids[stock.index[0]] = taskids[stock.index[0]]
            savefile()
            return
        else:
            time.sleep(60)  # 60秒后再重试
        for stock in [n for n in taskids]:
            t = get_backtest(taskids[stock])
            status = t.get_status()
            if status in ['done', 'failed', 'canceled', 'deleted']:
                del taskids[stock]
                if status in ['failed', 'canceled', 'deleted']:
                    res = {'failed':u'失败', 'canceled':u'取消', 'deleted':u'删除'}[status]
                    print '股票 %s 在回测时意外%s' % (stock, res)

                    
def savefile():
    global otaskids
    j = json.dumps(otaskids, indent = 4)
    write_file('alltids.txt', str(j))

    
def loadfile():
    global otaskids,taskid
    if os.path.exists('alltids.txt'):
        content = read_file('alltids.txt')
    else:
        content = '{}'
    otaskids = json.loads(content)
    for stock in otaskids:
        t = get_backtest(otaskids[stock])
        status = t.get_status()
        if status not in ['done']:
            print '%s is %s' % (stock, status)
        if status in ['none','running']:
            taskids[stock] = otaskids[stock] # 装入未完成的工作


loadfile()
print '加载完成。'
gt = get_all_securities()
for s in gt.index:
    if s not in otaskids:
        reqtask(stock_code = s)
    elif get_backtest(otaskids[s]).get_status() in ['deleted']:
        reqtask(stock_code = s)
# 部分回测或者全部回测完成后执行此 cell 将检查所有任务是否已完成并生成数据报表
import pandas as pd
import json
content = json.loads(read_file('alltids.txt'))
df = pd.DataFrame(content,index=['id']).T
df['state'] = [get_backtest(df.id[stock]).get_status() for stock in df.index]
df.to_csv('tids.csv')
 

全部回复

0/140

达人推荐

量化课程

    移动端课程