请 [注册] 或 [登录]  | 返回主站

量化交易吧 /  数理科学 帖子:3364768 新帖:4

“强势上涨+回调”策略研究——基于一些用于时间序列分割的算法

好的名字都没了发表于:5 月 10 日 03:12回复(1)

本文主要内容见下方研究。

“强势上涨+回调”策略研究——基于一些用于时间序列分割的算法

本文的思路源于国信证券2012年12月17日金融工程研究报告。感谢该报告提供些算法的名称,为策略提供思路,但是这个报告除了策略的名称提了两句以外,其它相当于没说,结果也无法复制(没必要去找那篇文章来看==)。本文主要作用是介绍一下大体思路,然后将其中所提及的算法进行搜集整理并加上自己的定义和尝试来实现结果。

一、总体思路

我们的最终目的是找到“强势股”。本文的方针是通过股票价格形态确定的方法,而不是通过因子的方法。

先找出“强势股”的形态,确定其关键点(“关键点”定义后面解释,先理解为:一个强势股票,先要上涨,经历一段回调,再上涨(这是我们暂时对“强势股”的定义,后面详说),我们的肉眼可以直接看出它从第一个上涨之后到回调阶段的拐点就是关键点,但电脑如何找到这个拐点,这又涉及到一些算法,后面会阐明这些用于时间序列分割的算法),因为关键点表示股票进入了回调阶段,所以只要确定了前面的“先上涨+回调”的形态,就认为它就是强势股,我们认为它将在后面有第二轮上涨,可以在回调期间买入。

有人可能会问,不一定它会先上涨+回调之后又会上涨,也有可能回调之后下跌了。解释来源于研报中的一些背景前提:

所谓强势股,指的是在短期内出现了非常明显的上涨,如连续涨停板的出现。这种上涨背后的原因往往是由于消息面的刺激,诸如2007年的奥运概念龙头股北京旅游,2010年锂电池概念龙头股成飞集成,2012年的金改概念龙头股浙江东日、金山开发,2012年稀土永磁概念龙头股包钢稀土等。这些股票从技术分析角度来看,在出现了相应的消息面配合后,都存在非常明显的强势特征。由于前期过快的拉升,导致大部分投资者不能享受其第一波拉升,只能望洋兴叹。这些强势股在结束第一波拉升后,在若干个交易日内,会呈现横盘整理的状态,表现为成交量和换手率的变化,同时股价大部分的时间内处在某条均线的支撑之上。经过一段时间的调整后,强势股有可能会出现第二波涨势。

我们希望找到的强势股是这样的:

强势股举例

但它其实可能后来是这样的:

强势股寻找失败案例

所以我们的研究只能在基于上述心理分析+历史统计的基础上,认为大部分的概率,找到强势股后它会有第二波上涨,所以有大的胜率就行了。其实做到后面时候就会发现,能找到“先上涨+回调”的形态的股票已经非常不容易了。。

为了预防“先上涨+回调”之后下跌的情形,我们可以设定一些限制,比如在回调期间内MA5必须保持在MA10之上,等等限制,可以放在策略中,本研究不做过多讨论。

二、实现步骤

第一步:先找到一个“强势上涨+回调”阶段的形态作为模板标准。并将其标准化(缩短至0-1价格区间)。

我们希望找到的模板是这样的:(不要嫌弃)

股票形态模板

这种形态在短期内出现次数很少,最终找到了一个近似的模板形态是下方的,然后我们将其价格进行了标准化(模板暂时用肉眼找的,找到了模板才能根据这个来找股票池里具备这样形态的股票。价格标准化就是把价格范围改变在0-1区间,公式标注在下方代码):

import pandas as pd
import numpy as np
from pandas import DataFrame,Series
import matplotlib.pyplot as plt
from dtw import dtw
from sklearn.metrics.pairwise import euclidean_distances


fig_size = plt.rcParams['figure.figsize']
fig_size[0] = 12
fig_size[1] = 8


##首先要找到一个标准的强势上涨和回调的标准形态,万向钱潮
close_benchmark0 = get_price('000559.XSHE','2009-01-04','2009-02-10','1d',['close'])['close'][:16]
close_benchmark=[0]*len(close_benchmark0)

#价格标准化
for k in range(len(close_benchmark0)):
    close_benchmark[k]=(close_benchmark0[k]-min(close_benchmark0))/(max(close_benchmark0)-min(close_benchmark0))

plt.plot(range(len(close_benchmark)),close_benchmark,'k')
/opt/conda/envs/python2/lib/python2.7/site-packages/sklearn/externals/joblib/_multiprocessing_helpers.py:28: UserWarning: [Errno 30] Read-only file system.  joblib will operate in serial mode
  warnings.warn('%s.  joblib will operate in serial mode' % (e,))
[<matplotlib.lines.Line2D at 0x7fa11fbfad50>]

上面就是我们的模板。基本符合“上涨+回调”形态。

第二步:用该模板遍历股票池中的所有股票,用DTW算法找到符合“强势上涨+回调”形态的股票。

关于DTW算法:全称是动态时间弯曲算法(Dynamic Time Warping),在基于动态时间弯曲算法寻找趋势相似的股票研究中,我们已经验证了它的可行性,具体了解可点击链接。这里简单说一下,DTW就是用来解决时间段上不匹配的问题,比如我的模板本来只是想表达一种“先上涨+回调”的意思而已,但是在遍历的过程中,有可能因为某只股票时间段上的错位,造成计算error很大,然后原本匹配的价格形态被筛除,这种方法能够解决时间不对应问题,比如形态被拉伸了或者缩短了等等。但该方法缺点是运算速度很慢。

上次发了研究后,有同学说DTW太慢了,所以可以先用相关系数筛选,筛选之后再用DTW,在策略中可以尝试~~~

第三步:找到合适的股票之后,找“关键点”。找到关键点之后就可以分割开“先上涨”+“回调”阶段了,然后就可以在策略中对回调阶段分析来筛选强势股,当MA10上穿MA5时就剔除,来尽量避免回调之后又下跌的情形。后续策略内容本文就不多讨论。主要问题在于,怎么划分开“上涨”+“回调”?

三、分割时间序列算法介绍

这里主要介绍两种方法,一种叫普通滑动窗口算法(Generic Sliding Window Algorithm),还有一种是自下而上逐渐合并的算法(Generic Bottom-Up Algorithm)。

The Generic Sliding Window Algorithm

滑动窗口算法。简单用语言描述,就是:在一个时间序列里,比如我先拿第一个数字作为第一个锚,然后在他往后的第三个数字开始,尝试第一至第三个数字的时间段可否作为一段,那么判定的函数(代码中设为calculate_error)我自己定义。本文定义的calculate_error为,连接锚和最终点,然后求中间点到该直线的距离,取最长的距离distance为calculate_error结果,当distance小于阈值max_error时,第三个点移至第四个,,,第五个。。以此类推,当distance终于大于max_error后,比如第六个点算的calculate_error就大于阈值了,那么锚就改成第六个点,然后从第六个点后面的第三个点开始重新算calculate_error,以此类推,直到序列末尾。然后我们就可以将这个时间序列进行分段了。

如果觉得文字描述很绕,可以看下方伪代码:

sliding window

符号含义为:

notation

The Generic Bottom-Up Algorithm

大致意思是,先两两时间序列的元素进行合成,变成一串小segments, 然后再将相邻的所有segment都合成看他们的cal_error(即代码中的merge_cost),然后如果cal_error小于阈值max_error,则合并这两个segments,相邻的merge_cost也要进行调整重新算。以此类推,不断地合并,直到超出阈值max_error。

具体步骤看伪代码:

bottom-up

四、编程实现

1.首先我们看看能不能用dtw方法,用上述价格形态模板来在2016-01-01至2016-12-01内从股票池上证50中找到“强势上涨+回调”的形态。因为DTW包很慢,我们只是为了探究后面的找关键点的方法,而不是为了来验证DTW做策略,为了节约时间,所以就拿非常小的股票池,只有50只股票,在很短的时间内寻找“强势上涨+回调”形态,可惜在该区域内没找到合适的。

import pandas as pd
import numpy as np
from pandas import DataFrame,Series
import matplotlib.pyplot as plt
from dtw import dtw
from sklearn.metrics.pairwise import euclidean_distances
import time
from numpy import inf


##遍历所有股票,DTW方法
pool = Series(get_index_stocks('000016.XSHG'))

#归一化所有股票的closeprice
closeprice={}
for i in pool.index:
    closeprice[i] = get_price(pool[i],'2016-01-01','2016-12-01','1d',['close'])['close']    
    for k in range(len(closeprice[i])):
        closeprice[i][k] = (closeprice[i][k]-min(closeprice[i]))/(max(closeprice[i])-min(closeprice[i]))

        if closeprice[i][k]==NaN or closeprice[i][k]==inf:
            del closeprice[i]
            continue
        else:
            closeprice[i][k] = float("%.9f" % closeprice[i][k])

#设定distance的阈值
dist_threshold = 3
stockpairs=[]
window = len(close_benchmark)       #滑动窗口
for i in pool.index:
    if i in closeprice.keys():
        
        for k in range(len(closeprice[i])+1-window):
            try:
                dist, cost, acc, path = dtw(close_benchmark,closeprice[i][k:(k+window-1)],dist=euclidean_distances)
            except:
                continue
            else:
                distance_ = acc[-1][-1]   
        
                if distance_ < dist_threshold:
                    print(pool[i],distance_,k)
                    stockpairs.append((pool[i],k))

print(stockpairs)
(u'600000.XSHG', 2.7794713349999496, 0)
(u'600104.XSHG', 2.9651373997777775, 0)
(u'600111.XSHG', 2.8057925002222484, 0)
(u'600519.XSHG', 2.9154120298888837, 0)
(u'601088.XSHG', 2.4601154925555599, 0)
(u'601169.XSHG', 2.3964030970000612, 0)
(u'601288.XSHG', 2.794230461777802, 0)
[(u'600000.XSHG', 0), (u'600104.XSHG', 0), (u'600111.XSHG', 0), (u'600519.XSHG', 0), (u'601088.XSHG', 0), (u'601169.XSHG', 0), (u'601288.XSHG', 0)]
window = len(close_benchmark)
close_benchmark0 = get_price('000559.XSHE','2009-01-04','2009-02-10','1d',['close'])['close'][:16]
m = get_price('601169.XSHG','2016-01-01','2016-12-01','1d',['close'])['close'][:window]
                    
fig,ax1=plt.subplots()
ax2 = ax1.twinx()

p1=ax1.plot(range(len(close_benchmark0)),close_benchmark0,'r')
p1=ax2.plot(range(len(close_benchmark0)),m,'k')
standard_m = [0]*len(m)
for i in range(len(m)):
    standard_m[i] = (m[i]-min(m))/(max(m)-min(m))

fig,ax1=plt.subplots()
ax2 = ax1.twinx()

p1=ax1.plot(range(len(close_benchmark)),close_benchmark,'r')
p1=ax2.plot(range(len(close_benchmark)),standard_m,'k')

这个结果找的不好,也许是我们取的时间太短了,股票池也太小了,说明在这一段内就没有合适的。所以为了能够继续验证下面的Sliding Window和Bottom-Up算法,我就直接拿价格模板来验证了,不浪费时间在找上了,以后做策略的时候可以用相关系数法+dtw来节约时间。

m = get_price('000559.XSHE','2009-01-04','2009-02-10','1d',['close'])['close'][:16]
##先用PLR算法得到关键点

##PLR算法
from scipy.spatial import distance
import math
import time

fig_size = plt.rcParams['figure.figsize']
fig_size[0] = 12
fig_size[1] = 8

p1=plot(range(len(m)),m,'k')

def dist(x1,y1,x2,y2,x3,y3): # x3,y3 is the point
    px = x2-x1
    py = y2-y1
   
    something = px*px + py*py
#     print(something,'lalala')

    u =  ((x3 - x1) * px + (y3 - y1) * py) / float(something)

    if u > 1:
        u = 1
    elif u < 0:
        u = 0

    x = x1 + u * px
    y = y1 + u * py

    dx = x - x3
    dy = y - y3

    # Note: If the actual distance does not matter,
    # if you only want to compare what this function
    # returns to other results of this function, you
    # can just return the squared distance instead
    # (i.e. remove the sqrt) to gain a little performance

    dist = math.sqrt(dx*dx + dy*dy)

    return dist


def calculate_error(x1,y1,x2,y2,x3_range,y3_range,i):
    
    distance0 = [0]*(i-1)
    for k in range(len(x3_range)):
        distance0[k] = dist(x1,y1,x2,y2,x3_range[k],y3_range[k])
    
    distance = max(distance0)
    
    return distance
    
    
#max_error为一个重要参数,此处省去BPN与根据profit优化步骤,根据多次尝试直接设定即可
max_error = 0.1
anchor_set = []
    
anchor = 0
anchor_set.append(anchor)
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))


i_prior = 0
while anchor + i_prior < len(m):
    i = 2
    
    while calculate_error(anchor,m[anchor],anchor+i,m[anchor+i],range(anchor+1,anchor+i),m[(anchor+1):(anchor+i)],i) < max_error:
        i_prior = i
        i += 1
    
        if i >= len(m)-anchor:
            break
    else:
        
        anchor += i-1
        anchor_set.append(anchor)
        plt.scatter(anchor,m[anchor],color='red')
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),'lalala')
        
        continue
    break
    
2016-12-15 09:52:03
('2016-12-15 09:52:03', 'lalala')
('2016-12-15 09:52:04', 'lalala')

上面是Sliding Window算法的结果,包含了我们想要的点,也多了一个下面起始上涨的点,结果还OK吧~~

下面看看Bottom-Up算法:

##用Bottum-Up算法求关键点
from pandas import Series


p1=plot(range(len(m)),m,'k')

def cal_error(x1,y1,x2,y2,x3_range,y3_range,w):

    distance0 = [0]*w
    for k in range(w):
        distance0[k] = dist(x1,y1,x2,y2,x3_range[k],y3_range[k])
    
    distance = max(distance0)        
  
    return distance

    
#max_error重要参数
max_error = 0.2

segment_0 = []
for i in range(0,len(m),2):
    segment_0_portion = (m[i],m[i+1])
    segment_0.append(segment_0_portion) 

times = 300
    
merge_cost = [0]*(len(segment_0)-1)
for i in range(len(segment_0)-1):
    
    merge = []
    for k in range(len(segment_0[i])):
        merge.append(segment_0[i][k])
    
    for k in range(len(segment_0[i+1])):
        merge.append(segment_0[i+1][k])
    
    
    j = len(merge)-1
    merge_cost[i]=cal_error(0,merge[0],len(merge)-1,merge[-1],range(1,j),merge[1:j],len(merge)-2)
    
    
count = 0
while min(merge_cost) < max_error and count<=times and len(merge_cost)>1:
    count += 1
    merge_cost_array = np.array(merge_cost)
    i = np.argmin(merge_cost_array)          #i是最小数值的那个index
    
    print(i,len(merge_cost))
    
    segment_0[i] = segment_0[i] + segment_0[i+1]
    del segment_0[i+1]
    
    if i ==0 :
        
        del merge_cost[i+1]
        merge = []
        for k in range(len(segment_0[i])):
            merge.append(segment_0[i][k])
    
        for k in range(len(segment_0[i+1])):
            merge.append(segment_0[i+1][k])
            
        j = len(merge)-1
        merge_cost[i] = cal_error(0,merge[0],len(merge),merge[-1],range(1,j),merge[1:j],j-1)
    
    elif i == len(merge_cost)-1:

        del merge_cost[i]
        merge = []
        for k in range(len(segment_0[i-1])):
            merge.append(segment_0[i-1][k])
    
        for k in range(len(segment_0[i])):
            merge.append(segment_0[i][k])
        
        j = len(merge)-1
        merge_cost[i-1] = cal_error(0,merge[0],len(merge),merge[-1],range(1,j),merge[1:j],j-1)
        
    else:

        del merge_cost[i+1]
        
        merge = []
        for k in range(len(segment_0[i])):
            merge.append(segment_0[i][k])
    
        for k in range(len(segment_0[i+1])):
            merge.append(segment_0[i+1][k])
        
        j = len(merge)-1
        merge_cost[i] = cal_error(0,merge[0],len(merge),merge[-1],range(1,j),merge[1:j],j-1)
        
        merge = []
        for k in range(len(segment_0[i-1])):
            merge.append(segment_0[i-1][k])
    
        for k in range(len(segment_0[i])):
            merge.append(segment_0[i][k])
            
        j = len(merge)-1
        merge_cost[i-1] = cal_error(0,merge[0],len(merge),merge[-1],range(1,j),merge[1:j],j-1)
        

m_price=Series([0])
index_set = []
for k in range(len(segment_0)):
    for i in range(len(segment_0[k])):
        m_price.set_value(max(m_price.index) + 1, segment_0[k][i] )
        if i == 0:
            index_set.append(m_price.index[-1]) 
    if k!=0:
        plt.scatter(index_set[k]-1,segment_0[k][0],color='red')
    
    
(4, 7)
(4, 6)
(2, 5)
(0, 4)
(2, 3)
(0, 2)

哈哈,结果非常好✌️

本研究主体到此就结束了。希望能够多多指出不足,再改进,可以做成策略。

以下是草稿。本来想拿相关系数来选取相似走势的股票的,恩。。不管了

#用相关系数方法遍历各个股票,在第一找到的时候停止

pool = Series(get_index_stocks('000002.XSHG'))
for i in pool.index:
    close_i = get_price(pool[i],'2014-01-01','2016-12-01','1d',['close'])['close']
    k = i+1
    if k != pool.index[-1]:
        for m in range(k,pool.index[-1]):
            close_m = get_price(pool[m],'2014-01-01','2016-12-01','1d',['close'])['close']
            correlation = pearsonr(close_i,close_m)[1]
            if correlation>0.9:
                print(correlation)
                fig,ax1=plt.subplots()
                ax2 = ax1.twinx()
                
                p1=ax1.plot(range(len(close_m)),close_i,'r')
                p1=ax2.plot(range(len(close_m)),close_m,'k')
                break
        else:
            continue
        break
               
    
1.0
numpy.corrcoef(x, y)
pool = Series(get_index_stocks('000002.XSHG'))
for i in pool.index:
    close_i = get_price(pool[i],'2014-01-01','2016-12-01','1d',['close'])['close']
    k = i+1
    if k != pool.index[-1]:
        for m in range(k,pool.index[-1]):
            try:
                close_m = get_price(pool[m],'2014-01-01','2016-12-01','1d',['close'])['close']
            except:
                break
            correlation = np.corrcoef(close_i,close_m)[0][-1]
            if correlation > 0.9:
                print(correlation)
                fig,ax1 = plt.subplots()
                ax2 = ax1.twinx()
                
                p1=ax1.plot(range(len(close_m)),close_i,'r')
                p1=ax2.plot(range(len(close_m)),close_m,'k')
                break
        else:
            continue
        break
0.945047310278
 

全部回复

0/140

量化课程

    移动端课程