This is a very simple chip-distribution (筹码分布) calculation. I keep seeing people ask about it, so I wrote this quick example. I make no claim that it is useful or valuable; that depends on you, so please don't ask me how to use it to pick stocks. And spare the flames: this is only meant as a simple demonstration. If you have good ideas, suggestions, or improvements to build on it, you are welcome to share them.
The digitized binning in the algorithm uses equal-width intervals; the quantile binning function from my other posts performs better (a sketch of such a variant follows the cut function below).
Many newcomers probably can't use custom libraries yet, which makes the statistics library attached to this post hard to work with; you can get familiar with it later. Here the code is modified so the original functionality works without the attached library, as follows:
import numpy as np  # get_price / get_fundamentals_continuously come from the JoinQuant research environment
# fetch closing prices
df = get_price('000001.XSHE', end_date='2018-07-13', frequency='1d', fields=['close'], count=60)
# fetch turnover ratios
tr = get_fundamentals_continuously(query(valuation.turnover_ratio).filter(valuation.code.in_(['000001.XSHE'])),\
                                   end_date='2018-07-13', count=60)['turnover_ratio']
# turnover ratio to array (percent -> fraction)
ratio = (np.nan_to_num(tr.values)/100).ravel()
# closing prices to array
close = df.values.ravel()
# cumulative turnover: decay each day's chips by all subsequent turnover
ratio[0:-1] *= np.cumprod(1-ratio[::-1],0)[::-1][1:]
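A note on that last one-liner, since it is easy to misread: with days ordered oldest to newest, day i's surviving chips are modeled as turnover_i × ∏_{j>i}(1 − turnover_j), i.e. every later trading day washes out its turnover fraction of the older chips, and the most recent day keeps its full turnover. A toy check (my own illustration, not from the original post):

r = np.array([0.5, 0.5, 0.5])
r[0:-1] *= np.cumprod(1 - r[::-1], 0)[::-1][1:]
# r is now [0.125, 0.25, 0.5]: day 0 keeps 0.5*0.5*0.5,
# day 1 keeps 0.5*0.5, and the last day keeps its full 0.5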
def cost_distribution(x, c, q=60):
    '''digitize prices into bins and sum the chips in each bin'''
    cuts = cut(x, q)  # x is the price series; cut() is defined further down
    # keep only entries where both the bin number and the chip weight are valid
    mask = np.isfinite(cuts) & np.isfinite(c)
    # give invalid entries a well-defined bin so the integer cast below is safe
    cuts[~mask] = q
    # sum the chips per bin; minlength keeps all q bins even when the top ones are empty
    ctbe = np.bincount(cuts.astype(np.int64)[mask], weights=c[mask], minlength=q)
    # per-bin chip totals
    return ctbe
# chip distribution statistics
cost = cost_distribution(close, ratio)
import matplotlib.pyplot as plt
from grouping import group_by
# set_printoptions: output precision
np.set_printoptions(precision=9)
# the formatter argument lets you supply a format function per dtype (note the zero padding)
np.set_printoptions(formatter={'float': '{: 0.3f}'.format})
# suppress scientific notation for small numbers
np.set_printoptions(suppress=True)
# set the display width
np.set_printoptions(linewidth=220)
# fetch closing prices
df = get_price('000001.XSHE', start_date='2018-04-18', end_date='2018-07-12', frequency='1d', fields=['close'])
# fetch turnover ratios
tr = get_fundamentals_continuously(query(valuation.turnover_ratio).filter(valuation.code.in_(['000001.XSHE'])),\
                                   end_date='2018-07-13', count=60)['turnover_ratio']
# turnover ratio to array (percent -> fraction)
ratio = (np.nan_to_num(tr.values)/100).ravel()
# closing prices to array
close = df.values.ravel()
# cumulative turnover (same decay model as above)
ratio[0:-1] *= np.cumprod(1-ratio[::-1],0)[::-1][1:]
# chip distribution statistics
cost = cost_distribution(close, ratio)
fig, ax = plt.subplots()
# plot the price series
ax.plot(np.arange(close.shape[0]),close)
fig, ax = plt.subplots()
# plot the distribution
ax.barh(np.arange(cost.shape[0]),cost, height=0.3, align='center', color='#AAAAAA')
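The barh call above plots chips against raw bin indices. To read the y-axis in price terms, the bin indices can be mapped back to price levels; a small sketch (my own addition, assuming the equal-width mapping that cut() uses):

# bin i corresponds to the price that cut() rounds to index i
levels = np.linspace(np.nanmin(close), np.nanmax(close), cost.shape[0])
fig, ax = plt.subplots()
ax.barh(levels, cost, height=(levels[1]-levels[0])*0.8, align='center', color='#AAAAAA')
ax.set_ylabel('price')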
import warnings  # np.warnings is a deprecated alias, so use the stdlib module

def cut(x, q, axis=0):
    '''Multi-dimensional digitizing (binning) function.
    Bins multi-dimensional data directly through a linear transform along columns.
    x    : 1d/2d data (per axis)
    q    : number of bins; if None, defaults to the number of 1% intervals
           spanning the range, i.e. mean((l-f)/f)*100
    axis : axis along which to compute
    '''
    with warnings.catch_warnings():
        # all-NaN inputs are unavoidable in bulk computation, so silence the warning
        warnings.filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
        x = np.asarray(x)
        if axis == 0:
            f, l = np.nanmin(x, axis=axis), np.nanmax(x, axis=axis)
        else:
            f, l = np.nanmin(x, axis=axis)[:, None], np.nanmax(x, axis=axis)[:, None]
        if q is None:
            q = np.nanmean((l - f) / f) * 100
        return np.rint((q - 1) * (x - f) / (l - f))
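The quantile binning mentioned at the top is not included in this post; here is a minimal sketch of what such a variant might look like (the name cut_quantile and the details are my assumption, not the author's code):

def cut_quantile(x, q):
    '''like cut(), but with bin edges at equal-probability quantiles
    instead of equal-width price intervals (hypothetical variant)'''
    x = np.asarray(x, dtype=float)
    edges = np.nanpercentile(x, np.linspace(0, 100, q + 1))
    # assign each price to its quantile bin 0..q-1
    cuts = np.searchsorted(edges, x, side='right').astype(float) - 1
    cuts = np.clip(cuts, 0, q - 1)
    cuts[~np.isfinite(x)] = np.nan  # propagate NaN the way cut() does
    return cuts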
def cost_distribution(x, c, q=60):
    '''digitize prices into bins and sum the chips in each bin (grouping-library version)'''
    cuts = cut(x, q)  # x is the price series
    # keep only entries where both the bin number and the chip weight are valid
    mask = np.isfinite(cuts) & np.isfinite(c)
    # sum the chips at each binned price level; hold=q pins the output to exactly q bins
    unique, ctbe = group_by(cuts[mask].ravel(), hold=q).sum(c[mask].ravel())
    # per-bin chip totals (NaN where a bin received no chips)
    return ctbe
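To see what the hold argument does, a tiny illustration (my own, based on reading the attached library): key slots that never occur are still allocated and come back as NaN:

unique, sums = group_by(np.array([0., 2., 2.]), hold=5).sum(np.array([1., 1., 3.]))
# unique -> [0 1 2 3 4], sums -> [1. nan 4. nan nan]

The attached statistics library (the grouping module) follows in full: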
# -*- coding: utf-8 -*-
"""grouping module"""
from __future__ import absolute_import, division, print_function, unicode_literals
from builtins import *
from functools import reduce
import itertools
import numpy as np
__author__ = "Eelco Hoogendoorn"
__license__ = "LGPL"
__email__ = "hoogendoorn.eelco@gmail.com"
class GroupBy(object):
"""
GroupBy class
contains an index of keys, and extends the index functionality with grouping-specific functionality
"""
def __init__(self, keys, hold, axis=0):
"""
Parameters
----------
keys : indexable object
sequence of keys to group by
axis : int, optional
axis to regard as the key-sequence, in case keys is multi-dimensional
See Also
--------
numpy_indexed.as_index : for information regarding the casting rules to a valid Index object
"""
self.index = as_index(keys, axis)
self.hold = hold
@property
def unique(self):
"""unique keys"""
return self.index.unique
@property
def count(self):
"""count of each unique key"""
return self.index.count
@property
def inverse(self):
"""mapping such that unique[inverse]==keys"""
return self.index.inverse
@property
def groups(self):
"""int, number of groups formed by the keys"""
return self.index.groups
@property
def shape(self):
return self.index.shape
#some different methods of chopping up a set of values by key
def split_iterable_as_iterable(self, values):
"""Group iterable into iterables, in the order of the keys
Parameters
----------
values : iterable of length equal to keys
iterable of values to be grouped
Yields
------
iterable of items in values
Notes
-----
Memory consumption depends on the amount of sorting required
Worst case, if index.sorter[-1] = 0, we need to consume the entire value iterable,
before we can start yielding any output
But to the extent that the keys are already sorted, the grouping is lazy
"""
values = iter(enumerate(values))
cache = dict()
def get_value(ti):
try:
return cache.pop(ti)
except:
while True:
i, v = next(values)
if i==ti:
return v
cache[i] = v
s = iter(self.index.sorter)
for c in self.count:
yield (get_value(i) for i in itertools.islice(s, int(c)))
def split_iterable_as_unordered_iterable(self, values):
"""Group iterable into iterables, without regard for the ordering of self.index.unique
key-group tuples are yielded as soon as they are complete
Parameters
----------
values : iterable of length equal to keys
iterable of values to be grouped
Yields
------
tuple of key, and a list of corresponding items in values
Notes
-----
This approach is lazy, insofar as grouped values are close in their iterable
"""
from collections import defaultdict
cache = defaultdict(list)
count = self.count
unique = self.unique
key = (lambda i: unique[i]) if isinstance(unique, np.ndarray) else (lambda i: tuple(c[i] for c in unique))
for i,v in zip(self.inverse, values):
cache[i].append(v)
if len(cache[i]) == count[i]:
yield key(i), cache.pop(i)
def split_sequence_as_iterable(self, values):
"""Group sequence into iterables
Parameters
----------
values : iterable of length equal to keys
iterable of values to be grouped
Yields
------
iterable of items in values
Notes
-----
        This is the preferred method if values has random access, but we don't want it completely in memory.
Like a big memory mapped file, for instance
"""
s = iter(self.index.sorter)
for c in self.count:
yield (values[i] for i in itertools.islice(s, int(c)))
def split_array_as_array(self, values):
"""Group ndarray into ndarray by means of reshaping
Parameters
----------
values : ndarray_like, [index.size, ...]
Returns
-------
ndarray, [groups, group_size, ...]
values grouped by key
Raises
------
AssertionError
This operation is only possible if index.uniform==True
"""
if not self.index.uniform:
raise ValueError("Array can only be split as array if all groups have the same size")
values = np.asarray(values)
values = values[self.index.sorter]
return values.reshape(self.groups, -1, *values.shape[1:])
def split_array_as_list(self, values):
"""Group values as a list of arrays, or a jagged-array
Parameters
----------
values : ndarray, [keys, ...]
Returns
-------
list of length self.groups of ndarray, [key_count, ...]
"""
values = np.asarray(values)
values = values[self.index.sorter]
return np.split(values, self.index.slices[1:-1], axis=0)
def split(self, values):
"""some sensible defaults"""
try:
return self.split_array_as_array(values)
except:
# FIXME: change to iter in python 3?
return self.split_array_as_list(values)
def __call__(self, values):
"""not sure how i feel about this. explicit is better than implict?"""
return self.pivot_table(self.unique, self.split(values))
# ufunc based reduction methods. should they return unique keys by default?
def reduce(self, values, operator=np.add, axis=0, dtype=None):
"""Reduce the values over identical key groups, using the given ufunc
reduction is over the first axis, which should have elements corresponding to the keys
        all other axes are treated independently for the sake of this reduction
Parameters
----------
values : ndarray, [keys, ...]
values to perform reduction over
operator : numpy.ufunc
a numpy ufunc, such as np.add or np.sum
axis : int, optional
the axis to reduce over
dtype : output dtype
Returns
-------
ndarray, [groups, ...]
values reduced by operator over the key-groups
"""
values = np.take(values, self.index.sorter, axis=axis)
return operator.reduceat(values, self.index.start, axis=axis, dtype=dtype)
def sum(self, values, axis=0, dtype=None):
"""compute the sum over each group
Parameters
----------
values : array_like, [keys, ...]
values to sum per group
axis : int, optional
alternative reduction axis for values
dtype : output dtype
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...]
value array, reduced over groups
"""
values = np.asarray(values)
return self.pivot_table(self.unique, self.reduce(values, axis=axis, dtype=dtype))
def prod(self, values, axis=0, dtype=None):
"""compute the product over each group
Parameters
----------
values : array_like, [keys, ...]
values to multiply per group
axis : int, optional
alternative reduction axis for values
dtype : output dtype
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...]
value array, reduced over groups
"""
values = np.asarray(values)
return self.pivot_table(self.unique, self.reduce(values, axis=axis, dtype=dtype, operator=np.multiply))
def mean(self, values, axis=0, weights=None, dtype=None):
"""compute the mean over each group
Parameters
----------
values : array_like, [keys, ...]
values to take average of per group
axis : int, optional
alternative reduction axis for values
weights : ndarray, [keys, ...], optional
weight to use for each value
dtype : output dtype
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...]
value array, reduced over groups
"""
values = np.asarray(values)
if weights is None:
result = self.reduce(values, axis=axis, dtype=dtype)
shape = [1] * values.ndim
shape[axis] = self.groups
weights = self.count.reshape(shape)
else:
weights = np.asarray(weights)
result = self.reduce(values * weights, axis=axis, dtype=dtype)
weights = self.reduce(weights, axis=axis, dtype=dtype)
return self.pivot_table(self.unique, result / weights)
def var(self, values, axis=0, weights=None, dtype=None):
"""计算每个组的方差
Parameters
----------
values : array_like, [keys, ...]
values to take variance of per group
axis : int, optional
alternative reduction axis for values
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...]
value array, reduced over groups
"""
values = np.asarray(values)
unique, mean = self.mean(values, axis, weights, dtype)
err = values - mean.take(self.inverse, axis)
if weights is None:
shape = [1] * values.ndim
shape[axis] = self.groups
group_weights = self.count.reshape(shape)
var = self.reduce(err ** 2, axis=axis, dtype=dtype)
else:
weights = np.asarray(weights)
group_weights = self.reduce(weights, axis=axis, dtype=dtype)
var = self.reduce(weights * err ** 2, axis=axis, dtype=dtype)
return self.pivot_table(unique, var / group_weights)
def std(self, values, axis=0, weights=None, dtype=None):
"""standard deviation over each group
Parameters
----------
values : array_like, [keys, ...]
values to take standard deviation of per group
axis : int, optional
alternative reduction axis for values
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...]
value array, reduced over groups
"""
unique, var = self.var(values, axis, weights, dtype)
return self.pivot_table(unique, np.sqrt(var))
def median(self, values, axis=0, average=True):
"""计算每个组的中值
Parameters
----------
values : array_like, [keys, ...]
values to compute the median of per group
axis : int, optional
alternative reduction axis for values
average : bool, optional
when average is true, the average of the two central values is taken for groups with an even key-count
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...]
value array, reduced over groups
"""
mid_2 = self.index.start + self.index.stop
hi = (mid_2 ) // 2
lo = (mid_2 - 1) // 2
#need this indirection for lex-index compatibility
sorted_group_rank_per_key = self.index.sorted_group_rank_per_key
def median1d(slc):
#place values at correct keys; preconditions the upcoming lexsort
slc = slc[self.index.sorter]
#refine value sorting within each keygroup
sorter = np.lexsort((slc, sorted_group_rank_per_key))
slc = slc[sorter]
return (slc[lo]+slc[hi]) / 2 if average else slc[hi]
values = np.asarray(values)
if values.ndim>1: #is trying to skip apply_along_axis somewhat premature optimization?
values = np.apply_along_axis(median1d, axis, values)
else:
values = median1d(values)
return self.pivot_table(self.unique, values)
def min(self, values, axis=0):
"""return the minimum within each group
Parameters
----------
values : array_like, [keys, ...]
values to take minimum of per group
axis : int, optional
alternative reduction axis for values
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...]
value array, reduced over groups
"""
values = np.asarray(values)
return self.pivot_table(self.unique, self.reduce(values, np.minimum, axis))
def max(self, values, axis=0):
"""return the maximum within each group
Parameters
----------
values : array_like, [keys, ...]
values to take maximum of per group
axis : int, optional
alternative reduction axis for values
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...]
value array, reduced over groups
"""
values = np.asarray(values)
return self.pivot_table(self.unique, self.reduce(values, np.maximum, axis))
def first(self, values, axis=0):
"""在第一次出现相关键时返回值
Parameters
----------
values : array_like, [keys, ...]
values to pick the first value of per group
axis : int, optional
alternative reduction axis for values
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...]
value array, reduced over groups
"""
values = np.asarray(values)
return first(self.unique, np.take(values, self.index.sorter[self.index.start], axis))
def last(self, values, axis=0):
"""返回与其关联的键最后出现的值
Parameters
----------
values : array_like, [keys, ...]
values to pick the last value of per group
axis : int, optional
alternative reduction axis for values
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...]
value array, reduced over groups
"""
values = np.asarray(values)
return first(self.unique, np.take(values, self.index.sorter[self.index.stop-1], axis))
def any(self, values, axis=0):
"""compute if any item evaluates to true in each group
Parameters
----------
values : array_like, [keys, ...]
values to take boolean predicate over per group
axis : int, optional
alternative reduction axis for values
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...], np.bool
value array, reduced over groups
"""
values = np.asarray(values)
        if values.dtype != bool:  # np.bool is a deprecated alias of bool
values = values != 0
return first(self.unique, self.reduce(values, axis=axis) > 0)
def all(self, values, axis=0):
"""compute if all items evaluates to true in each group
Parameters
----------
values : array_like, [keys, ...]
values to take boolean predicate over per group
axis : int, optional
alternative reduction axis for values
Returns
-------
unique: ndarray, [groups]
unique keys
reduced : ndarray, [groups, ...], np.bool
value array, reduced over groups
"""
values = np.asarray(values)
return first(self.unique, self.reduce(values, axis=axis, operator=np.multiply) != 0)
def argmin(self, values):
"""return the index into values corresponding to the minimum value of the group
Parameters
----------
values : array_like, [keys]
values to pick the argmin of per group
Returns
-------
unique: ndarray, [groups]
unique keys
argmin : ndarray, [groups]
index into value array, representing the argmin per group
"""
keys, minima = self.min(values)
minima = minima[self.inverse]
        # select the first occurrence of the minimum in each group
index = as_index((self.inverse, values == minima))
return keys, index.sorter[index.start[-self.groups:]]
def argmax(self, values):
"""return the index into values corresponding to the maximum value of the group
Parameters
----------
values : array_like, [keys]
values to pick the argmax of per group
Returns
-------
unique: ndarray, [groups]
unique keys
argmax : ndarray, [groups]
index into value array, representing the argmax per group
"""
keys, maxima = self.max(values)
maxima = maxima[self.inverse]
        # select the first occurrence of the maximum in each group
index = as_index((self.inverse, values == maxima))
return keys, index.sorter[index.start[-self.groups:]]
#implement iter interface? could simply do zip( group_by(keys)(values)), no?
def pivot_table(self, unique, median):
label = [] ; inver = [] ; shape = []
        if self.hold is not None:
if isinstance(unique, tuple):
for i,u in enumerate(unique):
if isinstance(self.hold[i],int):
hold = np.arange(self.hold[i])
else:
hold = np.asarray(self.hold[i])
inver.append(np.searchsorted(hold,u))
shape.append(hold.size)
table = np.full(shape,np.nan)
else:
if isinstance(self.hold,int):
hold = np.arange(self.hold)
else:
hold = np.asarray(self.hold)
inver = np.searchsorted(hold,unique)
table = np.full(hold.size,np.nan)
table[inver] = median
return hold,table
else:
if isinstance(unique, tuple):
for u in unique:
uni,pos = np.unique(u, return_inverse=True)
label.append(uni)
inver.append(pos)
shape.append(uni.size)
table = np.full(shape,np.nan)
else:
uni,pos = np.unique(unique, return_inverse=True)
label = uni
inver = pos
table = np.full(uni.size,np.nan)
table[inver] = median
return label,table
def group_by(keys, values=None, reduction=None, hold=None, axis=0):
"""construct a grouping object on the given keys, optionally performing the given reduction on the given values
Parameters
----------
keys : indexable object
keys to group by
values : array_like, optional
sequence of values, of the same length as keys
if a reduction function is provided, the given values are reduced by key
if no reduction is provided, the given values are grouped and split by key
reduction : lambda, optional
reduction function to apply to the values in each group
axis : int, optional
axis to regard as the key-sequence, in case keys is multi-dimensional
Returns
-------
iterable
if values is None, a GroupBy object of the given keys object
        if reduction is None, a tuple of a sequence of unique keys and a sequence of grouped values
else, a sequence of tuples of unique keys and reductions of values over that key-group
See Also
--------
numpy_indexed.as_index : for information regarding the casting rules to a valid Index object
"""
g = GroupBy(keys,hold,axis)
if values is None:
return g
groups = g.split(values)
if reduction is None:
return g.unique, groups
return [(key, reduction(group)) for key, group in zip(g.unique, groups)]
class BaseIndex(object):
"""
minimal indexing functionality
only provides unique and counts, but with optimal performance
no grouping, or lex-keys are supported,
or anything that would require an indirect sort
"""
def __init__(self, keys):
"""
keys is a flat array of possibly composite type
"""
self._keys = np.asarray(keys).flatten()
self.sorted = np.sort(self._keys)
#the slicing points of the bins to reduce over
if self.size == 0:
            self.flag = np.empty(0, bool)
            self.slices = np.empty(0, np.intp)
else:
self.flag = self.sorted[:-1] != self.sorted[1:]
self.slices = np.concatenate((
[0],
np.flatnonzero(self.flag)+1,
[self.size]))
@property
def keys(self):
return self._keys
@property
def sorted_keys(self):
return self.sorted
@property
def size(self):
"""number of keys"""
return self._keys.size
@property
def start(self):
"""start index of all bins"""
return self.slices[:-1]
@property
def stop(self):
"""stop index of all bins"""
return self.slices[1:]
@property
def unique(self):
"""all unique keys"""
return self.sorted[self.start]
@property
def groups(self):
"""number of unique keys"""
return len(self.start)
@property
def count(self):
"""number of times each key occurs"""
return np.diff(self.slices)
@property
def uniform(self):
"""returns true if each key occurs an equal number of times"""
return not np.any(np.diff(self.count))
class Index(BaseIndex):
"""
index object over a set of keys
adds support for more extensive functionality, notably grouping
relies on indirect sorting
maybe it should be called argindex?
"""
def __init__(self, keys, stable):
"""
keys is a flat array of possibly composite type
if stable is true, stable sorting of the keys is used. stable sorting is required
        if first and last properties are required
"""
self.stable = stable
self._keys = np.asarray(keys)
#find indices which sort the keys; use mergesort for stability, so first and last give correct results
self.sorter = np.argsort(self._keys, kind='mergesort' if self.stable else 'quicksort')
#computed sorted keys
self.sorted = self._keys[self.sorter]
if self.size == 0:
            self.flag = np.empty(0, bool)
            self.slices = np.empty(0, np.intp)
else:
#the slicing points of the bins to reduce over
self.flag = self.sorted[:-1] != self.sorted[1:]
self.slices = np.concatenate((
[0],
np.flatnonzero(self.flag)+1,
[self.size]))
@property
def sorted_group_rank_per_key(self):
"""find a better name for this? enumeration of sorted keys. also used in median implementation"""
return np.cumsum(np.concatenate(([False], self.flag)))
@property
def inverse(self):
"""return index array that maps unique values back to original space. unique[inverse]==keys"""
        inv = np.empty(self.size, np.intp)
inv[self.sorter] = self.sorted_group_rank_per_key
return inv
@property
def rank(self):
"""how high in sorted list each key is. inverse permutation of sorter, such that sorted[rank]==keys"""
        r = np.empty(self.size, np.intp)
r[self.sorter] = np.arange(self.size)
return r
@property
def index(self):
"""returns indices such that keys[index]==unique
not sure of the use case, but included for backwards compatibility with np.unique"""
return self.sorter[self.start]
def as_index(keys, axis=0, base=False, stable=True):
if isinstance(keys, Index):
return keys #already done here
if isinstance(keys, tuple):
return LexIndex(keys, stable)
try:
keys = np.asarray(keys)
except:
raise TypeError('Given object does not form a valid set of keys')
if axis is None:
keys = keys.flatten()
if keys.ndim==1:
if base:
return BaseIndex(keys)
else:
return Index(keys, stable=stable)
class LexIndex(Index):
def __init__(self, keys, stable):
        self._keys = tuple(np.asarray(key) for key in keys)  # convert every key column to an array
        # composite ('V'-dtype) keys would need to be routed through an index first
self.sorter = np.lexsort(self._keys)
#computed sorted keys
self.sorted = self.take(self._keys, self.sorter)
if self.size == 0:
            self.flag = np.empty(0, bool)
            self.slices = np.empty(0, np.intp)
else:
self.flag = reduce(
np.logical_or,
(s[:-1] != s[1:] for s in self.sorted))
self.slices = np.concatenate((
[0],
np.flatnonzero(self.flag)+1,
[self.size]))
@property
def unique(self):
"""returns a tuple of unique key columns"""
        return tuple(s[self.start] for s in self.sorted)
@property
def size(self):
return self.sorter.size
def take(self, keys, indices):
return tuple(key[indices] for key in keys)
def concatenate(self, *others):
return
__all__ = ['group_by']
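A quick smoke test of the module (my own usage sketch, not part of the original post): group values by key, then take per-group sums and means.

import numpy as np
from grouping import group_by
keys   = np.array([1, 1, 2, 2, 2])
values = np.array([10., 20., 1., 2., 3.])
unique, sums  = group_by(keys).sum(values)   # ([1, 2], [30., 6.])
unique, means = group_by(keys).mean(values)  # ([1, 2], [15., 2.])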