请 [注册] 或 [登录]  | 返回主站

量化交易吧 /  量化平台 帖子:3364735 新帖:22

策略常用函数库、行业板块代码

我太难了发表于:5 月 10 日 07:26回复(1)

列举了一些常用的函数,此帖会不断更新~~~

函数来源:

  1. 社区用户无私的奉献
  2. JQ 员工产出

目录:

  • 一、函数:
    • 股票过滤
    • 风控函数
    • 财务数据
    • 技术指标
  • 二、行业概念代码
    • 证监会行业
    • 聚宽一级行业
      • 聚宽二级行业
    • 申万一级行业
    • 申万二级行业
    • 申万三级行业
    • 概念板块

一、函数

股票过滤

过滤停牌股票、ST股票、退市股票、新股等函数

## 过滤停牌股票
def paused_filter(security_list):
    current_data = get_current_data()
    security_list = [stock for stock in security_list if not current_data[stock].paused]
    # 返回结果
    return security_list

## 过滤退市股票
def delisted_filter(context, security_list):
    if g.filter_delisted:
        current_data = get_current_data()
        security_list = [stock for stock in security_list if not (('退' in current_data[stock].name) or ('*' in current_data[stock].name))]
    # 返回结果
    return security_list


## 过滤ST股票
def st_filter(security_list):
    current_data = get_current_data()
    security_list = [stock for stock in security_list if not current_data[stock].is_st]
    # 返回结果
    return security_list

## 只有ST股票
def st_filter(security_list):
    current_data = get_current_data()
    security_list = [stock for stock in security_list if current_data[stock].is_st]
    # 返回结果
    return security_list

## 获取上市新股
def get_new_stocks(context):
    df = get_all_securities(types=['stock'])
    date=context.current_dt.date()
    new_stocks = list(df[df.start_date == date].index)
    return new_stocks

风控函数

过滤停牌股票、ST股票、退市股票等函数

## 个股止损
def security_stoploss(context,loss=0.1):
    if len(context.portfolio.positions)>0:
        for stock in context.portfolio.positions.keys():
            avg_cost = context.portfolio.positions[stock].avg_cost
            current_price = context.portfolio.positions[stock].price
            if 1 - current_price/avg_cost >= loss:
                log.info(str(stock)   '  跌幅达个股止损线,平仓止损!')
                order_target_value(stock, 0)

## 个股止盈
def security_stopprofit(context,profit=0.1):
    if len(context.portfolio.positions)>0:
        for stock in context.portfolio.positions.keys():
            avg_cost = context.portfolio.positions[stock].avg_cost
            current_price = context.portfolio.positions[stock].price
            if current_price/avg_cost - 1 >= profit:
                log.info(str(stock)   '  涨幅达个股止盈线,平仓止盈!')
                order_target_value(stock, 0)

## 大盘止损
# 止损方法1:根据大盘指数N日均线进行止损
def index_stoploss_sicha(index, context, n=60):
    '''
    当大盘N日均线(默认60日)与昨日收盘价构成“死叉”,则清仓止损
    '''
    if len(context.portfolio.positions)>0:
        hist = attribute_history(index, n 2, '1d', 'close', df=False)
        temp1 = mean(hist['close'][1:-1])
        temp2 = mean(hist['close'][0:-2])
        close1 = hist['close'][-1]
        close2 = hist['close'][-2]
        if (close2 > temp2) and (close1 < temp1):
            log.info('大盘触及止损线,清仓!')
            for stock in context.portfolio.positions.keys():
                order_target_value(stock, 0)
# 止损方法2:根据大盘指数跌幅进行止损
def index_stoploss_diefu(index, context, n=10, zs=0.03):
    '''
    当大盘N日内跌幅超过zs,则清仓止损
    '''
    if len(context.portfolio.positions)>0:
        hist = attribute_history(index, n, '1d', 'close',df=False)
        if ((1-float(hist['close'][-1]/hist['close'][0])) >= zs):
            log.info('大盘触及止损线,清仓!')
            for stock in context.portfolio.positions.keys():
                order_target_value(stock, 0)

财务数据

获取一段时期财务数据的季报、年报数据,及其总和与均值。

def get_fundamentals_sum_mean_value(security='000001.XSHE', search=income.basic_eps, count=5, frequency='quarter'):
    '''
    输入:
        security:  要查询股票的代码
        search:    要查询的字段,详情参考 API: get_fundamentals-查询财务数据
        count:     单位时间长度,表示返回前几多少期的季报或者年报
        frequency: 获取数据类型,'quarter'为季报,'year'为年报
    输出:
        sum_num:  总值
        mean_num: 平均值
        df:       一段时间内季报或者年报的 DataFrame
    注:
        对于年报数据, 我们目前只有 现金流表 和 利润表, 当查询其他表时, 会返回该年份最后一个季报的数据"
    '''
    import pandas as pd

    def get_quarter(month):
        if month in (1,2,3):  
            return 1  
        elif month in (4,5,6):  
            return 2
        elif month in (7,8,9):  
            return 3
        elif month in (10,11,12):
            return 4

    # 查询条件
    q = query(
                income.code,
                income.statDate,
                search,
              ).filter(
                income.code.in_([security])
            )
    # 获取最近一次报表发布的日期
    statDate_num = get_fundamentals(q)['statDate'][0]
    # 获取最近一次报表发布所属的年份
    year = datetime.datetime.strptime(statDate_num, "%Y-%m-%d").year
    # 获取最近一次报表发布所属的月份
    month = datetime.datetime.strptime(statDate_num, "%Y-%m-%d").month
    # 获取最近一次报表发布日期所属的季度
    qt = get_quarter(month)
    # 获取季度列表
    qt_list = range(qt,0,-1) range(4,0,-1)*(count/4 1)
    qt_list = qt_list[:count]
    # 查询时间列表
    data_list = []
    # 获取查询的名称
    name = str(search).split('.')[-1]
    # 列名称
    colums_list = []
    # 获取拼接后的 DataFrame
    if frequency == 'quarter':
        for num in range(len(qt_list)):
            s = str(year) 'q' str(qt_list[num])
            data_list.append(s)
            colums_list.append(name   '_'   s)
            if qt_list[num] == 1:
                year -= 1
        # 拼接列表
        df = get_fundamentals(q, statDate = data_list[0])
        df.set_index(df.code.values, inplace=True)
        for t in data_list[1:]:
            d = get_fundamentals(q, statDate = t)
            d.set_index(d.code.values, inplace=True)
            df = pd.concat([df[name],d[name]],axis = 1)
    elif frequency == 'year':
        for num in range(count):
            year -= 1
            s = str(year)
            data_list.append(s)
            colums_list.append(name   '_'   s)
        # 拼接列表
        df = get_fundamentals(q, statDate = data_list[0])
        df.set_index(df.code.values, inplace=True)
        for t in data_list[1:]:
            d = get_fundamentals(q, statDate = t)
            d.set_index(d.code.values, inplace=True)
            df = pd.concat([df[name],d[name]],axis = 1)
    else:
        print "请输入正确的 frequency 参数,'quarter'为季报,'year'为年报. \n对于年报数据, 我们目前只有 现金流表 和 利润表, \n当查询其他表时, 会返回该年份最后一个季报的数据"
        return
    # 设置列名称
    df.columns = colums_list[:len(df.iloc[0])]
    # 计算总值
    sum_num = sum(df.iloc[0])
    # 计算平均值
    mean_num = mean(df.iloc[0])
    # 返回结果
    return sum_num, mean_num, df

技术指标函数群

使用技术指标函数前,请现在策略中导入如下模块,以确保函数能正常运行

from jqdata import *
import numpy as np
import talib

具体函数如下:

# MACD
def MACD(security_list, fastperiod=12, slowperiod=26, signalperiod=9):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 MACD
    security_data = history(slowperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
    macd_DIF = {}; macd_DEA = {}; macd_HIST = {}
    for stock in security_list:
        nan_count = list(np.isnan(security_data[stock])).count(True)
        if nan_count == len(security_data[stock]):
            log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
            macd_DIF[stock] = array([np.nan])
            macd_DEA[stock] = array([np.nan])
            macd_HIST[stock]= array([np.nan])
        else:
            macd_DIF[stock], macd_DEA[stock], macd = talib.MACDEXT(security_data[stock], fastperiod=fastperiod, fastmatype=1, slowperiod=slowperiod, slowmatype=1, signalperiod=signalperiod, signalmatype=1)
            macd_HIST[stock] = macd * 2
    return macd_DIF, macd_DEA, macd_HIST

# MA
def MA(security_list, timeperiod=5):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 MA
    security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
    ma = {}
    for stock in security_list:
        nan_count = list(np.isnan(security_data[stock])).count(True)
        if nan_count == len(security_data[stock]):
            log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
            ma[stock] = array([np.nan])
        else:
            ma[stock] = talib.MA(security_data[stock], timeperiod)
    return ma

# SMA
def SMA(security_list, timeperiod=5) :
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 SMA
    security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
    sma = {}
    for stock in security_list:
        close = np.nan_to_num(security_data[stock])
        sma[stock] = reduce(lambda x, y: ((timeperiod - 1) * x   y) / timeperiod, close)
    return sma


# KDJ
def KDJ(security_list, fastk_period=5, slowk_period=3, fastd_period=3) :
    def SMA_CN(close, timeperiod) :
        close = np.nan_to_num(close)
        return reduce(lambda x, y: ((timeperiod - 1) * x   y) / timeperiod, close)

    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 KDJ
    n = max(fastk_period, slowk_period, fastd_period)
    k = {}; d = {}; j = {}
    for stock in security_list:

        security_data = attribute_history(stock, n*2,'1d',fields=['high', 'low', 'close'], df=False)
        nan_count = list(np.isnan(security_data['close'])).count(True)
        if nan_count == len(security_data['close']):
            log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
            k[stock] = np.nan
            d[stock] = np.nan
            j[stock] = np.nan
        else:
            high = security_data['high']
            low = security_data['low']
            close = security_data['close']
            kValue, dValue = talib.STOCHF(high, low, close, fastk_period, fastd_period, fastd_matype=0)
            kValue = np.array(map(lambda x : SMA_CN(kValue[:x], slowk_period), range(1, len(kValue)   1)))
            dValue = np.array(map(lambda x : SMA_CN(kValue[:x], fastd_period), range(1, len(kValue)   1)))
            jValue = 3 * kValue - 2 * dValue

            func = lambda arr : np.array([0 if x < 0 else (100 if x > 100 else x) for x in arr])

            k[stock] = func(kValue)
            d[stock] = func(dValue)
            j[stock] = func(jValue)
    return k, d, j

# RSI
def RSI(security_list, timeperiod=6):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 MA
    security_data = history(timeperiod*20, '1d', 'close' , security_list, df=False, skip_paused=True)
    rsi = {}
    for stock in security_list:
        nan_count = list(np.isnan(security_data[stock])).count(True)
        if nan_count == len(security_data[stock]):
            log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
            rsi[stock] = array([np.nan])
        else:
            rsi[stock] = talib.RSI(security_data[stock], timeperiod)[-1]
    return rsi

# CCI
def CCI(security_list, timeperiod=14):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 CCI
    cci = {}
    for stock in security_list:
        security_data = attribute_history(stock, timeperiod*2, '1d',['close','high','low'] , df=False)
        nan_count = list(np.isnan(security_data['close'])).count(True)
        if nan_count == len(security_data['close']):
            log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
            cci[stock] = array([np.nan])
        else:
            close_CCI = security_data['close']
            high_CCI = security_data['high']
            low_CCI = security_data['low']
            cci[stock] = talib.CCI(high_CCI, low_CCI, close_CCI, timeperiod)
    return cci

# ATR
def ATR(security_list, timeperiod=14):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 ATR
    atr = {}
    for stock in security_list:
        security_data = attribute_history(stock, timeperiod*2, '1d',['close','high','low'] , df=False)
        nan_count = list(np.isnan(security_data['close'])).count(True)
        if nan_count == len(security_data['close']):
            log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
            atr[stock] = array([np.nan])
        else:
            close_ATR = security_data['close']
            high_ATR = security_data['high']
            low_ATR = security_data['low']
            atr[stock] = talib.ATR(high_ATR, low_ATR, close_ATR, timeperiod)
    return atr

# 布林线
def Bollinger_Bands(security_list, timeperiod=5, nbdevup=2, nbdevdn=2):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 Bollinger Bands
    security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
    upperband={}; middleband={}; lowerband={}
    for stock in security_list:
        nan_count = list(np.isnan(security_data[stock])).count(True)
        if nan_count == len(security_data[stock]):
            log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
            upperband[stock] = array([np.nan])
            middleband[stock] = array([np.nan])
            lowerband[stock] = array([np.nan])
        else:
            upperband[stock], middleband[stock], lowerband[stock] = talib.BBANDS(security_data[stock], timeperiod, nbdevup, nbdevdn)
    return upperband, middleband, lowerband

# 平均成交额
def MA_MONEY(security_list, timeperiod=5):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 N 日平均成交额
    security_data = history(timeperiod*2, '1d', 'money' , security_list, df=False, skip_paused=True)
    mamoney={}
    for stock in security_list:
        nan_count = list(np.isnan(security_data[stock])).count(True)
        if nan_count == len(security_data[stock]):
            log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
            mamoney[stock] = array([np.nan])
        else:
            mamoney[stock] = talib.MA(security_data[stock], timeperiod)
    return mamoney

# 平均成交量
def MA_VOLUME(security_list, timeperiod=5):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 N 日平均成交量
    security_data = history(timeperiod*2, '1d', 'volume' , security_list, df=False, skip_paused=True)
    mavol={}
    for stock in security_list:
        nan_count = list(np.isnan(security_data[stock])).count(True)
        if nan_count == len(security_data[stock]):
            log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
            mavol[stock] = array([np.nan])
        else:
            mavol[stock] = talib.MA(security_data[stock], timeperiod)
    return mavol


# BIAS
def BIAS(security_list, timeperiod=5):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 BIAS 
    security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
    bias = {}
    for stock in security_list:
        average_price = security_data[stock][-timeperiod:].mean()
        current_price = security_data[stock][-1]
        bias[stock]=(current_price-average_price)/average_price
    return bias

# BBI
def BBI(security_list, timeperiod1=3, timeperiod2=6, timeperiod3=12, timeperiod4=24):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 BBI
    security_data = history(timeperiod4*2, '1d', 'close' , security_list, df=False, skip_paused=True)
    bbi={}
    for stock in security_list:
        x = security_data[stock]
        d = (x[-timeperiod1:].mean() x[-timeperiod2:].mean() x[-timeperiod3:].mean() x[-timeperiod4:].mean())/4.0
        bbi[stock] = d
    return bbi

# TRIX
def TRIX(security_list, timeperiod=30):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 TRIX
    security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
    trix={}
    for stock in security_list:
        nan_count = list(np.isnan(security_data[stock])).count(True)
        if nan_count == len(security_data[stock]):
            log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
            trix[stock] = array([np.nan])
        else:
            trix[stock] = talib.TRIX(security_data[stock], timeperiod)
    return trix

# EMA
def EMA(security_list, timeperiod=30):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 EMA
    security_data = history(timeperiod*2, '1d', 'close' , security_list, df=False, skip_paused=True)
    ema={}
    for stock in security_list:
        nan_count = list(np.isnan(security_data[stock])).count(True)
        if nan_count == len(security_data[stock]):
            log.info("股票 %s 输入数据全是 NaN,该股票可能已退市或刚上市,返回 NaN 值数据。" %stock)
            ema[stock] = array([np.nan])
        else:
            ema[stock] = talib.EMA(security_data[stock], timeperiod)
    return ema

# DMA
def DMA(security_list,fastperiod=5,slowperiod=60,amaperiod=20):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 DMA
    dma = {}
    for security in security_list:
        security_data = attribute_history(security, slowperiod*2, '1d', ['close'], skip_paused=True, df=False)['close']
        m1 = map(lambda i: security_data[i-fastperiod 1:i 1],range(len(security_data))[len(security_data)-slowperiod:])
        m2 = map(lambda i: security_data[i-slowperiod 1:i 1],range(len(security_data))[len(security_data)-slowperiod:])
        MA1 = map(lambda x: mean(x), m1)
        MA2 = map(lambda x: mean(x), m2)
        DMA = array(MA1) - array(MA2)
        dma[security] = DMA
    return dma

# AMA
def AMA(security_list,fastperiod=5,slowperiod=60,amaperiod=20):
    # 修复传入为单只股票的情况
    if isinstance(security_list, str):
        security_list = [security_list]
    # 计算 DMA
    ama = {}
    for security in security_list:
        security_data = attribute_history(security, slowperiod*2, '1d', ['close'], skip_paused=True, df=False)['close']
        m1 = map(lambda i: security_data[i-fastperiod 1:i 1],range(len(security_data))[len(security_data)-slowperiod:])
        m2 = map(lambda i: security_data[i-slowperiod 1:i 1],range(len(security_data))[len(security_data)-slowperiod:])
        MA1 = map(lambda x: mean(x), m1)
        MA2 = map(lambda x: mean(x), m2)
        DMA = array(MA1) - array(MA2)
        # 计算 AMA
        a1 = map(lambda i: DMA[i-amaperiod 1:i 1],range(len(DMA))[-2:])
        AMA = mean(a1)
        ama[security] = AMA
    return ama

二、行业概念代码

证监会行业

from jqdata import *
HY_ZJH = get_industries(name='zjw')

聚宽一级行业

from jqdata import *
HY_ZY =get_industries(name='jq_l1')

聚宽二级行业

from jqdata import *
HY_ZY =get_industries(name='jq_l2')

申万一级行业

from jqdata import *
SW1 = get_industries(name='sw_l1')

申万二级行业

from jqdata import *
SW2 = get_industries(name='sw_l2')

申万三级行业

from jqdata import *
SW3 = get_industries(name='sw_l3')

概念板块

from jqdata import *
GN = get_concepts()

全部回复

0/140

达人推荐

量化课程

    移动端课程