拜读了韭菜Hulk 大神的基于希尔伯特-黄变换的短线择时
于是在网上搜索EMDT的python实现,以下是本菜鸟搬运github上大神的杰作,并使之能在Notebook运行。
from jqdata import *
import math
import talib
import numpy as np
import scipy as sp
import scipy.optimize
import scipy.interpolate
import scipy.signal
from scipy.signal import hilbert
import pandas as pd
import pandas.io.data as web
from pandas.io.data import DataReader
import datetime
import calendar
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import matplotlib.dates as mdates
from sklearn import gaussian_process
def emd(data,max_modes=10):
# initialize modes
modes=[]
# perform sifts until we have all modes
residue=data
while not _done_sifting(residue):
# perform a sift
imf,residue = _do_sift(residue)
# append the imf
modes.append(imf)
# see if achieved max
if len(modes) == max_modes:
# we have all we wanted
break
# append the residue
modes.append(residue)
# return an array of modes
return np.asarray(modes)
def eemd(data, noise_std=0.2, num_ensembles=100, num_sifts=10):
# get modes to generate
num_samples = len(data)
num_modes = int(np.fix(np.log2(num_samples)))-1
# normalize incomming data
dstd = data.std()
y = data/dstd
# allocate for starting value
all_modes = np.zeros((num_modes+2,num_samples))
# loop over num_ensembles
for e in range(num_ensembles):
# perturb starting data
x0 = y + np.random.randn(num_samples)*noise_std
# save the starting value
all_modes[0] += x0
# loop over modes
for m in range(num_modes):
# do the sifts
imf = x0
for s in range(num_sifts):
imf = _do_one_sift(imf)
# save the imf
all_modes[m+1] += imf
# set the residual
x0 = x0 - imf
all_modes[-1] += x0
return all_modes*dstd/np.float64(num_ensembles)
def _done_sifting(d):
return np.sum(_localmax(d))+np.sum(_localmax(-d))<=2
def _do_sift(data):
imf=data
while True:
imf=_do_one_sift(imf)
numExtrema,numZC = _analyze_imf(imf)
if abs(numExtrema-numZC)<=1:
break
numConstant = 0
desiredNumConstant = 5
lastNumExtrema = numExtrema
lastNumZC = numZC
while numConstant < desiredNumConstant:
imf=_do_one_sift(imf)
numExtrema,numZC = _analyze_imf(imf)
if numExtrema == lastNumExtrema and \
numZC == lastNumZC:
# is the same so increment
numConstant+=1
else:
# different, so reset
numConstant = 0
# save the last extrema and ZC
lastNumExtrema = numExtrema
lastNumZC = numZC
residue=data-imf
return imf,residue
def _do_one_sift(data):
upper=_get_upper_spline(data)
lower=-_get_upper_spline(-data)
imf = (upper+lower)*.5
detail=data-imf
return detail # imf
def _get_upper_spline(data):
maxInds = np.nonzero(_localmax(data))[0]
if len(maxInds) == 1:
s = np.ones(len(data))*data[maxInds]
return s
if maxInds[0]==0:
preTimes=1-maxInds[1]
preData=data[maxInds[1]]
else:
preTimes=1-maxInds[[1,0]]
preData=data[maxInds[[1,0]]]
if maxInds[-1]==len(data)-1:
postTimes=2*len(data)-maxInds[-2]-1;
postData=data[maxInds[-2]];
else:
postTimes=2*len(data)-maxInds[[-1,-2]];
postData=data[maxInds[[-1,-2]]]
t=np.r_[preTimes,maxInds,postTimes];
d2=np.r_[preData, data[maxInds], postData];
rep = scipy.interpolate.splrep(t,d2,s=.0)
s = scipy.interpolate.splev(range(len(data)),rep)
return s
def _analyze_imf(d):
numExtrema = np.sum(_localmax(d))+np.sum(_localmax(-d))
numZC = np.sum(np.diff(np.sign(d))!=0)
return numExtrema,numZC
def _localmax(d):
diffvec = np.r_[-np.inf,d,-np.inf]
diffScore=np.diff(np.sign(np.diff(diffvec)))
runEndingPositions=np.r_[np.nonzero(d[0:-1]!=d[1:])[0],len(d)-1]
runLengths = np.diff(np.r_[-1, runEndingPositions])
runStarts=runEndingPositions-runLengths + 1
realRunStarts = runStarts[runLengths>1]
realRunStops = runEndingPositions[runLengths>1]
realRunLengths = runLengths[runLengths>1]
maxRuns=(diffScore[realRunStarts]==-1) & (diffScore[realRunStops]==-1)
maxRunMiddles=np.round(realRunStarts[maxRuns]+realRunLengths[maxRuns]/2.)-1
maxima=(diffScore==-2)
maxima[maxRunMiddles.astype(np.int32)] = True
return maxima
def calc_inst_info(modes,samplerate):
amp=np.zeros(modes.shape,np.float32);
phase=np.zeros(modes.shape,np.float32);
f=np.zeros(modes.shape,np.float32);
for m in range(len(modes)):
h=scipy.signal.hilbert(modes[m]);
amp[m,:]=np.abs(h);
phase[m,:]=np.angle(h);
f[m,:] = np.r_[np.nan,
0.5*(np.angle(-h[2:]*np.conj(h[0:-2]))+np.pi)/(2*np.pi) * samplerate,
np.nan]
return f,amp,phase
def tlag(x,i,offset):
if i-offset<0:
return 0
else:
return x[i-offset]
def get2PHPF(sin, T):
n = len(sin)
alpha = (math.cos(2*math.pi/T) + math.sin(2*math.pi/T) -1)/math.cos(2*math.pi/T)
c0 = (1-alpha/2.0)*(1-alpha/2.0)
c1 = 0
N = 2
b0 = 1
b1 = -2
b2 = 1
a1 = 2*(1-alpha)
a2 = -(1-alpha)*(1-alpha)
sout = np.zeros(n)
for i in range(0,N):
sout[i] = 0
for i in range(N,n):
sout[i] = c0*( b0*tlag(sin,i,0) + b1*tlag(sin,i,1) + b2*tlag(sin,i,2) ) \
+ a1*tlag(sout,i,1) + a2*tlag(sout,i,2) - c1*tlag(sin,i,N)
return sout
security = '510300.XSHG'
# 获取股票的收盘价
h = get_bars(security, 250, '1m', ['close'], include_now = True)
Kdata = h['close']
#print(Kdata)
signal = eemd(get2PHPF(Kdata, 250))
fig = plt.figure(figsize=(20, 10))
plt.subplot(7, 1, 1)
plt.plot(signal[1,:], 'b-')
plt.title('Decomposed Frequency')
plt.ylabel('mode 1')
plt.subplot(7, 1, 2)
plt.plot(signal[2,:], 'b-')
plt.xlabel('time')
plt.ylabel('mode 2')
plt.subplot(7, 1, 3)
plt.plot(signal[3,:], 'b-')
plt.xlabel('time')
plt.ylabel('mode 3')
plt.subplot(7, 1, 4)
plt.plot(signal[4,:], 'b-')
plt.xlabel('time')
plt.ylabel('mode 4')
plt.subplot(7, 1, 5)
plt.plot(signal[5,:], 'b-')
plt.xlabel('time')
plt.ylabel('mode 5')
plt.subplot(7, 1, 6)
plt.plot(signal[6,:], 'b-')
plt.xlabel('time')
plt.ylabel('mode 6')
plt.subplot(7, 1, 7)
plt.plot(signal[7,:], 'b-')
plt.xlabel('time')
plt.ylabel('mode 7')
<matplotlib.text.Text at 0x7f33d9baf250>
mode = hilbert(signal[1,:])
Q = mode.imag
I = mode.real
T = len(Kdata)
fig = plt.figure(figsize=(20, 10))
plt.subplot(1, 2, 1)
plt.plot(Q,I)
for t in range(0,T):
if Kdata[t]>Kdata[t-1]:
plt.plot(Q[t],I[t],'ro',markersize=20)
elif Kdata[t]<Kdata[t-1]:
plt.plot(Q[t],I[t],'go',markersize=20)
plt.subplot(1, 2, 2)
N = 100
plt.plot(Q[T-N:T],I[T-N:T])
for t in range(T-N,T):
if Kdata[t]>Kdata[t-1]:
plt.plot(Q[t],I[t],'ro',markersize=200*(Kdata[t]-Kdata[t-1])/Kdata[t-1])
elif Kdata[t]<Kdata[t-1]:
plt.plot(Q[t],I[t],'go',markersize=200*(Kdata[t-1]-Kdata[t])/Kdata[t-1])
本社区仅针对特定人员开放
查看需注册登录并通过风险意识测评
5秒后跳转登录页面...