上次发布了一系列的绩效函数,后来自己看了一下,发现要用到别的策略里去还是有些麻烦,于是动手进行了一些简化,将所有代码封装成了一个独立的 MyTrader 类,里面封装了大部分我们在聚宽编写策略时反复要用到的一些功能,使用起来比直接调用聚宽的 api 更简单,诸如 按比例自动调仓、自动平均调仓等功能。
具体使用方法,在研究下面新建一个 MyTrader.py 文件,然后将下面代码复制到该文件,之后只要在你的策略代码的顶部 使用 from MyTrader import MyTrader 即可引入,然后在策略代码开始的第一行 加上 trader = MyTrader(context, g) 就行了,很简单。
重点 推荐 buy_amount() 函数,你们可以试试加到自己的策略里去,不少策略回测的效果会有显著提升哦 -v- ,其他的,看代码注释就好了,很易用。
===================
buy_value()、sell_value()、buy_amount()、sell_amount() 都是对官方现有的 order 系列的包装,所不同的是这buy_value() 函数买入时会更积极,怎么理解呢?比如有时候看日志,你经常能看到 买入 180 股被系统重置为 100 股,但是实际上我们既然要买,肯定会倾向买入 200 股,实际测试的时候我也发现,相同策略改用 buy_value() 后回测的收益会比 用 order 系列函数高不少。
以下是代码举例:
buy_value('000001.XSHE', 10000)
买入 10000 元 中国银行
buy_amount('000001.XSHE', 100)
买入 100 股 平安银行
sell_value('000001.XSHE', 10000)
卖出 10000 元的 平安银行,如果第二个参数为0,则会直接清空 平安银行 的持仓
sell_amount('000001.XSHE', 100)
卖出 100 股的 平安银行,如果第二个参数为0,则会直接清空 平安银行 的持仓
adjust_target_percent('000001.XSHE', 0.5)
将当前持有的 平安银行 仓位调整为当前总金额的 50%(总金额=持仓市值 可用资金),假设当前平安银行的持仓总值为60%,那么就会卖出 10% 的股票,如果是 40%,则会买入 10%,总之会尽量保证仓位为 50%,这里要注意,是尽量,不是一定会,因为有时候你可能钱不够,买不到。
第二个参数为0的话就直接清空该股票仓位
adjust_target_value('000001.XSHE', 10000)
类似上面的函数,会将 中国平安 的持仓总额调整到 10000元左右
adjust_auto_avg()
这个函数不接受任何参数,会自动根据持仓股票数量动态计算每只股票的目标仓位,然后多出的卖掉,少了的买入,反正会尽量让他们仓位保持均衡。
以上就是几个比较关键的函数用法,剩下的那些用的机会少,看参数也能明白是干什么的,我就不多写了。
# coding=utf-8
import math
from kuanke.user_space_api import *
from pandas import Series
class MyTrader(object):
def __init__(self, _context, _g, _print_log=True):
super(MyTrader, self).__init__()
self.__context = _context
self.__g = _g
self.__g.print_dict = {'sell': [], 'buy': []}
self.__print_log = _print_log
try:
self.__g.trade_statics
except:
self.__g.trade_statics = {
'total_count': 0, # 交易总次数(全部卖出时算一次交易)
'success_count': 0, # 交易成功次数(卖出时盈利即算一次成功)
'day_count': 0, # 交易天数(每天累加一次)
'win_percent': [], # 盈利数据
'loss_percent': [], # 亏损数据
'buy_history': Series(), # 持股历史记录
'order_list': [], # 自制订单列表,用 DataFrame 来做好了
}
try:
self.__g.stop_loss_list
except:
self.__g.stop_loss_list = {} # 止损列表,{'000001.SHGH': '2016-04-21'}
self.__g.stop_loss_count = 0 # 止损次数
self.__add_day_count()
# 买入指定数量的股票
def buy_amount(self, stock, amount):
if 100 <= amount:
self.__g.print_dict["buy"].append({"stock": str(stock), "amount": int(amount)})
order_result = order(stock, amount)
'''
if order_result is not None:
if stock in self.__g.trade_statics['buy_history']:
self.__g.trade_statics['buy_history'][stock] = 1
else:
self.__g.trade_statics['buy_history'][stock] = 1
'''
else:
self.log("买入 {2}({1}) amount 为 {0},不足100,无法买入".format(amount, stock, self.get_stock_name(stock)))
# 买入指定金额的股票
def buy_value(self, stock, value):
price = self.get_stock_price(stock)
if not math.isnan(price):
amount = math.floor(round(value / price) / 100) * 100
self.buy_amount(stock, amount)
else:
self.log("买入 {1}({0}) buy_value() 获得的 price 为 NAN,跳过该订单".format(stock, self.get_stock_name(stock)))
# 卖出指定数量的股票,数量为0表示清空当前持仓
def sell_amount(self, stock, amount):
if 0 == amount:
record_data = self.get_record_data(stock)
amount = self.get_stock_sellable_amount(stock)
self.__g.print_dict['sell'].append({"stock": str(stock), "amount": int(amount)})
order_result = order_target_value(stock, 0)
if order_result is not None:
self.__g.trade_statics['total_count'] = 1
if record_data['current_value'] > record_data['total_cost']:
self.__g.trade_statics['success_count'] = 1
self.__g.trade_statics['win_percent'].append([stock, record_data['percent'], self.get_today_date()])
else:
self.__g.trade_statics['loss_percent'].append(
[stock, record_data['percent'], self.get_today_date()])
else:
self.__g.print_dict['sell'].append({"stock": str(stock), "amount": int(amount)})
order(stock, -amount)
# 卖出指定金额的股票,金额为0表示清空当前持仓
def sell_value(self, stock, value):
if 0 == int(value):
amount = 0
else:
price = self.get_stock_price(stock)
if not math.isnan(price):
amount = math.floor(round(value / price) / 100) * 100
else:
self.log("卖出 {1}({0}) sell_value() 获得的 price 为 NAN ,跳过该订单".format(stock, self.get_stock_name(stock)))
return
self.sell_amount(stock, amount)
# 将股票的持仓调整到指定的百分比,target_percent不能大于1或者小于0,若为0则直接清空所有仓位
def adjust_target_percent(self, stock, target_percent):
if 0 > target_percent or 1 < target_percent:
self.log("股票 {1}({0}) target_percent 不能大于1或者小于0".format(stock, self.get_stock_name(stock)))
return
portfolio_value = self.get_portfolio_value()
total_amount = self.get_stock_total_amount(stock)
current_price = self.get_stock_last_sell_price(stock)
current_value = total_amount * current_price
target_value = portfolio_value * target_percent
if 0 == target_percent:
self.clean_position(stock)
return
if 0 == total_amount:
self.buy_value(stock, target_value)
return
if target_value > current_value:
self.buy_value(stock, target_value - current_value)
elif target_value < current_value:
self.sell_value(stock, current_value - target_value)
# 将股票持仓调整到指定的金额,target_value不能小于0,若为0则直接清空所有仓位
def adjust_target_value(self, stock, target_value):
if 0 > target_value:
self.log("股票 {1}({0}) target_value 不能小于0".format(stock, self.get_stock_name(stock)))
return
if 0 == target_value:
self.clean_position(stock)
return
current_value = self.get_stock_total_amount(stock) * self.get_stock_last_sell_price(stock)
if target_value > current_value:
self.buy_value(stock, target_value - current_value)
elif target_value < current_value:
self.sell_value(stock, current_value - target_value)
# 按持仓股票数量,自动将所有股票仓位调整到平均值
def adjust_auto_avg(self):
positions = self.get_current_positions()
position_count = len(positions)
if 0 >= position_count:
self.log("当前没有持仓,无法自动平均仓位")
return
avg_percent = round(1.0 / position_count, 3)
self.log("当前持仓数为 {0},所有股票仓位自动调整到 {1}%".format(position_count, avg_percent * 100))
for stock in positions.keys():
self.adjust_target_percent(stock, avg_percent)
# 个股止损,limit表示最大亏损百分百
def stock_stop_loss(self, stock, limit=8):
amount = self.get_stock_total_amount(stock)
total_cost = self.get_stock_avg_cost(stock) * amount
position_value = self.get_stock_last_sell_price(stock) * amount
if position_value > total_cost:
return
percent = round((position_value - total_cost) / total_cost * 100, 2)
if percent <= -limit:
# 卖出止损
self.log('{0}({1}) 亏损已经超过 {2}%,卖出止损'.format(self.get_stock_name(stock), stock, percent))
self.clean_position(stock)
self.__g.stop_loss_count = 1
self.__g.stop_loss_list[stock] = self.get_today_date()
# 清空当前所有可卖持仓
def clean_positions(self):
for stock in self.get_current_positions().keys():
self.clean_position(stock)
# 清空指定股票仓位
def clean_position(self, stock):
if self.has_position(stock):
self.sell_value(stock, 0)
else:
self.log("股票 {1}({0}) 当前没有持仓,无法清空".format(stock, self.get_stock_name(stock)))
# 指定股票是否拥有持仓
def has_position(self, stock):
return 0 < self.get_stock_total_amount(stock)
# 半仓处理
def half_positions(self):
for stock in self.get_current_positions().keys():
half_amount = self.get_half_amount(self.get_stock_sellable_amount(stock))
if half_amount is not None:
self.sell_amount(stock, half_amount)
# 打印出绩效报表
def print_report(self, end_date=None):
start_date = self.get_today_date()
if end_date is None or str(start_date) == str(end_date):
win_rate = self.compute_win_rate()
most_win = self.compute_most_win_percent()
most_loss = self.compute_most_loss_percent()
total_profit = self.compute_total_profit()
starting_cash = self.get_starting_cash()
total_yield = round(total_profit / starting_cash, 4)
trade_days = self.get_trade_day_count()
per_day_win_loss = 0 if 0 == total_profit or 0 == trade_days else round(total_profit / trade_days, 2)
per_win_loss = 0 if 0 == total_profit or 0 == self.__g.trade_statics['total_count'] else round(
total_profit / self.__g.trade_statics['total_count'])
print "-"
print '------------绩效报表------------'
print '总资产: {0}, 本金: {1}, 盈利: {2}'.format(starting_cash total_profit, starting_cash, total_profit)
print '交易天数: {0}, 交易次数: {1}, 盈利次数: {2}'.format(trade_days,
self.__g.trade_statics['total_count'],
self.__g.trade_statics['success_count'])
print '胜率: {2}, 日均盈亏: {0}, 每笔盈亏: {1}'.format(per_day_win_loss, per_win_loss, str(win_rate * 100) str('%'))
print '单次盈利最高: {0}, 盈利比例: {1}%({2})'.format(most_win['stock'], most_win['value'], most_win['date'])
print '单次亏损最高: {0}, 亏损比例: {1}%({2})'.format(most_loss['stock'], most_loss['value'], most_loss['date'])
print "总收益率: {0}%".format(total_yield * 100)
print '--------------------------------'
print "-"
# 计算当前胜率
def compute_win_rate(self):
win_rate = 0
if 0 < self.__g.trade_statics['total_count'] and 0 < self.__g.trade_statics['success_count']:
win_rate = round(self.__g.trade_statics['success_count'] / float(self.__g.trade_statics['total_count']), 2)
return win_rate
# 计算当前最高盈利比例
def compute_most_win_percent(self):
result = {'stock': None, 'value': None, 'date': None}
for statics in self.__g.trade_statics['win_percent']:
if result['value'] is None:
result['stock'] = statics[0]
result['value'] = statics[1]
result['date'] = statics[2]
else:
if statics[1] > result['value']:
result['stock'] = statics[0]
result['value'] = statics[1]
result['date'] = statics[2]
return result
# 计算当前最高亏损比例
def compute_most_loss_percent(self):
result = {'stock': None, 'value': None, 'date': None}
for statics in self.__g.trade_statics['loss_percent']:
if result['value'] is None:
result['stock'] = statics[0]
result['value'] = statics[1]
result['date'] = statics[2]
else:
if statics[1] < result['value']:
result['stock'] = statics[0]
result['value'] = statics[1]
result['date'] = statics[2]
return result
# 计算当前总盈利金额
def compute_total_profit(self):
return self.get_portfolio_value() - self.get_starting_cash()
def get_today_date(self):
return self.get_context().current_dt.strftime("%Y-%m-%d")
def get_today_week(self):
return self.get_context().current_dt.isoweekday()
# 获得股票上一分钟的价格
def get_pre_minute_price(self, stock):
__today = self.get_today_date()
__hour = self.get_context().current_dt.hour
__minute = self.get_context().current_dt.minute
__start_time = "{0} {1}:{2}:00".format(__today, __hour, __minute)
__end_time = "{0} {1}:{2}:59".format(__today, __hour, __minute)
return get_price(stock, __start_time, __end_time, '1m', 'close')['close'][0]
def get_current_positions(self):
return self.get_context().portfolio.positions
def get_stock_sellable_amount(self, stock):
return self.get_context().portfolio.positions[stock].sellable_amount
def get_stock_total_amount(self, stock):
return self.get_context().portfolio.positions[stock].total_amount
def get_stock_avg_cost(self, stock):
return self.get_context().portfolio.positions[stock].avg_cost
def get_stock_last_sell_price(self, stock):
return self.get_context().portfolio.positions[stock].last_sale_price
def get_starting_cash(self):
return self.get_context().portfolio.starting_cash
def get_trade_day_count(self):
return self.__g.trade_statics['day_count']
def get_portfolio_value(self):
return self.get_context().portfolio.portfolio_value
def get_context(self):
return self.__context
@staticmethod
def get_stock_name(stock):
return get_security_info(stock).display_name
@staticmethod
def get_stock_price(stock, interval=1):
h = attribute_history(stock, interval, unit='1d', fields='close', skip_paused=True)
return h['close'].values[0]
@staticmethod
def remove_paused(stock_list):
current_data = get_current_data(stock_list)
return [s for s in stock_list if not current_data[s].paused]
@staticmethod
def remove_st(stock_list):
current_data = get_current_data(stock_list)
return [s for s in stock_list if not current_data[s].is_st]
@staticmethod
def remove_cyb(stock_list):
return [s for s in stock_list if not s.startswith("300")]
# 取指定数量的一半
@staticmethod
def get_half_amount(amount):
if 100 >= amount:
return None
return math.ceil(round(amount / 2) / 100) * 100
# 预先获得要统计的数据,只有订单确实执行成功才记录
def get_record_data(self, stock):
total_amount = self.get_stock_total_amount(stock)
avg_cost = self.get_stock_avg_cost(stock)
last_price = self.get_stock_last_sell_price(stock)
current_value = total_amount * last_price
total_cost = total_amount * avg_cost
percent = round((current_value - total_cost) / total_cost * 100, 2)
return {'current_value': current_value, 'total_cost': total_cost, 'percent': percent}
def log(self, msg):
if self.__print_log:
print(msg)
def __add_day_count(self):
self.__g.trade_statics['day_count'] = 1