做量化没有趁手工具是不成的,随时设计大量的数据计算,如果能随时看到数据的内容,就会对结果有一个直观的认识。
back = np.cumsum(np.random.randn(1000))
items = [pd.date_range('20000101',periods=1000)]
x1 = np.cumsum(np.random.randn(1000,7),0)
x2 = np.cumsum(np.random.randn(1000,7),0)
x3 = np.mean(x2,1)
x4 = x3 > np.mean(x3)
plot ([x1,x2,x3],sign=x4,back=back,items=items,focus=6,
refline=',,0',
xlabel='标题1,标题2',
ylabel='标题3,标题4',
lw=1,pcount=10,hight=5,width=12)
x = np.random.randn(100)
plot ([x,x],pt='b')
x = np.random.rand(10)
y = x>np.mean(x)
plot (x,sign=y)
x = np.random.rand(10)
plot (x)
x = np.random.rand(10,5)
x[:,1] *= 5
plot (x)
x = np.random.rand(10,5)
x[:,1] *= 5
plot (x,norm='1,')
x = np.random.rand(2,10,5)
x[:,1] *= 5
plot (x,norm='1,')
x = np.random.rand(2,10,5)
plot (x,focus=0)
x = np.random.rand(2,10,5)
plot (x,lw=2)
,width=12,hight=6
x = np.random.rand(2,10,5)
plot (x,lw=2,width=12,hight=3)
x = np.random.rand(10,5)
y = np.random.rand(10,2)
plot ([x,y],lw=2,width=12,hight=3)
x = np.random.rand(10,5)
y = np.random.rand(10,2)
a = np.random.rand(10,4)
b = np.random.rand(10,2)
plot ([[x,y],[a,b]],lw=2,width=12,hight=3)
px = get_price(['000001.XSHE','000002.XSHE'], start_date='2015-10-01 14:00:00', end_date='2015-12-02 15:00:00', \
frequency='1d')['close']
plot (px)
plot (px,norm='1,')
px = get_price(['000001.XSHE','000002.XSHE'], start_date='2015-10-01 14:00:00', end_date='2015-12-02 15:00:00', \
frequency='1d')['close']
plot (px,norm='1,',refline='0.45,')
plot (px,norm='1,',refline='0.45,',cmReverse=True)
x = np.random.randn(2,1000,100)
x = np.cumsum(x,1)
plot (x,lw=1,pcount=10,hight=6)
df = pd.DataFrame(np.random.randn(1000,7),columns=list('ABCDEFG'),index=pd.date_range('20000101',periods=1000))
df = df.cumsum()
plot (df,lw=1,pcount=10,hight=3,width=12,cmap='YlOrBr')
xx = np.cumsum(np.random.randn(1000))
df = pd.DataFrame(np.random.randn(1000,7),columns=list('ABCDEFG'),index=pd.date_range('20000101',periods=1000))
df = df.cumsum()
plot (df,back=xx,lw=1,pcount=10,hight=3,width=12,cmap='YlOrBr')
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
from datetime import datetime, timedelta
from scipy import sparse
from scipy.stats import binom, norm, beta, expon
from matplotlib.ticker import Formatter
from functools import wraps
import matplotlib.pyplot as plt
import matplotlib.transforms as mtrans
import matplotlib as mpl
import numpy as np
import pandas as pd
import jqdata as jp
import warnings
mpl.rcParams['axes.unicode_minus']=False
def asdate(istr):
try:
pdate = pd.to_datetime(istr,box=True)
if str(pdate.time())=='00:00:00':
return pdate.date()
else:
return pdate
except:
return istr
def verb_list(v,maxl=5,value=''):
if v is None: return None
if isinstance(v, str): v = v.split(',')
if str(v).replace('.', '').replace('-', '').isdigit():
_v=[str(v)]
else:
_v=list(v)
if len(_v) <= 0: _v = ['']
_v = [x if x!='' else value for x in _v]
return _v+([value]*(maxl-len(_v))) if len(_v)<maxl else _v
def customize(func):
"""
Decorator to set plotting context and axes style during function call.
装饰器在函数调用期间设置绘制上下文和坐标轴样式。
"""
@wraps(func)
def call_w_context(*args, **kwargs):
set_context = kwargs.pop('set_context', True)
if set_context:
with plotting_context():
mpl.rcParams['font.family']='serif'
#mpl.rcParams['font.serif' ]='SimHei'
return func(*args, **kwargs)
else:
return func(*args, **kwargs)
return call_w_context
def plotting_context(context='seaborn-darkgrid', font_scale=1.5, rc=None):
if rc is None: rc = {}
rc_default = {"lines.linewidth":1.5}
# Add defaults if they do not exist
for name, val in rc_default.items():
rc.setdefault(name, val)
mpl.rcParams.update(rc)
return plt.style.context((context))
def Keepborder(a):
x = np.array(a)
if x.ndim==1:
if np.isnan(x[0]):
bmin,bmax = np.nanmin(x),np.nanmax(x)
x[0] = (bmin+bmax)/2
else:
if np.isnan(x[0]).all():
bmin,bmax = np.nanmin(x,0),np.nanmax(x,0)
x[0] = (bmin+bmax)/2
return x
@customize
def plot(self,sign=None,back=None,xlabel=None,ylabel=None,items=None,
title='快速Array数据显示工具',hratio='1',width=6,hight=3,refline=None,yscale=None,norm=None,pcount=10,grid=True,fcolor=None,
fsize=12,yleft=0.015,alpha=0.65,lw=1,pt=None,focus=None,cmap=None,cmReverse=False):
''' 简单绘图函数
self : 数组 1d/2d以上 支持嵌套2级分组,如果输入大于2d的数组会被拆分绘制,如果存在于元组内则不在拆分
refline : str (',,1,,')允许输入参考线
width : 绘制区宽度
hight : 绘制区高度
'''
# ===========================================基本参数设置===========================================
if cmap!=None:
if isinstance(cmap,str):
if cmap[0]=='-': cmReverse=True; cmap=cmap[1:]
if isinstance(cmap,list):
cmap = mcolors.LinearSegmentedColormap.from_list("", cmap)
colormap = plt.get_cmap(cmap) if isinstance(cmap,str) else cmap
else:
colormap = cm.RdYlGn_r
# 判断输入数据格式和维度
if isinstance(self,pd.DataFrame):
if self.index.dtype =='<M8[ns]': index = np.asarray(self.index, dtype='datetime64[s]')
else: index = np.asarray(self.index)
if self.columns.dtype=='<M8[ns]': field = np.asarray(self.columns,dtype='datetime64[s]')
else: field = np.asarray(self.columns)
items = np.asarray([index,field])
order = tuple(self.values) if self.values.ndim > 2 else (self.values,)
elif isinstance(self,np.ndarray):
# 如果输入是数组且大于2个维度,则使用元组拆分,否则使用元组打包
order = tuple(self) if self.ndim > 2 else (self,)
else: order = self
# 提取主要数据集合(如果输入数据时经过打包的,取得第一个子集数据)
main = order[0][0] if isinstance(order[0],(list,tuple)) else order[0]
if items is None: items = getattr(main,'items',None) # 如果是自用子类,提取 x轴属性参数
if main.ndim!=1: main = main[:,0]
maxploy = len(order); step = 1.0/maxploy;
refline = verb_list(refline,maxploy) # 整理输入参数(基准点)
_norm = verb_list(norm, maxploy) # 整理输入参数(基准点)
_ptype = verb_list(pt ,maxploy) # 整理输入参数(绘制类型)
_ylabel = verb_list(ylabel, maxploy) # 整理输入参数(y轴标题)
_xlabel = verb_list(xlabel, maxploy) # 整理输入参数(x轴标题)
_hratio = verb_list(hratio ,maxploy, 1) # 整理输入参数(子图高度)
_yscale = verb_list(yscale ,maxploy, 1) # 整理输入参数(子图高度)
_hratio = np.array(list(map(lambda x:float(x),_hratio))) # 子图尺寸默认为1
_hratio =(_hratio*step)/(np.sum(_hratio)/maxploy)
# ===========================================开始图形绘制===========================================
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
fig = plt.figure(figsize=(width,hight))
ax1 = fig.add_axes([0,0,1,1]); ax1.margins(x=0,y=0.00001)#;ax1.yaxis.set_ticks_position('right')
if not fcolor is None: ax1.tick_params(axis='y', colors=fcolor)# 设置y轴颜色
# 如果有背景数据绘制背景图形(如果没有背景图,则用主数据代替)
if not back is None:
if back.ndim!=1: back = back[:,0]
bmin,bmax = np.nanmin(back),np.nanmax(back); ax1.set_ylim([bmin, bmax*1.25])
ax1.fill_between(np.arange(back.shape[0]),bmin,np.asarray(back),color='lightskyblue',alpha=0.3)
else:
bmin,bmax = np.nanmin(Keepborder(main)),np.nanmax(Keepborder(main))#; ax1.set_ylim([bmin, bmax])
ax1.set_xticklabels([]) ; ax1.set_title(title, fontsize=16) # 关闭背景 x 轴标签显示,设置标题
if len(order)>1: ax1.tick_params(bottom="off") # 关闭背景y轴分隔元素
# 绘制信号图(如果输入是个mask 则需要转换为信号类型)
if not sign is None:
# 如果数据不是一维,提取第一维数据
if sign.ndim!=1: sign = sign[:,0]
if sign.dtype =='bool': sign = Keepborder(mask_to_sign(sign))
s1,s2 = sign_to_index(sign)
for peak_index,recovery_index in zip(s1,s2):
ax1.fill_between((peak_index, recovery_index),bmin,bmax,alpha=.2)
# 遍历绘制
if not order is None:
for i,f in enumerate(order):
ax = fig.add_axes([0, 1.0-sum(_hratio[0:i+1]),1,_hratio[i]]); ax.margins(x=0,y=0.05)
if len(order)>1:
ax.patch.set_visible(False) # 设置背景透明
else:
ax = ax.twinx()
if i!=0: ax.spines['top'].set_color('none') # 关闭上边线
ax.yaxis.set_ticks_position('right') # 设置 y轴坐标移动到左边
if not fcolor is None: ax.tick_params(axis='y', colors = fcolor) # 设置 y轴刻度颜色
ax.margins(x=0,y = .15/(_hratio[i]*hight))
# 设置Y标签
if not _ylabel is None:
if not fcolor is None:
ax.set_ylabel(_ylabel[i], color = fcolor,fontsize =fsize)
else:
ax.set_ylabel(_ylabel[i], fontsize = fsize)
# 设置 y轴标签的显示位置
ax.get_yaxis().set_label_coords(-yleft,0.5)
# 设置x标签
if not _xlabel is None:
xltop = ((_hratio[i]*hight)-0.15)/(_hratio[i]*hight) # (子窗高-标题高)/子窗高 = 标题y坐标
xleft = 0.18/width
# 绘制标题文本
t = ax.text(xleft, xltop, # 设置标题位置
_xlabel[i], # 设置标题内容
fontsize=fsize,
alpha=0.95,
bbox={'facecolor':'white','alpha':1,'pad':5},
transform=ax.transAxes,
linespacing=1.,
verticalalignment='top')
#设置文本底色透明
t.set_bbox(dict(facecolor='white', alpha=0.65, edgecolor='steelblue',zorder=2.2))
# 设置是否归一化
ns = '' if _norm is None else _norm[i]
# 设置绘图类型
pt = '' if _ptype is None else _ptype[i]
fitems = getattr(f,'items',None)
legend = _plot_sub(f,fitems,pcount,ax,main,alpha,ns,lw,focus,colormap,cmReverse,pt) # 绘制窗口数据
if not fcolor is None: ax.tick_params(axis='x',colors=fcolor) # 设置 x轴字体颜色
if not items is None:
# 读取子类 x轴标签内容
if items[0].size == main.size:
xticklabels = np.array([asdate(x) for x in items[0]])
elif items[1].size == main.size:
xticklabels = np.array([asdate(x) for x in items[1]])
else:
xticklabels = np.arange(main.size)
else:
# 如果子类属性不存在用索引作为标签
xticklabels = np.arange(main.size)
# 计算可以绘制数量
step = int(np.round(len(xticklabels)/(width/0.26+1)+1))
# 生成 x轴标签索引
xticks = np.arange(len(xticklabels))
# 修正 x轴标签索引(为不同间隔的循环对齐最后边界)
xticks = xticks[range((len(xticklabels)-1)%step,len(xticklabels),step)]
# 清空 x 轴标签内容
ax.set_xticklabels([])
# 设置 x轴标签索引,指定主刻度
ax1.set_xticks(xticks, minor= False) # minor= False 代表主刻度线
# 只有最下边界允许显示 x轴标签
if i == (maxploy-1):
ax1.set_xticklabels([])
# 设置 x轴标签内容,指定主刻度,旋转角度,右边对齐,对齐方式
ax1.set_xticklabels(xticklabels[xticks], minor=False, rotation=35, ha = 'right', rotation_mode="anchor")
# 绘制基准线
if not refline is None:
if str(refline[i]).replace('.', '').replace(':', '').isdigit():
ax.axhline(float(refline[i]),linestyle=':', color='black',lw=0.5)
# 绘制网格 主刻度major,小刻度网格minor,全部both, 选择 axis = x,y,both
plt.grid(grid ,axis= 'both', which='major', linestyle = "-.", color = "black", linewidth = 0.5, alpha=.22)
plt.show() ; plt.close()
def _plot_sub(ret,items=None,maxcount=10,Ax=None,main=None,alpha=0.9,norms=None,lw=1,focus=None,cmap=None,cmReverse=False,pt=None):
if isinstance(ret,np.ndarray): ret = [ret]
legend = [] ; lindex = 0
if cmReverse: cmx =1; cmy =0
else: cmx =0; cmy =1
for x in ret:
v = Keepborder(x)
if norms =='1': v = maxminmap(v,0)
if v.ndim==1:
if not items is None:
if items[0].size>1:
legend = items[1]
elif items[1].size>1:
legend = items[0]
else:
lindex = lindex+1 ; legend = lindex
if (np.nanmax(v)+np.nanmin(v)==0)|(pt=='b'): # 如果是信号类型
if not cmap is None: Ax.set_prop_cycle(color=[cmap(k) for k in np.linspace(cmx,cmy,2)])
with warnings.catch_warnings():
warnings.filterwarnings("ignore","invalid value encountered in (less|multiply|divide|greater)")
if pt=='b':
bmin,bmax = np.nanmin(v),np.nanmax(v); v[0] = 0 #; Ax.set_ylim([-bmax,bmax]);
for i in np.arange(v.shape[0]):
if v[i]>0: Ax.vlines(i,0, v[i],color='darkred', alpha=alpha,linewidth=1.2)
if v[i]<0: Ax.vlines(i,0, v[i],color='steelblue',alpha=alpha,linewidth=1.2)
else:
bmin,bmax = np.nanmin(main),np.nanmax(main); v[0] = 1 #; Ax.set_ylim([-bmax,bmax]);
for i in np.arange(v.shape[0]):
if v[i]== 1: Ax.vlines(i,0, main[i]*0.75,color='darkred', alpha=alpha,linewidth=1.2)
if v[i]==-1: Ax.vlines(i,0,-main[i]*0.75,color='steelblue',alpha=alpha,linewidth=1.2)
Ax.axhline(0,linestyle='-', color='steelblue',alpha=alpha,linewidth=2)
else:
_ = Ax.plot(np.arange(v.shape[0]), v, label=legend, alpha=alpha)
else:
if (np.nanmax(v)+np.nanmin(v)==0)|(pt=='b'): # 如果是信号类型
tmp = np.array(v) ; tmp[v<0] = 0 ; H = np.nansum(tmp,1) ; H[0] = 1 # 累计买点信号
tmp[:] = 0 ; tmp[v<0] = 1 ; L = np.nansum(tmp,1) ; L[0] = 1 # 累计卖点信号
bmin,bmax = np.nanmax(L),np.nanmax(H); Ax.set_ylim([-bmin,bmax*1.33])
if not cmap is None: Ax.set_prop_cycle(color=[cmap(k) for k in np.linspace(cmx,cmy,2)])
for i in np.arange(v.shape[0]):
if H[i]!=0: Ax.vlines(i,0, H[i],color='darkred', alpha=alpha,linewidth=1.2)
if L[i]!=0: Ax.vlines(i,0,-L[i],color='steelblue',alpha=alpha,linewidth=1.2)
Ax.axhline(0,linestyle='-', color='steelblue',alpha=alpha,linewidth=2)
else:
'''检查需要绘制数量,如果小于10则直接绘制'''
if not cmap is None: Ax.set_prop_cycle(color=[cmap(k) for k in np.linspace(cmx,cmy,v.shape[1])])
for i in np.arange(v.shape[1]):
lindex = lindex + 1
if not items is None:
legend = items[1][i] if i < items[1].size else lindex
else:
legend = lindex
if v.shape[1]>maxcount:
if (i>=(maxcount//2))&(i<(v.shape[1]-(maxcount//2))):
legend = None
if focus==i:
_ = Ax.plot(np.arange(v.shape[0]), v[:,i],label=legend,alpha=alpha,linewidth=2)
else:
_ = Ax.plot(np.arange(v.shape[0]), v[:,i],label=legend,alpha=alpha,linewidth=lw)
# 绘制图例
handles, labels = Ax.get_legend_handles_labels()
leg = Ax.legend(handles,labels,bbox_to_anchor=(0.95, .98, 0.05, 0),
loc='upper right',ncol=1,mode='None',handlelength=3,handletextpad=0.2,borderpad = 0.2)
frame = leg.get_frame()
frame.set_facecolor('0.80') # set the frame face color to light gray
return legend
def maxminmap(x,axis=0,ymin=0,ymax=1,**kwargs):
'''
maxmin 将数据x归一化到任意区间[ymin,ymax]范围的方法
输入参数x:需要被归一化的数据
输入参数ymin:归一化的区间[ymin,ymax]下限
输入参数ymax:归一化的区间[ymin,ymax]上限
输出参数y:归一化到区间[ymin,ymax]的数据
此函数可以指定轴方向计算
'''
if axis!=0:
xmax=np.nanmax(x,axis,**kwargs)[:,None]
xmin=np.nanmin(x,axis,**kwargs)[:,None]
else:
xmax=np.nanmax(x,axis,**kwargs)
xmin=np.nanmin(x,axis,**kwargs)
return (ymax-ymin)*(x-xmin)/(xmax-xmin)+ymin
def mask_to_sign(x):
'''输入mask, 返回信号标记数据'''
sign = x.copy().astype(int) ; sign[[0,-1]] = 0 # 复制一个副本
sign[1:] = sign[1:]-sign[0:-1] # 标记买卖信号
return sign
def sign_to_index(sign):
'''输入1d信号,返回开始结束索引'''
return np.nonzero(sign>0)[0].astype(int),np.nonzero(sign<0)[0].astype(int)
@customize
def hist(self,bins=33,ranges=None,facecolor=None,rwidth=0.8,normed=True,alpha=0.95):
fig = plt.figure(figsize=(6, 3))
order = (self,) if isinstance(self,np.ndarray) else self
for i,f in enumerate(order):
ht = np.asarray(f).ravel();mask = np.isfinite(ht)
if ranges is None:
ranges = tuple([np.min(ht[mask]),np.max(ht[mask])])
if facecolor is None:
plt.hist(ht[mask],bins=bins,range=ranges,histtype='bar',rwidth=rwidth,normed=normed,alpha=alpha)
else:
plt.hist(ht[mask],bins=bins,range=ranges,facecolor=facecolor,histtype='bar',rwidth=rwidth,normed=normed,alpha=alpha)
plt.grid(True ,axis= 'both', which='major', linestyle = "-.", color = "black", linewidth = 0.5, alpha=.22)
plt.show()
本社区仅针对特定人员开放
查看需注册登录并通过风险意识测评
5秒后跳转登录页面...