A few days ago I came across a blog post on Alex Graves's ACT (Adaptive Computation Time, Graves 2016). ACT wraps an RNN cell so that at each sequence step t the cell can run n internal iterations, and the weighted sum of those intermediate outputs is taken as the output for step t. The core idea is to effectively lengthen the RNN along the time axis, increasing the model's complexity and its nonlinear expressive power.
Because RNNs use tanh activations, stacking more than about five recurrent layers is computationally expensive. Papers from the last couple of years argue instead that lengthening the RNN along the sequence dimension improves performance, and iterating repeatedly on a single step's input while emitting the weighted sum as that step's output lengthens the RNN in exactly that sense. ACT adds a time (ponder) loss that pushes the model to learn to decide on its output within a limited number of iteration steps.
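For reference, the core equations from the paper, which the ACTCell implementation below follows, are roughly:

$$x_t^n = (\delta_{n,1},\ x_t), \qquad s_t^n = \mathcal{S}(s_t^{n-1},\ x_t^n), \qquad h_t^n = \sigma(W_h s_t^n + b_h)$$

$$N(t) = \min\Big\{ n : \sum_{m=1}^{n} h_t^m \ge 1-\epsilon \Big\}, \qquad p_t^n = \begin{cases} h_t^n & n < N(t) \\ R(t) = 1 - \sum_{m=1}^{N(t)-1} h_t^m & n = N(t) \end{cases}$$

$$s_t = \sum_{n=1}^{N(t)} p_t^n\, s_t^n, \qquad y_t = \sum_{n=1}^{N(t)} p_t^n\, y_t^n, \qquad \text{ponder cost } \rho_t = N(t) + R(t)$$

The ponder cost, scaled by a time penalty, is added to the task loss so that extra pondering steps are only used when they pay for themselves.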
Below, ACT is used to wrap a GRU for predicting the HS300 index; compared with a plain GRU, the ACT-wrapped GRU shows a modest improvement.
import numpy as np
import pandas as pd
import tensorflow as tf
import talib
import datetime
import matplotlib.pylab as plt
%matplotlib inline
import seaborn as sns
sns.set_style('whitegrid')
tmp = pd.read_csv('G:/QuantCodes/Adaptive_Computation_Time/A1.csv')
Dtmp = pd.read_csv('G:/QuantCodes/Adaptive_Computation_Time/A2.csv')
del tmp['Unnamed: 0']
del Dtmp['Unnamed: 0']
tmp['oc'] = (tmp['close'] - tmp['open'])/tmp['open']
tmp['oh'] = (tmp['high'] - tmp['open'])/tmp['open']
tmp['ol'] = (tmp['low'] - tmp['open'])/tmp['open']
tmp['ch'] = (tmp['high'] - tmp['close'])/tmp['close']
tmp['cl'] = (tmp['low'] - tmp['close'])/tmp['close']
tmp['lh'] = (tmp['high'] - tmp['low'])/tmp['low']
tmp['oc_4'] = (tmp['close'] - tmp['open'].shift(4))/tmp['open'].shift(4)
tmp['oh_4'] = (tmp['high'] - tmp['open'].shift(4))/tmp['open'].shift(4)
tmp['ol_4'] = (tmp['low'] - tmp['open'].shift(4))/tmp['open'].shift(4)
tmp['co_8'] = (tmp['open'] - tmp['close'].shift(8))/tmp['close'].shift(8)
tmp['ch_8'] = (tmp['high'] - tmp['close'].shift(8))/tmp['close'].shift(8)
tmp['cl_8'] = (tmp['low'] - tmp['close'].shift(8))/tmp['close'].shift(8)
tmp['co_80'] = (tmp['open'] - tmp['close'].shift(80))/tmp['close'].shift(80)
tmp['ch_80'] = (tmp['high'] - tmp['close'].shift(80))/tmp['close'].shift(80)
tmp['cl_80'] = (tmp['low'] - tmp['close'].shift(80))/tmp['close'].shift(80)
tmp['ATR14'] = talib.ATR(tmp.high.values, tmp.low.values, tmp.close.values, timeperiod=14)
tmp['ATR6'] = talib.ATR(tmp.high.values, tmp.low.values, tmp.close.values, timeperiod=6)
tmp['EMA6'] = talib.EMA(tmp.close.values, timeperiod=6)
tmp['EMA12'] = talib.EMA(tmp.close.values, timeperiod=12)
tmp['EMA26'] = talib.EMA(tmp.close.values, timeperiod=26)
tmp['tEMA6'] = talib.EMA(tmp.total_turnover.values, timeperiod=6)
tmp['tEMA12'] = talib.EMA(tmp.total_turnover.values, timeperiod=12)
tmp['tEMA26'] = talib.EMA(tmp.total_turnover.values, timeperiod=26)
tmp['VEMA6'] = talib.EMA(tmp.volume.values, timeperiod=6)
tmp['VEMA12'] = talib.EMA(tmp.volume.values, timeperiod=12)
tmp['VEMA26'] = talib.EMA(tmp.volume.values, timeperiod=26)
tmp['MACD_DIF'], tmp['MACD_DEA'], tmp['MACD_bar'] = talib.MACD(
tmp.close.values, fastperiod=12, slowperiod=24, signalperiod=9)
faclist = ['oc', 'oh', 'ol', 'ch', 'cl', 'lh', 'oc_4', 'oh_4', 'ol_4', 'co_8',
'ch_8', 'cl_8', 'co_80', 'ch_80', 'cl_80', 'ATR14', 'ATR6', 'EMA6',
'EMA12', 'EMA26', 'tEMA6', 'tEMA12', 'tEMA26', 'VEMA6', 'VEMA12',
'VEMA26', 'MACD_DIF', 'MACD_DEA', 'MACD_bar']
del_list = ['total_turnover', 'volume', 'open', 'close', 'high', 'low']
# Winsorize: clip each factor at mean ± 3 standard deviations
for i in faclist:
    mean = tmp[i].mean()
    std = tmp[i].std()
    tmp.loc[tmp[i] > mean + 3*std, i] = mean + 3*std
    tmp.loc[tmp[i] < mean - 3*std, i] = mean - 3*std
# Standardize each factor to zero mean and unit variance
for i in faclist:
    mean = tmp[i].mean()
    std = tmp[i].std()
    tmp[i] = (tmp[i] - mean)/std
tmp.drop(labels=del_list, axis=1, inplace=True)
tmp.dropna(inplace= True)
Dtmp['returns'] = Dtmp.close.shift(-17)/Dtmp.close - 1.   # forward 17-period return
Dtmp.dropna(inplace=True)
Dtmp['dir_lab'] = 1                                        # 1: flat
Dtmp.loc[Dtmp.returns > 0.07, 'dir_lab'] = 2               # 2: up more than 7%
Dtmp.loc[Dtmp.returns < -0.07, 'dir_lab'] = 0              # 0: down more than 7%
Dtmp= Dtmp.iloc[-359:,:]
Dtmp.reset_index(drop= True , inplace= True)
start = Dtmp.Date.values[0]
end = Dtmp.Date.values[-1]
end = datetime.datetime.strptime(end, '%Y-%m-%d')
end = end + datetime.timedelta(days= 1)
end = end.strftime('%Y-%m-%d')
tmp = tmp.loc[(tmp.Date.values > start) & (tmp.Date.values < end)].reset_index(drop=True)
forward = 59   # look-back window: 59 days, each with 16 intraday bars of 29 factors
tmpfac = np.zeros((1, forward*16*29))
for i in np.arange(forward, int(len(Dtmp))):
    # flatten the previous `forward` days of intraday factors into one sample row
    tp = tmp.iloc[16*(i-forward):16*i, 1:]
    tp = np.array(tp).ravel(order='C').transpose()
    tmpfac = np.vstack((tmpfac, tp))
tmpfac = np.delete(tmpfac, 0, axis=0)   # drop the zero seed row
trainX = tmpfac.reshape([-1, 59, 16*29])
trainY = (Dtmp.iloc[-300:,-1])
def dense_to_one_hot(labels_dense):
"""标签 转换one hot 编码
输入labels_dense 必须为非负数
2016-11-21
"""
num_classes = len(np.unique(labels_dense)) # np.unique 去掉重复函数
raws_labels = labels_dense.shape[0]
index_offset = np.arange(raws_labels) * num_classes
labels_one_hot = np.zeros((raws_labels, num_classes))
labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
return labels_one_hot
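A quick illustration of the function with some hypothetical labels (not from the dataset):

dense_to_one_hot(pd.Series([0, 2, 1, 1]))
# array([[ 1.,  0.,  0.],
#        [ 0.,  0.,  1.],
#        [ 0.,  1.,  0.],
#        [ 0.,  1.,  0.]])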
trainY = dense_to_one_hot(trainY)
trainY.shape
(300, 3)
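As a sanity check on the window construction above (assuming the 359-day slice yields exactly 300 windows, consistent with the label shape shown):

assert trainX.shape == (300, 59, 16 * 29)   # 300 samples, 59-day look-back, 16 bars x 29 factors per day
assert trainY.shape == (300, 3)             # one-hot labels for the 3 direction classes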
batch_size = 100
in_length = 59
in_width = 464
num_units = 128
outclasses = 3
learning_rate = 0.001
max_grad_norm = 5
input_data = tf.placeholder(shape=[None, in_length, in_width], dtype=tf.float32)
targets = tf.placeholder(shape=[None,outclasses], dtype=tf.float32)
test = tf.transpose(input_data, [1,0,2])
test = tf.reshape(test, [-1, in_width])
test = tf.split(value=test, num_or_size_splits=in_length, axis=0)
GRUCell = tf.contrib.rnn.GRUCell(num_units=num_units)
output, state = tf.contrib.rnn.static_rnn(cell=GRUCell, dtype=tf.float32, inputs= test)
final_output = output[-1]
softmax_W = tf.Variable(tf.truncated_normal(shape=([num_units, outclasses]), dtype=tf.float32, name='softmax_W'))
softmax_b = tf.Variable(tf.truncated_normal(shape=([outclasses]), dtype=tf.float32, name='softmax_b'))
final_output = tf.matmul(final_output, softmax_W) + softmax_b
#final_output = tf.nn.softmax(final_output)
final_output
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=final_output, labels=targets))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)
correct_pred = tf.equal(tf.argmax(final_output,1), tf.argmax(targets,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
%%time
loss_list = []
acc_list = []
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for epoch in np.arange(30):
for i in np.arange(int(len(trainX)/batch_size)):
batch_x = trainX[i*batch_size: (i+1)*batch_size]
batch_y = trainY[i*batch_size: (i+1)*batch_size]
sess.run(optimizer, feed_dict={input_data:batch_x, targets:batch_y})
acc, loss = sess.run([accuracy, cost], feed_dict={input_data:batch_x, targets:batch_y})
loss_list.append(loss)
acc_list.append(acc)
Wall time: 23.2 s
tmp = pd.Series(loss_list)
tmp.plot()
<matplotlib.axes._subplots.AxesSubplot at 0x4e10908>
tmp = pd.Series(acc_list)
tmp.plot()
<matplotlib.axes._subplots.AxesSubplot at 0x16fbfcf8>
tf.reset_default_graph()
import collections
import math
import tensorflow as tf
from tensorflow.python.framework import tensor_shape
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import clip_ops
from tensorflow.python.ops import embedding_ops
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn_ops
from tensorflow.python.ops import partitioned_variables
from tensorflow.python.ops import variable_scope as vs
from tensorflow.python.ops.math_ops import sigmoid
from tensorflow.python.ops.math_ops import tanh
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.util import nest
def _state_size_with_prefix(state_size, prefix=None):
result_state_size = tensor_shape.as_shape(state_size).as_list()
if prefix is not None:
if not isinstance(prefix, list):
raise TypeError("prefix of _state_size_with_prefix should be a list.")
result_state_size = prefix + result_state_size
return result_state_size
def _linear(args, output_size, bias, bias_start=0.0, scope=None):
"""Linear map: sum_i(args[i] * W[i]), where W[i] is a variable.
Args:
args: a 2D Tensor or a list of 2D, batch x n, Tensors.
output_size: int, second dimension of W[i].
bias: boolean, whether to add a bias term or not.
bias_start: starting value to initialize the bias; 0 by default.
scope: VariableScope for the created subgraph; defaults to "Linear".
Returns:
A 2D Tensor with shape [batch x output_size] equal to
sum_i(args[i] * W[i]), where W[i]s are newly created matrices.
Raises:
ValueError: if some of the arguments has unspecified or wrong shape.
"""
if args is None or (nest.is_sequence(args) and not args):
raise ValueError("`args` must be specified")
if not nest.is_sequence(args):
args = [args]
# Calculate the total size of arguments on dimension 1.
total_arg_size = 0
shapes = [a.get_shape().as_list() for a in args]
for shape in shapes:
if len(shape) != 2:
raise ValueError("Linear is expecting 2D arguments: %s" % str(shapes))
if not shape[1]:
raise ValueError("Linear expects shape[1] of arguments: %s" % str(shapes))
else:
total_arg_size += shape[1]
dtype = [a.dtype for a in args][0]
# Now the computation.
with vs.variable_scope(scope or "Linear"):
matrix = vs.get_variable(
"Matrix", [total_arg_size, output_size], dtype=dtype)
if len(args) == 1:
res = math_ops.matmul(args[0], matrix)
else:
res = math_ops.matmul(array_ops.concat(1, args), matrix)
if not bias:
return res
bias_term = vs.get_variable(
"Bias", [output_size],
dtype=dtype,
initializer=init_ops.constant_initializer(
bias_start, dtype=dtype))
return res + bias_term
class RNNCell(object):
def zero_state(self, batch_size, dtype):
state_size = self.state_size
if nest.is_sequence(state_size):
state_size_flat = nest.flatten(state_size)
zeros_flat = [
array_ops.zeros(
array_ops.stack(_state_size_with_prefix(
s, prefix=[batch_size])),
dtype= dtype)
for s in state_size_flat
]
for s,z in zip(state_size_flat, zeros_flat):
z.set_shape(_state_size_with_prefix(s, prefix=[None]))
zeros = nest.pack_sequence_as(structure=state_size,
flat_sequence=zeros_flat)
else:
zeros_size = _state_size_with_prefix(state_size, prefix=[batch_size])
zeros = array_ops.zeros(array_ops.stack(zeros_size), dtype=dtype)
zeros.set_shape(_state_size_with_prefix(state_size, prefix=[None]))
return zeros
class GRUCell(RNNCell):
"""Gated Recurrent Unit cell (cf. http://arxiv.org/abs/1406.1078)."""
def __init__(self, num_units, input_size=None, activation=tanh):
if input_size is not None:
logging.warn("%s: The input_size parameter is deprecated.", self)
self._num_units = num_units
self._activation = activation
@property
def state_size(self):
return self._num_units
@property
def output_size(self):
return self._num_units
def __call__(self, inputs, state, scope=None):
"""Gated recurrent unit (GRU) with nunits cells."""
with vs.variable_scope(scope or "gru_cell"):
with vs.variable_scope("gates"): # Reset gate and update gate.
# We start with bias of 1.0 to not reset and not update.
r, u = array_ops.split(
value=_linear(
[inputs, state], 2 * self._num_units, True, 1.0),
num_or_size_splits=2,
axis=1)
r, u = sigmoid(r), sigmoid(u)
with vs.variable_scope("candidate"):
c = self._activation(_linear([inputs, r * state],
self._num_units, True))
new_h = u * state + (1 - u) * c
return new_h, new_h
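The cell above is the standard GRU update (with the gate bias started at 1.0 so the cell initially neither resets nor updates):

$$r = \sigma(W_r[x,\ h] + b_r), \qquad u = \sigma(W_u[x,\ h] + b_u)$$

$$\tilde{h} = \tanh(W_c[x,\ r \odot h] + b_c), \qquad h_{new} = u \odot h + (1-u) \odot \tilde{h}$$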
class ACTCell(tf.contrib.rnn.RNNCell):
def __init__(self,
num_units,
cell,
epsilon,
max_computation,
batch_size,
sigmoid_output=False
):
self._num_units = num_units
self.cell = cell
self.batch_size = batch_size
self.one_sub_epsilon = tf.constant(value=1.-epsilon, shape=[self.batch_size])
self.N = tf.constant(value=max_computation, dtype=tf.float32, shape=[self.batch_size])
self.sigmoid_output = sigmoid_output
self.ACT_Rt = []
self.ACT_n = []
@property
def input_size(self):
return self._num_units
@property
def output_size(self):
return self._num_units
@property
def state_size(self):
return self._num_units
def act_step(self,
accumulation_ht_n_sub_1,
condiction_1_stop_signal,
x_t_input,
state_t_n_sub_1,
n,
previous_step_modify_the_stop_signal,
state_t,
output_t):
"""
Equation 3
二进制标签,设置标志区别对于模型是否是t时刻的第一次输入第一次输入,
标签为1 表示在t时刻第一次输入xinput
标签为0 表示在t时刻这不是第一次输入xinput数据,这是重复计算。
标签矩阵由 htn 的累加和决定,如果 accumulation_htn 在该批次全部为0,
则认为这是在t时刻第一次输入数据,不是自适应的重复计算。
注意equation 3 中xtn只有在第一次输入的时候才有delta=1
"""
binary_flag = tf.cond(pred= tf.reduce_all(tf.equal(accumulation_ht_n_sub_1,0.)),
fn1= lambda: tf.ones(shape=[self.batch_size,1], dtype=tf.float32),
fn2= lambda: tf.zeros(shape=[self.batch_size,1], dtype=tf.float32))
x_t_n_input = tf.concat(values=[binary_flag, x_t_input], axis=1)
output_t_n, state_t_n = tf.contrib.rnn.static_rnn(cell=self.cell, inputs=[x_t_n_input],
initial_state= state_t_n_sub_1, scope=type(self.cell).__name__)
"""
equation 5
计算 halting unit htn
修改格式为1维
"""
with tf.variable_scope('halting_unit'):
ht_n = tf.sigmoid(_linear(args= state_t_n, output_size= 1, bias=True, bias_start=.1))
ht_n = tf.squeeze(ht_n)
"""
equation 6 && equation 13
equation 13 可以转换为判断
1> htn在n步的累加和 < 1 - epsilon 也就是 n < min{} eq6
2> n < self.N
"""
condition_1 = tf.less(accumulation_ht_n_sub_1 + ht_n, self.one_sub_epsilon)
#condition_1_float = tf.cast(x=condition_1, dtype=tf.float32)
modify_the_stop_signal = tf.logical_and(x=condition_1, y= previous_step_modify_the_stop_signal)
modify_the_stop_signal_float = tf.cast(x=modify_the_stop_signal, dtype=tf.float32)
accumulation_ht_n = ht_n*modify_the_stop_signal_float + accumulation_ht_n_sub_1
        # stop signal that only takes effect the first time p_t_n switches to R(t);
        # for batch training it matters when the last row's p_t_n switches to R(t)
condiction_1_stop_signal += ht_n * tf.cast(x=previous_step_modify_the_stop_signal, dtype=tf.float32)
n = n + modify_the_stop_signal_float
condition_2 = tf.less(x=n, y=self.N)
"""
条件1 条件2 同时成立 p_t_n = h_t_n
否则 p_t_n = R_t
"""
p_t_n_select = tf.logical_and(condition_1, condition_2)
R_t = tf.expand_dims(input=1.-accumulation_ht_n_sub_1, axis=-1)
ht_n = tf.expand_dims(input=ht_n, axis=-1)
p_t_n = tf.where(condition=p_t_n_select, x=ht_n, y=R_t)
signal_select_channel = tf.expand_dims(input=tf.cast(x=previous_step_modify_the_stop_signal, dtype=tf.float32), axis=-1)
state_t = (state_t_n * p_t_n * signal_select_channel) + state_t
output_t = (output_t_n * p_t_n * signal_select_channel) + output_t
state_t = tf.squeeze(state_t)
output_t = tf.squeeze(output_t)
return [
accumulation_ht_n,
condiction_1_stop_signal,
x_t_input,
state_t_n,
n,
modify_the_stop_signal,
state_t,
output_t
]
"""
上面的act_step设计缺陷在使用 batch 训练的时候会出现在同一batch内 n 停止数值不同,
假设 batch_size = 128, 也就是使用SGD训练的时候同时对128行数据进行计算,
这个时候出现对于ACT迭代模型出现问题为,可能在 n = 8 的时候 row_1就达到条件切换到R(t)
但是row_2没有达到停止条件,这个时候在 n = 9的时候出现问题,也就是这个时候上面的算法,
在n = 9的情况下继续进行计算,这个时候出现问题是了, 对于row_1 而言在n=8已经切换到ptn = R(t)
在n = 9的时候 row_1还会继续运算,这个时候出现问题是,row_1 会累加第二个 R(t),而不是停止运算,
这里为了处理这个问题,将设置batch_mask 在n=9的时候, 设置row_1 累加0解决这个问题。
注意在使用修正信号的时候注意最后处理前一步的修正信号
因为修正信号_float 是0,1 表示 False,True所以 最终更新state_t和y_t的时候可以设置乘以0 来修正额外的更新错误,
但是这个地方需要注意的是,只要在步骤n p_t_n从h_t_n切换到R(t) 修正信号就为False 这是表示成float的信号就是0,
但是第一次切换到R(t)的时候 n步骤的修正信号为0,n-1步骤修正信号为1,这个时候需要* n-1步骤的修正信号放行第一次出现R(t)
"""
def __call__(self, Xinput, state, timestep=0, scope=None):
with tf.variable_scope(scope or type(self).__name__):
accumulation_ht_n = tf.constant(value=0., dtype=tf.float32, shape=[self.batch_size], name='initial_halting_probability')
condiction_1_stop_signal = tf.constant(value=0., dtype=tf.float32, shape=[self.batch_size], name='initial_condiction_1_stop_signal')
n = tf.constant(value=0, dtype=tf.float32, shape=[self.batch_size], name='ininital_counter')
modify_the_stop_signal = tf.constant(value=True, dtype=tf.bool, shape=[self.batch_size])
cache_output_t = tf.zeros_like(tensor=state, dtype=tf.float32, name='cache_for_output_t')
cache_state_t = tf.zeros_like(tensor=state, dtype=tf.float32, name='cache_for_state_t')
def halting_function(accumulation_ht_n, condiction_1_stop_signal, x_t_input, state_t_n, n, modify_the_stop_signal, state_t, output_t):
return tf.reduce_any(tf.logical_and(tf.less(n, self.N), tf.less(condiction_1_stop_signal, self.one_sub_epsilon)))
Rt,_,_,_, n, _, state_t, output_t = tf.while_loop(cond=halting_function, body=self.act_step,
loop_vars=[accumulation_ht_n, condiction_1_stop_signal, Xinput, state, n, modify_the_stop_signal, cache_state_t, cache_output_t])
self.ACT_Rt.append(tf.reduce_sum(1.- Rt))
self.ACT_n.append(tf.reduce_sum(n))
if self.sigmoid_output:
output_t = tf.sigmoid(_linear(args=output_t, output_size=1, bias=True, bias_start=.1))
return output_t, state_t
def calculate_ponder_cost(self, time_penalty=0.01):
return time_penalty * tf.reduce_sum(
tf.add_n(self.ACT_Rt)/len(self.ACT_Rt) +
tf.to_float(tf.add_n(self.ACT_n)/len(self.ACT_n))
)
def test(self, Xinput, state, timestep=0, scope=None):
with tf.variable_scope(scope or type(self).__name__):
accumulation_ht_n = tf.constant(value=0., dtype=tf.float32, shape=[self.batch_size], name='initial_halting_probability')
condiction_1_stop_signal = tf.constant(value=0., dtype=tf.float32, shape=[self.batch_size], name='initial_condiction_1_stop_signal')
n = tf.constant(value=0, dtype=tf.float32, shape=[self.batch_size], name='ininital_counter')
modify_the_stop_signal = tf.constant(value=True, dtype=tf.bool, shape=[self.batch_size])
cache_output_t = tf.zeros_like(tensor=state, dtype=tf.float32, name='cache_for_output_t')
cache_state_t = tf.zeros_like(tensor=state, dtype=tf.float32, name='cache_for_state_t')
def halting_function(accumulation_ht_n, condiction_1_stop_signal, x_t_input, state_t_n, n, modify_the_stop_signal, state_t, output_t):
return tf.reduce_any(tf.logical_and(tf.less(n, self.N), tf.less(condiction_1_stop_signal, self.one_sub_epsilon)))
print ('accumulation_ht_n',accumulation_ht_n.get_shape().as_list())
print ('condiction_1_stop_signal', condiction_1_stop_signal.get_shape().as_list())
print ('n',n.get_shape().as_list())
print ('modify_the_stop_signal', modify_the_stop_signal.get_shape().as_list())
print ('cache_output_t', cache_output_t.get_shape().as_list())
print ('cache_state_t', cache_state_t.get_shape().as_list())
self.act_step(accumulation_ht_n_sub_1 = accumulation_ht_n,
condiction_1_stop_signal = condiction_1_stop_signal,
x_t_input = Xinput,
state_t_n_sub_1 = state,
n = n,
previous_step_modify_the_stop_signal = modify_the_stop_signal,
state_t = cache_state_t,
output_t = cache_output_t)
print ('-----------------')
print ('accumulation_ht_n',accumulation_ht_n.get_shape().as_list())
print ('condiction_1_stop_signal', condiction_1_stop_signal.get_shape().as_list())
print ('n',n.get_shape().as_list())
print ('modify_the_stop_signal', modify_the_stop_signal.get_shape().as_list())
print ('cache_output_t', cache_output_t.get_shape().as_list())
print ('cache_state_t', cache_state_t.get_shape().as_list())
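To make the batch-mask reasoning above concrete, here is a minimal numpy sketch with made-up halting probabilities for a batch of two rows: row_1 halts earlier than row_2, yet each row's p_t^n weights still sum to one, because the "alive" mask (the previous step's modify_the_stop_signal) zeroes out any further contribution from a row that has already switched to R(t).

import numpy as np
eps = 0.01
h = np.array([[0.6, 0.3],     # n = 1: hypothetical halting probs for row_1, row_2
              [0.5, 0.2],     # n = 2
              [0.4, 0.4],     # n = 3
              [0.3, 0.6]])    # n = 4
acc = np.zeros(2)             # running sum of h_t^n per row
alive = np.ones(2, bool)      # previous step's stop signal ("modify_the_stop_signal")
p_sum = np.zeros(2)           # accumulated p_t^n weights; should end at 1.0 for every row
for ht in h:
    cond1 = acc + ht < 1. - eps           # condition 1: running sum still below 1 - epsilon
    p = np.where(cond1, ht, 1. - acc)     # p_t^n = h_t^n, or the remainder R(t)
    p_sum += p * alive                    # rows that already halted contribute nothing extra
    acc += ht * (cond1 & alive)
    alive &= cond1
print(p_sum)                              # [ 1.  1.]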
batch_size = 100
in_length = 59
in_width = 464
num_units = 128
outclasses = 3
learning_rate = 0.001
max_grad_norm = 5
input_data = tf.placeholder(shape=[None, in_length, in_width], dtype=tf.float32)
targets = tf.placeholder(shape=[None,outclasses], dtype=tf.float32)
GRU = tf.contrib.rnn.GRUCell(num_units= num_units)
ACT = ACTCell(num_units=num_units, cell=GRU, epsilon=0.01, max_computation=50, batch_size=batch_size)
test = tf.transpose(input_data, [1,0,2])
test = tf.reshape(test, [-1, in_width])
test = tf.split(value=test, num_or_size_splits=in_length, axis=0)
output, state = tf.contrib.rnn.static_rnn(cell=ACT, inputs=test, dtype=tf.float32)
final_output = output[-1]
softmax_W = tf.Variable(tf.truncated_normal(shape=([num_units, outclasses]), dtype=tf.float32, name='softmax_W'))
softmax_b = tf.Variable(tf.truncated_normal(shape=([outclasses]), dtype=tf.float32, name='softmax_b'))
final_output = tf.matmul(final_output, softmax_W) + softmax_b
ponder_loss = ACT.calculate_ponder_cost(time_penalty=.01)
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=final_output, labels=targets))
cost = cost + ponder_loss
tvars = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars), max_grad_norm)
optimizer = tf.train.AdamOptimizer(learning_rate)
optimizer = optimizer.apply_gradients(zip(grads, tvars))
correct_pred = tf.equal(tf.argmax(final_output,1), tf.argmax(targets,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
%%time
loss_list = []
acc_list = []
pon_list = []
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for epoch in np.arange(30):
for i in np.arange(int(len(trainX)/batch_size)):
batch_x = trainX[i*batch_size: (i+1)*batch_size]
batch_y = trainY[i*batch_size: (i+1)*batch_size]
sess.run(optimizer, feed_dict={input_data:batch_x, targets:batch_y})
pon, acc, loss = sess.run([ponder_loss, accuracy, cost], feed_dict={input_data:batch_x, targets:batch_y})
loss_list.append(loss)
acc_list.append(acc)
pon_list.append(pon)
Wall time: 1min 20s
tmp = pd.Series(loss_list)
tmp.plot()
<matplotlib.axes._subplots.AxesSubplot at 0x16fcac18>
tmp = pd.Series(acc_list)
tmp.plot()
<matplotlib.axes._subplots.AxesSubplot at 0x4122b978>
tmp = pd.Series(pon_list)
tmp.plot()
<matplotlib.axes._subplots.AxesSubplot at 0x17150b38>
tf.reset_default_graph()
(The imports, data loading, feature construction, winsorization/standardization, label construction, one-hot conversion, and the _linear helper and ACTCell class definitions here are identical to the sections above.)
batch_size = 100
in_length = 59
in_width = 464
num_units = 20
outclasses = 3
learning_rate = 0.001
max_grad_norm = 5
dropout = 0.618
input_data = tf.placeholder(shape=[None, in_length, in_width], dtype=tf.float32)
targets = tf.placeholder(shape=[None,outclasses], dtype=tf.float32)
layer_1 = tf.contrib.rnn.GRUCell(num_units=num_units)
layer_1 = tf.contrib.rnn.DropoutWrapper(cell=layer_1, output_keep_prob=1.- dropout)
layer_1 = ACTCell(num_units=num_units, cell=layer_1, epsilon=0.01, max_computation=20, batch_size=batch_size)
layer_2 = tf.contrib.rnn.GRUCell(num_units=num_units)
layer_2 = tf.contrib.rnn.DropoutWrapper(cell=layer_2, output_keep_prob=1.- dropout)
layer_2 = ACTCell(num_units=num_units, cell=layer_2, epsilon=0.01, max_computation=20, batch_size=batch_size)
layer_3 = tf.contrib.rnn.GRUCell(num_units=num_units)
layer_3 = tf.contrib.rnn.DropoutWrapper(cell=layer_3, output_keep_prob=1.- dropout)
layer_3 = ACTCell(num_units=num_units, cell=layer_3, epsilon=0.01, max_computation=20, batch_size=batch_size)
layer_4 = tf.contrib.rnn.GRUCell(num_units=num_units)
layer_4 = tf.contrib.rnn.DropoutWrapper(cell=layer_4, output_keep_prob=1.- dropout)
layer_4 = ACTCell(num_units=num_units, cell=layer_4, epsilon=0.01, max_computation=20, batch_size=batch_size)
MultiGRU = tf.contrib.rnn.MultiRNNCell(cells=[layer_1, layer_2, layer_3, layer_4], state_is_tuple=True)
test = tf.transpose(input_data, [1,0,2])
test = tf.reshape(test, [-1, in_width])
test = tf.split(value=test, num_or_size_splits=in_length, axis=0)
output, state = tf.contrib.rnn.static_rnn(cell=MultiGRU, inputs=test, dtype=tf.float32)
ponder_loss = (layer_1.calculate_ponder_cost(time_penalty=.001) + \
layer_2.calculate_ponder_cost(time_penalty=.001) + \
layer_3.calculate_ponder_cost(time_penalty=.001) + \
layer_4.calculate_ponder_cost(time_penalty=.001))
final_output = output[-1]
softmax_W = tf.Variable(tf.truncated_normal(shape=([num_units, outclasses]), dtype=tf.float32, name='softmax_W'))
softmax_b = tf.Variable(tf.truncated_normal(shape=([outclasses]), dtype=tf.float32, name='softmax_b'))
final_output = tf.matmul(final_output, softmax_W) + softmax_b
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=final_output, labels=targets))
cost = cost + ponder_loss
tvars = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars), max_grad_norm)
optimizer = tf.train.AdamOptimizer(learning_rate)
optimizer = optimizer.apply_gradients(zip(grads, tvars))
correct_pred = tf.equal(tf.argmax(final_output,1), tf.argmax(targets,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
%%time
loss_list = []
acc_list = []
pon_list = []
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for epoch in np.arange(500):
for i in np.arange(int(len(trainX)/batch_size)):
batch_x = trainX[i*batch_size: (i+1)*batch_size]
batch_y = trainY[i*batch_size: (i+1)*batch_size]
sess.run(optimizer, feed_dict={input_data:batch_x, targets:batch_y})
pon, acc, loss = sess.run([ponder_loss, accuracy, cost], feed_dict={input_data:batch_x, targets:batch_y})
loss_list.append(loss)
acc_list.append(acc)
pon_list.append(pon)
Wall time: 15min 31s
tmp = pd.Series(loss_list)
tmp.plot()
<matplotlib.axes._subplots.AxesSubplot at 0x41b73748>
tmp = pd.Series(acc_list)
tmp.plot()
<matplotlib.axes._subplots.AxesSubplot at 0xed4a95c0>
tmp = pd.Series(pon_list)
tmp.plot()
<matplotlib.axes._subplots.AxesSubplot at 0xf0594ef0>