列举了一些常用的函数,此帖会不断更新~~~
过滤停牌股票、ST股票、退市股票、新股等函数
## 过滤停牌股票
def paused_filter(security_list):
current_data = get_current_data()
security_list = [stock for stock in security_list if not current_data[stock].paused]
# 返回结果
return security_list
## 过滤退市股票
def delisted_filter(context, security_list):
if g.filter_delisted:
current_data = get_current_data()
security_list = [stock for stock in security_list if not (('退' in current_data[stock].name) or ('*' in current_data[stock].name))]
# 返回结果
return security_list
## 过滤ST股票
def st_filter(security_list):
current_data = get_current_data()
security_list = [stock for stock in security_list if not current_data[stock].is_st]
# 返回结果
return security_list
## 只有ST股票
def st_filter(security_list):
current_data = get_current_data()
security_list = [stock for stock in security_list if current_data[stock].is_st]
# 返回结果
return security_list
## 获取上市新股
def get_new_stocks(context):
df = get_all_securities(types=['stock'])
date=context.current_dt.date()
new_stocks = list(df[df.start_date == date].index)
return new_stocks
过滤停牌股票、ST股票、退市股票等函数
## 个股止损
def security_stoploss(context,loss=0.1):
if len(context.portfolio.positions)>0:
for stock in context.portfolio.positions.keys():
avg_cost = context.portfolio.positions[stock].avg_cost
current_price = context.portfolio.positions[stock].price
if 1 - current_price/avg_cost >= loss:
log.info(str(stock) ' 跌幅达个股止损线,平仓止损!')
order_target_value(stock, 0)
## 个股止盈
def security_stopprofit(context,profit=0.1):
if len(context.portfolio.positions)>0:
for stock in context.portfolio.positions.keys():
avg_cost = context.portfolio.positions[stock].avg_cost
current_price = context.portfolio.positions[stock].price
if current_price/avg_cost - 1 >= profit:
log.info(str(stock) ' 涨幅达个股止盈线,平仓止盈!')
order_target_value(stock, 0)
## 大盘止损
# 止损方法1:根据大盘指数N日均线进行止损
def index_stoploss_sicha(index, context, n=60):
'''
当大盘N日均线(默认60日)与昨日收盘价构成“死叉”,则清仓止损
'''
if len(context.portfolio.positions)>0:
hist = attribute_history(index, n 2, '1d', 'close', df=False)
temp1 = mean(hist['close'][1:-1])
temp2 = mean(hist['close'][0:-2])
close1 = hist['close'][-1]
close2 = hist['close'][-2]
if (close2 > temp2) and (close1 < temp1):
log.info('大盘触及止损线,清仓!')
for stock in context.portfolio.positions.keys():
order_target_value(stock, 0)
# 止损方法2:根据大盘指数跌幅进行止损
def index_stoploss_diefu(index, context, n=10, zs=0.03):
'''
当大盘N日内跌幅超过zs,则清仓止损
'''
if len(context.portfolio.positions)>0:
hist = attribute_history(index, n, '1d', 'close',df=False)
if ((1-float(hist['close'][-1]/hist['close'][0])) >= zs):
log.info('大盘触及止损线,清仓!')
for stock in context.portfolio.positions.keys():
order_target_value(stock, 0)
获取一段时期财务数据的季报、年报数据,及其总和与均值。
def get_fundamentals_sum_mean_value(security='000001.XSHE', search=income.basic_eps, count=5, frequency='quarter'):
'''
输入:
security: 要查询股票的代码
search: 要查询的字段,详情参考 API: get_fundamentals-查询财务数据
count: 单位时间长度,表示返回前几多少期的季报或者年报
frequency: 获取数据类型,'quarter'为季报,'year'为年报
输出:
sum_num: 总值
mean_num: 平均值
df: 一段时间内季报或者年报的 DataFrame
注:
对于年报数据, 我们目前只有 现金流表 和 利润表, 当查询其他表时, 会返回该年份最后一个季报的数据"
'''
import pandas as pd
def get_quarter(month):
if month in (1,2,3):
return 1
elif month in (4,5,6):
return 2
elif month in (7,8,9):
return 3
elif month in (10,11,12):
return 4
# 查询条件
q = query(
income.code,
income.statDate,
search,
).filter(
income.code.in_([security])
)
# 获取最近一次报表发布的日期
statDate_num = get_fundamentals(q)['statDate'][0]
# 获取最近一次报表发布所属的年份
year = datetime.datetime.strptime(statDate_num, "%Y-%m-%d").year
# 获取最近一次报表发布所属的月份
month = datetime.datetime.strptime(statDate_num, "%Y-%m-%d").month
# 获取最近一次报表发布日期所属的季度
qt = get_quarter(month)
# 获取季度列表
qt_list = range(qt,0,-1) range(4,0,-1)*(count/4 1)
qt_list = qt_list[:count]
# 查询时间列表
data_list = []
# 获取查询的名称
name = str(search).split('.')[-1]
# 列名称
colums_list = []
# 获取拼接后的 DataFrame
if frequency == 'quarter':
for num in range(len(qt_list)):
s = str(year) 'q' str(qt_list[num])
data_list.append(s)
colums_list.append(name '_' s)
if qt_list[num] == 1:
year -= 1
# 拼接列表
df = get_fundamentals(q, statDate = data_list[0])
df.set_index(df.code.values, inplace=True)
for t in data_list[1:]:
d = get_fundamentals(q, statDate = t)
d.set_index(d.code.values, inplace=True)
df = pd.concat([df[name],d[name]],axis = 1)
elif frequency == 'year':
for num in range(count):
year -= 1
s = str(year)
data_list.append(s)
colums_list.append(name '_' s)
# 拼接列表
df = get_fundamentals(q, statDate = data_list[0])
df.set_index(df.code.values, inplace=True)
for t in data_list[1:]:
d = get_fundamentals(q, statDate = t)
d.set_index(d.code.values, inplace=True)
df = pd.concat([df[name],d[name]],axis = 1)
else:
print "请输入正确的 frequency 参数,'quarter'为季报,'year'为年报. \n对于年报数据, 我们目前只有 现金流表 和 利润表, \n当查询其他表时, 会返回该年份最后一个季报的数据"
return
# 设置列名称
df.columns = colums_list[:len(df.iloc[0])]
# 计算总值
sum_num = sum(df.iloc[0])
# 计算平均值
mean_num = mean(df.iloc[0])
# 返回结果
return sum_num, mean_num, df
使用技术指标函数前,请现在策略中导入如下模块,以确保函数能正常运行
from jqdata import *
import numpy as np
import talib
具体函数如下:
# MACD
def MACD(security_list, fastperiod=12, slowperiod=26, signalperiod=9):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 MACD
security_data = history(slowperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
macd_DIF = {}; macd_DEA = {}; macd_HIST = {}
for stock in security_list:
nan_count = list(np.isnan(security_data[stock])).count(True)
if nan_count == len(security_data[stock]):
log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
macd_DIF[stock] = array([np.nan])
macd_DEA[stock] = array([np.nan])
macd_HIST[stock]= array([np.nan])
else:
macd_DIF[stock], macd_DEA[stock], macd = talib.MACDEXT(security_data[stock], fastperiod=fastperiod, fastmatype=1, slowperiod=slowperiod, slowmatype=1, signalperiod=signalperiod, signalmatype=1)
macd_HIST[stock] = macd * 2
return macd_DIF, macd_DEA, macd_HIST
# MA
def MA(security_list, timeperiod=5):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 MA
security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
ma = {}
for stock in security_list:
nan_count = list(np.isnan(security_data[stock])).count(True)
if nan_count == len(security_data[stock]):
log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
ma[stock] = array([np.nan])
else:
ma[stock] = talib.MA(security_data[stock], timeperiod)
return ma
# SMA
def SMA(security_list, timeperiod=5) :
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 SMA
security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
sma = {}
for stock in security_list:
close = np.nan_to_num(security_data[stock])
sma[stock] = reduce(lambda x, y: ((timeperiod - 1) * x y) / timeperiod, close)
return sma
# KDJ
def KDJ(security_list, fastk_period=5, slowk_period=3, fastd_period=3) :
def SMA_CN(close, timeperiod) :
close = np.nan_to_num(close)
return reduce(lambda x, y: ((timeperiod - 1) * x y) / timeperiod, close)
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 KDJ
n = max(fastk_period, slowk_period, fastd_period)
k = {}; d = {}; j = {}
for stock in security_list:
security_data = attribute_history(stock, n*2,'1d',fields=['high', 'low', 'close'], df=False)
nan_count = list(np.isnan(security_data['close'])).count(True)
if nan_count == len(security_data['close']):
log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
k[stock] = np.nan
d[stock] = np.nan
j[stock] = np.nan
else:
high = security_data['high']
low = security_data['low']
close = security_data['close']
kValue, dValue = talib.STOCHF(high, low, close, fastk_period, fastd_period, fastd_matype=0)
kValue = np.array(map(lambda x : SMA_CN(kValue[:x], slowk_period), range(1, len(kValue) 1)))
dValue = np.array(map(lambda x : SMA_CN(kValue[:x], fastd_period), range(1, len(kValue) 1)))
jValue = 3 * kValue - 2 * dValue
func = lambda arr : np.array([0 if x < 0 else (100 if x > 100 else x) for x in arr])
k[stock] = func(kValue)
d[stock] = func(dValue)
j[stock] = func(jValue)
return k, d, j
# RSI
def RSI(security_list, timeperiod=6):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 MA
security_data = history(timeperiod*20, '1d', 'close' , security_list, df=False, skip_paused=True)
rsi = {}
for stock in security_list:
nan_count = list(np.isnan(security_data[stock])).count(True)
if nan_count == len(security_data[stock]):
log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
rsi[stock] = array([np.nan])
else:
rsi[stock] = talib.RSI(security_data[stock], timeperiod)[-1]
return rsi
# CCI
def CCI(security_list, timeperiod=14):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 CCI
cci = {}
for stock in security_list:
security_data = attribute_history(stock, timeperiod*2, '1d',['close','high','low'] , df=False)
nan_count = list(np.isnan(security_data['close'])).count(True)
if nan_count == len(security_data['close']):
log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
cci[stock] = array([np.nan])
else:
close_CCI = security_data['close']
high_CCI = security_data['high']
low_CCI = security_data['low']
cci[stock] = talib.CCI(high_CCI, low_CCI, close_CCI, timeperiod)
return cci
# ATR
def ATR(security_list, timeperiod=14):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 ATR
atr = {}
for stock in security_list:
security_data = attribute_history(stock, timeperiod*2, '1d',['close','high','low'] , df=False)
nan_count = list(np.isnan(security_data['close'])).count(True)
if nan_count == len(security_data['close']):
log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
atr[stock] = array([np.nan])
else:
close_ATR = security_data['close']
high_ATR = security_data['high']
low_ATR = security_data['low']
atr[stock] = talib.ATR(high_ATR, low_ATR, close_ATR, timeperiod)
return atr
# 布林线
def Bollinger_Bands(security_list, timeperiod=5, nbdevup=2, nbdevdn=2):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 Bollinger Bands
security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
upperband={}; middleband={}; lowerband={}
for stock in security_list:
nan_count = list(np.isnan(security_data[stock])).count(True)
if nan_count == len(security_data[stock]):
log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
upperband[stock] = array([np.nan])
middleband[stock] = array([np.nan])
lowerband[stock] = array([np.nan])
else:
upperband[stock], middleband[stock], lowerband[stock] = talib.BBANDS(security_data[stock], timeperiod, nbdevup, nbdevdn)
return upperband, middleband, lowerband
# 平均成交额
def MA_MONEY(security_list, timeperiod=5):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 N 日平均成交额
security_data = history(timeperiod*2, '1d', 'money' , security_list, df=False, skip_paused=True)
mamoney={}
for stock in security_list:
nan_count = list(np.isnan(security_data[stock])).count(True)
if nan_count == len(security_data[stock]):
log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
mamoney[stock] = array([np.nan])
else:
mamoney[stock] = talib.MA(security_data[stock], timeperiod)
return mamoney
# 平均成交量
def MA_VOLUME(security_list, timeperiod=5):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 N 日平均成交量
security_data = history(timeperiod*2, '1d', 'volume' , security_list, df=False, skip_paused=True)
mavol={}
for stock in security_list:
nan_count = list(np.isnan(security_data[stock])).count(True)
if nan_count == len(security_data[stock]):
log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
mavol[stock] = array([np.nan])
else:
mavol[stock] = talib.MA(security_data[stock], timeperiod)
return mavol
# BIAS
def BIAS(security_list, timeperiod=5):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 BIAS
security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
bias = {}
for stock in security_list:
average_price = security_data[stock][-timeperiod:].mean()
current_price = security_data[stock][-1]
bias[stock]=(current_price-average_price)/average_price
return bias
# BBI
def BBI(security_list, timeperiod1=3, timeperiod2=6, timeperiod3=12, timeperiod4=24):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 BBI
security_data = history(timeperiod4*2, '1d', 'close' , security_list, df=False, skip_paused=True)
bbi={}
for stock in security_list:
x = security_data[stock]
d = (x[-timeperiod1:].mean() x[-timeperiod2:].mean() x[-timeperiod3:].mean() x[-timeperiod4:].mean())/4.0
bbi[stock] = d
return bbi
# TRIX
def TRIX(security_list, timeperiod=30):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 TRIX
security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
trix={}
for stock in security_list:
nan_count = list(np.isnan(security_data[stock])).count(True)
if nan_count == len(security_data[stock]):
log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
trix[stock] = array([np.nan])
else:
trix[stock] = talib.TRIX(security_data[stock], timeperiod)
return trix
# EMA
def EMA(security_list, timeperiod=30):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 EMA
security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
ema={}
for stock in security_list:
nan_count = list(np.isnan(security_data[stock])).count(True)
if nan_count == len(security_data[stock]):
log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
ema[stock] = array([np.nan])
else:
ema[stock] = talib.EMA(security_data[stock], timeperiod)
return ema
# DMA
def DMA(security_list,fastperiod=5,slowperiod=60,amaperiod=20):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 DMA
dma = {}
for security in security_list:
security_data = attribute_history(security, slowperiod*2, '1d', ['close'], skip_paused=True, df=False)['close']
m1 = map(lambda i: security_data[i-fastperiod 1:i 1],range(len(security_data))[len(security_data)-slowperiod:])
m2 = map(lambda i: security_data[i-slowperiod 1:i 1],range(len(security_data))[len(security_data)-slowperiod:])
MA1 = map(lambda x: mean(x), m1)
MA2 = map(lambda x: mean(x), m2)
DMA = array(MA1) - array(MA2)
dma[security] = DMA
return dma
# AMA
def AMA(security_list,fastperiod=5,slowperiod=60,amaperiod=20):
# 修复传入为单只股票的情况
if isinstance(security_list, str):
security_list = [security_list]
# 计算 DMA
ama = {}
for security in security_list:
security_data = attribute_history(security, slowperiod*2, '1d', ['close'], skip_paused=True, df=False)['close']
m1 = map(lambda i: security_data[i-fastperiod 1:i 1],range(len(security_data))[len(security_data)-slowperiod:])
m2 = map(lambda i: security_data[i-slowperiod 1:i 1],range(len(security_data))[len(security_data)-slowperiod:])
MA1 = map(lambda x: mean(x), m1)
MA2 = map(lambda x: mean(x), m2)
DMA = array(MA1) - array(MA2)
# 计算 AMA
a1 = map(lambda i: DMA[i-amaperiod 1:i 1],range(len(DMA))[-2:])
AMA = mean(a1)
ama[security] = AMA
return ama
from jqdata import *
HY_ZJH = get_industries(name='zjw')
from jqdata import *
HY_ZY =get_industries(name='jq_l1')
from jqdata import *
HY_ZY =get_industries(name='jq_l2')
from jqdata import *
SW1 = get_industries(name='sw_l1')
from jqdata import *
SW2 = get_industries(name='sw_l2')
from jqdata import *
SW3 = get_industries(name='sw_l3')
from jqdata import *
GN = get_concepts()
本社区仅针对特定人员开放
查看需注册登录并通过风险意识测评
5秒后跳转登录页面...
移动端课程