
Adaptive Computation Time RNN

Posted by 专门套利 on May 10 at 05:13

A few days ago I came across a blog post on ACT (Adaptive Computation Time, Graves 2016), designed by Alex Graves. ACT wraps an RNN cell so that at timestep t the model can run n internal iterations and take the weighted sum of those iterations as the output at time t. The core idea is to effectively stretch the RNN along the time axis, increasing the model's capacity and its nonlinear expressiveness.

Because RNNs use the tanh activation, stacking more than about five layers is computationally expensive. Recent papers argue that lengthening the RNN along the time dimension improves its performance; by iterating the RNN several times at a single timestep and emitting the weighted sum as the output at time t, the RNN is lengthened in disguise. ACT adds a time (ponder) loss to force the model to learn to reach its decision within a limited amount of computation (a limited number of iteration steps).
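For reference, here is my summary of the core ACT equations from Graves (2016); the equation numbers cited in the ACTCell code below follow the paper's numbering:

h_t^n = \sigma(W_h s_t^n + b_h)                                   % halting unit at ponder step n of timestep t
N(t) = \min\{\, n : \textstyle\sum_{i=1}^{n} h_t^i \ge 1-\epsilon \,\}   % number of ponder steps
p_t^n = h_t^n \ \text{for}\ n < N(t), \qquad p_t^{N(t)} = R(t) = 1 - \textstyle\sum_{i=1}^{N(t)-1} h_t^i
s_t = \textstyle\sum_{n=1}^{N(t)} p_t^n s_t^n, \qquad y_t = \textstyle\sum_{n=1}^{N(t)} p_t^n y_t^n   % weighted state and output
\mathcal{L} = \mathcal{L}_{task} + \tau \textstyle\sum_t \big( N(t) + R(t) \big)    % ponder cost with weight \tau

Here \tau corresponds to the time_penalty argument of calculate_ponder_cost in the code below.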

Here ACT is used to wrap a GRU cell to predict the HS300 index. Compared with a plain GRU, the ACT-wrapped GRU shows a modest improvement.

import numpy as np
import pandas as pd
import tensorflow as tf
import talib 
import datetime
import matplotlib.pylab as plt
%matplotlib inline
import seaborn as sns
sns.set_style('whitegrid')

tmp = pd.read_csv('G:/QuantCodes/Adaptive_Computation_Time/A1.csv')
Dtmp = pd.read_csv('G:/QuantCodes/Adaptive_Computation_Time/A2.csv')
del tmp['Unnamed: 0']
del Dtmp['Unnamed: 0']

tmp['oc'] = (tmp['close'] - tmp['open'])/tmp['open']
tmp['oh'] = (tmp['high'] - tmp['open'])/tmp['open']
tmp['ol'] = (tmp['low'] - tmp['open'])/tmp['open']
tmp['ch'] = (tmp['high'] - tmp['close'])/tmp['close']
tmp['cl'] = (tmp['low'] - tmp['close'])/tmp['close']
tmp['lh'] = (tmp['high'] - tmp['low'])/tmp['low']
tmp['oc_4'] = (tmp['close'] - tmp['open'].shift(4))/tmp['open'].shift(4)
tmp['oh_4'] = (tmp['high'] - tmp['open'].shift(4))/tmp['open'].shift(4)
tmp['ol_4'] = (tmp['low'] - tmp['open'].shift(4))/tmp['open'].shift(4)
tmp['co_8'] = (tmp['open'] - tmp['close'].shift(8))/tmp['close'].shift(8)
tmp['ch_8'] = (tmp['high'] - tmp['close'].shift(8))/tmp['close'].shift(8)
tmp['cl_8'] = (tmp['low'] - tmp['close'].shift(8))/tmp['close'].shift(8)
tmp['co_80'] = (tmp['open'] - tmp['close'].shift(80))/tmp['close'].shift(80)
tmp['ch_80'] = (tmp['high'] - tmp['close'].shift(80))/tmp['close'].shift(80)
tmp['cl_80'] = (tmp['low'] - tmp['close'].shift(80))/tmp['close'].shift(80)
tmp['ATR14'] = talib.ATR(tmp.high.values, tmp.low.values, tmp.close.values, timeperiod=14)
tmp['ATR6'] = talib.ATR(tmp.high.values, tmp.low.values, tmp.close.values, timeperiod=6)
tmp['EMA6'] = talib.EMA(tmp.close.values, timeperiod=6)
tmp['EMA12'] = talib.EMA(tmp.close.values, timeperiod=12)
tmp['EMA26'] = talib.EMA(tmp.close.values, timeperiod=26)
tmp['tEMA6'] = talib.EMA(tmp.total_turnover.values, timeperiod=6)
tmp['tEMA12'] = talib.EMA(tmp.total_turnover.values, timeperiod=12)
tmp['tEMA26'] = talib.EMA(tmp.total_turnover.values, timeperiod=26)
tmp['VEMA6'] = talib.EMA(tmp.volume.values, timeperiod=6)
tmp['VEMA12'] = talib.EMA(tmp.volume.values, timeperiod=12)
tmp['VEMA26'] = talib.EMA(tmp.volume.values, timeperiod=26)
tmp['MACD_DIF'], tmp['MACD_DEA'], tmp['MACD_bar'] = talib.MACD(
tmp.close.values, fastperiod=12, slowperiod=24, signalperiod=9)

faclist = ['oc', 'oh', 'ol', 'ch', 'cl', 'lh', 'oc_4', 'oh_4', 'ol_4', 'co_8',
       'ch_8', 'cl_8', 'co_80', 'ch_80', 'cl_80', 'ATR14', 'ATR6', 'EMA6',
       'EMA12', 'EMA26', 'tEMA6', 'tEMA12', 'tEMA26', 'VEMA6', 'VEMA12',
       'VEMA26', 'MACD_DIF', 'MACD_DEA', 'MACD_bar']
del_list = ['total_turnover', 'volume', 'open', 'close', 'high', 'low']

# Winsorize: clip upper-tail outliers at mean + 3 std
for i in faclist:
    mean = tmp[i].mean()
    std = tmp[i].std()
    tmp.loc[tmp[i] > mean + 3*std, i] = mean + 3*std

# Standardize each factor (z-score)
for i in faclist:
    mean = tmp[i].mean()
    std = tmp[i].std()
    tmp[i] = (tmp[i] - mean)/std

tmp.drop(labels=del_list, axis=1, inplace=True)
tmp.dropna(inplace= True)

# Label: forward return over the next 17 rows of Dtmp, bucketed into 3 classes
# (0: return < -7%, 1: neutral, 2: return > +7%); keep the last 359 rows.
Dtmp['returns'] = Dtmp.close.shift(-17)/Dtmp.close - 1.
Dtmp.dropna(inplace=True)
Dtmp['dir_lab'] = 1
Dtmp.loc[Dtmp.returns > 0.07, 'dir_lab'] = 2
Dtmp.loc[Dtmp.returns < -0.07, 'dir_lab'] = 0
Dtmp = Dtmp.iloc[-359:, :]
Dtmp.reset_index(drop=True, inplace=True)

start = Dtmp.Date.values[0]
end = Dtmp.Date.values[-1]
end = datetime.datetime.strptime(end, '%Y-%m-%d')
end = end + datetime.timedelta(days= 1)
end = end.strftime('%Y-%m-%d')

tmp = tmp.loc[(tmp.Date.values > start) & (tmp.Date.values < end)].reset_index(drop=True)
forward = 59
# tmp appears to hold 16 intraday bars per day with 29 factors each; for every day i,
# flatten the preceding 59 days (59*16 bars x 29 factors) into one sample row.
tmpfac = np.zeros((1, forward*16*29))
for i in np.arange(forward, int(len(Dtmp))):
    tp = tmp.iloc[16*(i-forward):16*i, 1:]
    tp = np.array(tp).ravel(order='C').transpose()
    tmpfac = np.vstack((tmpfac, tp))
tmpfac = np.delete(tmpfac, 0, axis=0)  # drop the all-zero seed row

trainX = tmpfac.reshape([-1, 59, 16*29])  # [samples, 59 days, 464 = 16 bars * 29 factors]
trainY = (Dtmp.iloc[-300:, -1])

def dense_to_one_hot(labels_dense):
    """Convert integer class labels to one-hot encoding.
    labels_dense must contain non-negative integers.
    2016-11-21
    """
    num_classes = len(np.unique(labels_dense))  # np.unique drops duplicate values
    raws_labels = labels_dense.shape[0]
    index_offset = np.arange(raws_labels) * num_classes
    labels_one_hot = np.zeros((raws_labels, num_classes))
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot

trainY = dense_to_one_hot(trainY)
trainY.shape
(300, 3)

Single-layer GRU

batch_size = 100
in_length = 59
in_width = 464
num_units = 128
outclasses = 3
learning_rate = 0.001
max_grad_norm = 5
input_data = tf.placeholder(shape=[None, in_length, in_width], dtype=tf.float32)
targets = tf.placeholder(shape=[None,outclasses], dtype=tf.float32)

# [batch, time, width] -> time-major list of `in_length` tensors, each [batch, in_width], for static_rnn
test = tf.transpose(input_data, [1, 0, 2])
test = tf.reshape(test, [-1, in_width])
test = tf.split(value=test, num_or_size_splits=in_length, axis=0)

GRUCell = tf.contrib.rnn.GRUCell(num_units=num_units)

output, state = tf.contrib.rnn.static_rnn(cell=GRUCell, dtype=tf.float32, inputs= test)

final_output = output[-1]

softmax_W = tf.Variable(tf.truncated_normal(shape=([num_units, outclasses]), dtype=tf.float32, name='softmax_W'))
softmax_b = tf.Variable(tf.truncated_normal(shape=([outclasses]), dtype=tf.float32, name='softmax_b'))

final_output = tf.matmul(final_output, softmax_W) + softmax_b
#final_output = tf.nn.softmax(final_output)
final_output

cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=final_output, labels=targets))

optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)
correct_pred = tf.equal(tf.argmax(final_output,1), tf.argmax(targets,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
%%time
loss_list = []
acc_list = []
with tf.Session() as sess:    
    sess.run(tf.global_variables_initializer())
    for epoch in np.arange(30):
        for i in np.arange(int(len(trainX)/batch_size)):
            batch_x = trainX[i*batch_size: (i+1)*batch_size]
            batch_y = trainY[i*batch_size: (i+1)*batch_size]
            sess.run(optimizer, feed_dict={input_data:batch_x, targets:batch_y})
        acc, loss = sess.run([accuracy, cost], feed_dict={input_data:batch_x, targets:batch_y})  # metrics from the last training batch of the epoch
        loss_list.append(loss)
        acc_list.append(acc)
Wall time: 23.2 s
tmp = pd.Series(loss_list)
tmp.plot()
tmp = pd.Series(acc_list)
tmp.plot()

Single-layer ACT

tf.reset_default_graph()  # start from a fresh default graph before building the ACT model
import collections
import math

import tensorflow as tf
from tensorflow.python.framework import tensor_shape
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import clip_ops
from tensorflow.python.ops import embedding_ops
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn_ops
from tensorflow.python.ops import partitioned_variables
from tensorflow.python.ops import variable_scope as vs

from tensorflow.python.ops.math_ops import sigmoid
from tensorflow.python.ops.math_ops import tanh

from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.util import nest
def _state_size_with_prefix(state_size, prefix=None):
    result_state_size = tensor_shape.as_shape(state_size).as_list()
    if prefix is not None:
        if not isinstance(prefix, list):
            raise TypeError("prefix of _state_size_with_prefix should be a list.")
        result_state_size = prefix + result_state_size
    return result_state_size



def _linear(args, output_size, bias, bias_start=0.0, scope=None):
  """Linear map: sum_i(args[i] * W[i]), where W[i] is a variable.

  Args:
    args: a 2D Tensor or a list of 2D, batch x n, Tensors.
    output_size: int, second dimension of W[i].
    bias: boolean, whether to add a bias term or not.
    bias_start: starting value to initialize the bias; 0 by default.
    scope: VariableScope for the created subgraph; defaults to "Linear".

  Returns:
    A 2D Tensor with shape [batch x output_size] equal to
    sum_i(args[i] * W[i]), where W[i]s are newly created matrices.

  Raises:
    ValueError: if some of the arguments has unspecified or wrong shape.
  """
  if args is None or (nest.is_sequence(args) and not args):
    raise ValueError("`args` must be specified")
  if not nest.is_sequence(args):
    args = [args]

  # Calculate the total size of arguments on dimension 1.
  total_arg_size = 0
  shapes = [a.get_shape().as_list() for a in args]
  for shape in shapes:
    if len(shape) != 2:
      raise ValueError("Linear is expecting 2D arguments: %s" % str(shapes))
    if not shape[1]:
      raise ValueError("Linear expects shape[1] of arguments: %s" % str(shapes))
    else:
      total_arg_size += shape[1]

  dtype = [a.dtype for a in args][0]

  # Now the computation.
  with vs.variable_scope(scope or "Linear"):
    matrix = vs.get_variable(
        "Matrix", [total_arg_size, output_size], dtype=dtype)
    if len(args) == 1:
      res = math_ops.matmul(args[0], matrix)
    else:
      res = math_ops.matmul(array_ops.concat(args, 1), matrix)
    if not bias:
      return res
    bias_term = vs.get_variable(
        "Bias", [output_size],
        dtype=dtype,
        initializer=init_ops.constant_initializer(
            bias_start, dtype=dtype))
  return res + bias_term
class RNNCell(object):
    def zero_state(self, batch_size, dtype):
        state_size = self.state_size
        if nest.is_sequence(state_size):
            state_size_flat = nest.flatten(state_size)
            zeros_flat = [
                array_ops.zeros(
                array_ops.stack(_state_size_with_prefix(
                s, prefix=[batch_size])),
                    dtype= dtype)
                for s in state_size_flat
            ]
            for s, z in zip(state_size_flat, zeros_flat):
                z.set_shape(_state_size_with_prefix(s, prefix=[None]))
            zeros = nest.pack_sequence_as(structure=state_size,
                                          flat_sequence=zeros_flat)
        else:
            zeros_size = _state_size_with_prefix(state_size, prefix=[batch_size])
            zeros = array_ops.zeros(array_ops.stack(zeros_size), dtype=dtype)
            zeros.set_shape(_state_size_with_prefix(state_size, prefix=[None]))
        return zeros

    

class GRUCell(RNNCell):
  """Gated Recurrent Unit cell (cf. http://arxiv.org/abs/1406.1078)."""

  def __init__(self, num_units, input_size=None, activation=tanh):
    if input_size is not None:
      logging.warn("%s: The input_size parameter is deprecated.", self)
    self._num_units = num_units
    self._activation = activation

  @property
  def state_size(self):
    return self._num_units

  @property
  def output_size(self):
    return self._num_units

  def __call__(self, inputs, state, scope=None):
    """Gated recurrent unit (GRU) with nunits cells."""
    with vs.variable_scope(scope or "gru_cell"):
      with vs.variable_scope("gates"):  # Reset gate and update gate.
        # We start with bias of 1.0 to not reset and not update.
        r, u = array_ops.split(
            value=_linear(
                [inputs, state], 2 * self._num_units, True, 1.0),
            num_or_size_splits=2,
            axis=1)
        r, u = sigmoid(r), sigmoid(u)
      with vs.variable_scope("candidate"):
        c = self._activation(_linear([inputs, r * state],
                                     self._num_units, True))
      new_h = u * state + (1 - u) * c
    return new_h, new_h



class ACTCell(tf.contrib.rnn.RNNCell):
    
    def __init__(self,
                num_units,
                cell,
                epsilon,
                max_computation,
                batch_size,
                sigmoid_output=False
                ):
        self._num_units = num_units
        self.cell = cell
        self.batch_size = batch_size
        self.one_sub_epsilon = tf.constant(value=1.-epsilon, shape=[self.batch_size])
        self.N = tf.constant(value=max_computation, dtype=tf.float32, shape=[self.batch_size])
        self.sigmoid_output = sigmoid_output
        self.ACT_Rt = []
        self.ACT_n = []
        
    @property
    def input_size(self):
        return self._num_units
    @property
    def output_size(self):
        return self._num_units
    @property
    def state_size(self):
        return self._num_units    
    
    def act_step(self, 
                 accumulation_ht_n_sub_1, 
                 condiction_1_stop_signal,
                 x_t_input, 
                 state_t_n_sub_1, 
                 n, 
                 previous_step_modify_the_stop_signal, 
                 state_t, 
                 output_t):
        """
        Equation 3
        二进制标签,设置标志区别对于模型是否是t时刻的第一次输入第一次输入,
        标签为1 表示在t时刻第一次输入xinput
        标签为0 表示在t时刻这不是第一次输入xinput数据,这是重复计算。
        标签矩阵由 htn 的累加和决定,如果 accumulation_htn 在该批次全部为0,
        则认为这是在t时刻第一次输入数据,不是自适应的重复计算。
        注意equation 3 中xtn只有在第一次输入的时候才有delta=1
        """
        binary_flag = tf.cond(pred= tf.reduce_all(tf.equal(accumulation_ht_n_sub_1,0.)),
                              fn1= lambda: tf.ones(shape=[self.batch_size,1], dtype=tf.float32),
                              fn2= lambda: tf.zeros(shape=[self.batch_size,1], dtype=tf.float32))

        x_t_n_input = tf.concat(values=[binary_flag, x_t_input], axis=1)
        
        output_t_n, state_t_n = tf.contrib.rnn.static_rnn(cell=self.cell, inputs=[x_t_n_input], 
                                                      initial_state= state_t_n_sub_1, scope=type(self.cell).__name__)     
        
        """
        equation 5 
        计算 halting unit htn
        修改格式为1维
        """               
        with tf.variable_scope('halting_unit'):
            ht_n = tf.sigmoid(_linear(args= state_t_n, output_size= 1, bias=True, bias_start=.1))
            ht_n = tf.squeeze(ht_n)
        
        
        """
        equation 6 && equation 13
        equation 13 可以转换为判断 
        1> htn在n步的累加和 < 1 - epsilon 也就是 n < min{} eq6
        2> n < self.N
        """       
        condition_1 = tf.less(accumulation_ht_n_sub_1 + ht_n, self.one_sub_epsilon)
        #condition_1_float = tf.cast(x=condition_1, dtype=tf.float32)
        modify_the_stop_signal = tf.logical_and(x=condition_1, y= previous_step_modify_the_stop_signal)
        modify_the_stop_signal_float = tf.cast(x=modify_the_stop_signal, dtype=tf.float32)
        
        accumulation_ht_n = ht_n*modify_the_stop_signal_float + accumulation_ht_n_sub_1
        # stop signal that only counts h_t^n up to the step where p_t_n first switches to R(t)
        # for batch training it matters when the last row in the batch switches its p_t_n to R(t)
        condiction_1_stop_signal += ht_n * tf.cast(x=previous_step_modify_the_stop_signal, dtype=tf.float32) 
        
        n = n + modify_the_stop_signal_float
        condition_2 = tf.less(x=n, y=self.N)
        
        
        """
        条件1 条件2 同时成立 p_t_n = h_t_n
        否则 p_t_n = R_t
        """
        p_t_n_select = tf.logical_and(condition_1, condition_2)
        
        R_t = tf.expand_dims(input=1.-accumulation_ht_n_sub_1, axis=-1)
        ht_n = tf.expand_dims(input=ht_n, axis=-1)
        
        p_t_n = tf.where(condition=p_t_n_select, x=ht_n, y=R_t)
        
        
        signal_select_channel = tf.expand_dims(input=tf.cast(x=previous_step_modify_the_stop_signal, dtype=tf.float32), axis=-1)
        
        state_t = (state_t_n * p_t_n * signal_select_channel) + state_t
        output_t = (output_t_n * p_t_n * signal_select_channel) + output_t
        
        state_t = tf.squeeze(state_t)
        output_t = tf.squeeze(output_t)
        
        return [
            accumulation_ht_n,
            condiction_1_stop_signal,
            x_t_input,
            state_t_n,
            n,
            modify_the_stop_signal,
            state_t,
            output_t
        ]
        """
        上面的act_step设计缺陷在使用 batch 训练的时候会出现在同一batch内 n 停止数值不同,
        假设 batch_size = 128, 也就是使用SGD训练的时候同时对128行数据进行计算,
        这个时候出现对于ACT迭代模型出现问题为,可能在 n = 8 的时候 row_1就达到条件切换到R(t)
        但是row_2没有达到停止条件,这个时候在 n = 9的时候出现问题,也就是这个时候上面的算法,
        在n = 9的情况下继续进行计算,这个时候出现问题是了, 对于row_1 而言在n=8已经切换到ptn = R(t)
        在n = 9的时候 row_1还会继续运算,这个时候出现问题是,row_1 会累加第二个 R(t),而不是停止运算,
        这里为了处理这个问题,将设置batch_mask 在n=9的时候, 设置row_1 累加0解决这个问题。
        注意在使用修正信号的时候注意最后处理前一步的修正信号

        因为修正信号_float 是0,1 表示 False,True所以 最终更新state_t和y_t的时候可以设置乘以0 来修正额外的更新错误,
        但是这个地方需要注意的是,只要在步骤n p_t_n从h_t_n切换到R(t) 修正信号就为False 这是表示成float的信号就是0,
        但是第一次切换到R(t)的时候 n步骤的修正信号为0,n-1步骤修正信号为1,这个时候需要* n-1步骤的修正信号放行第一次出现R(t)
        """
    
    def __call__(self, Xinput, state, timestep=0, scope=None):
        with tf.variable_scope(scope or type(self).__name__):
            accumulation_ht_n = tf.constant(value=0., dtype=tf.float32, shape=[self.batch_size], name='initial_halting_probability')
            condiction_1_stop_signal = tf.constant(value=0., dtype=tf.float32, shape=[self.batch_size], name='initial_condiction_1_stop_signal')
            n = tf.constant(value=0, dtype=tf.float32, shape=[self.batch_size], name='ininital_counter')
            modify_the_stop_signal = tf.constant(value=True, dtype=tf.bool, shape=[self.batch_size])
            cache_output_t = tf.zeros_like(tensor=state, dtype=tf.float32, name='cache_for_output_t')
            cache_state_t = tf.zeros_like(tensor=state, dtype=tf.float32, name='cache_for_state_t')
            
            def halting_function(accumulation_ht_n, condiction_1_stop_signal, x_t_input, state_t_n, n, modify_the_stop_signal, state_t, output_t):
                return tf.reduce_any(tf.logical_and(tf.less(n, self.N), tf.less(condiction_1_stop_signal, self.one_sub_epsilon)))
            
            Rt,_,_,_, n, _, state_t, output_t = tf.while_loop(cond=halting_function, body=self.act_step, 
                          loop_vars=[accumulation_ht_n, condiction_1_stop_signal, Xinput, state, n, modify_the_stop_signal, cache_state_t, cache_output_t])
            
        self.ACT_Rt.append(tf.reduce_sum(1.- Rt))
        self.ACT_n.append(tf.reduce_sum(n))

        if self.sigmoid_output:
            output_t = tf.sigmoid(_linear(args=output_t, output_size=1, bias=True, bias_start=.1))
        
        return output_t, state_t
    
    def calculate_ponder_cost(self, time_penalty=0.01):
        return time_penalty * tf.reduce_sum(
            tf.add_n(self.ACT_Rt)/len(self.ACT_Rt) + 
            tf.to_float(tf.add_n(self.ACT_n)/len(self.ACT_n))
        )
    
    def test(self, Xinput, state, timestep=0, scope=None):
        with tf.variable_scope(scope or type(self).__name__):
            accumulation_ht_n = tf.constant(value=0., dtype=tf.float32, shape=[self.batch_size], name='initial_halting_probability')
            condiction_1_stop_signal = tf.constant(value=0., dtype=tf.float32, shape=[self.batch_size], name='initial_condiction_1_stop_signal')
            n = tf.constant(value=0, dtype=tf.float32, shape=[self.batch_size], name='ininital_counter')
            modify_the_stop_signal = tf.constant(value=True, dtype=tf.bool, shape=[self.batch_size])
            cache_output_t = tf.zeros_like(tensor=state, dtype=tf.float32, name='cache_for_output_t')
            cache_state_t = tf.zeros_like(tensor=state, dtype=tf.float32, name='cache_for_state_t')
            
            def halting_function(accumulation_ht_n, condiction_1_stop_signal, x_t_input, state_t_n, n, modify_the_stop_signal, state_t, output_t):
                return tf.reduce_any(tf.logical_and(tf.less(n, self.N), tf.less(condiction_1_stop_signal, self.one_sub_epsilon)))
            
            print ('accumulation_ht_n',accumulation_ht_n.get_shape().as_list())
            print ('condiction_1_stop_signal', condiction_1_stop_signal.get_shape().as_list())
            print ('n',n.get_shape().as_list())
            print ('modify_the_stop_signal', modify_the_stop_signal.get_shape().as_list())
            print ('cache_output_t', cache_output_t.get_shape().as_list())
            print ('cache_state_t', cache_state_t.get_shape().as_list())
            
            self.act_step(accumulation_ht_n_sub_1 = accumulation_ht_n,
                         condiction_1_stop_signal = condiction_1_stop_signal,
                         x_t_input = Xinput,
                         state_t_n_sub_1 = state,
                         n = n,
                         previous_step_modify_the_stop_signal = modify_the_stop_signal,
                         state_t = cache_state_t,
                         output_t = cache_output_t)
            
            print ('-----------------')
            print ('accumulation_ht_n',accumulation_ht_n.get_shape().as_list())
            print ('condiction_1_stop_signal', condiction_1_stop_signal.get_shape().as_list())
            print ('n',n.get_shape().as_list())
            print ('modify_the_stop_signal', modify_the_stop_signal.get_shape().as_list())
            print ('cache_output_t', cache_output_t.get_shape().as_list())
            print ('cache_state_t', cache_state_t.get_shape().as_list())            
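To make the batch-masking logic described in act_step concrete, here is a small standalone NumPy sketch (toy numbers, not part of the TF graph) of how rows that have already switched to R(t) are masked so they do not accumulate a second remainder:

import numpy as np

# Toy halting probabilities h_t^n for a batch of 2 rows, at most 3 ponder steps.
h = np.array([[0.6, 0.5, 0.3],   # row 0 crosses 1 - eps at n = 2
              [0.2, 0.3, 0.6]])  # row 1 crosses 1 - eps at n = 3
eps = 0.01
batch, max_n = h.shape
acc = np.zeros(batch)                    # running sum of h for rows still pondering
running = np.ones(batch, dtype=bool)     # plays the role of modify_the_stop_signal
p = np.zeros_like(h)                     # weights p_t^n actually applied
for n in range(max_n):
    cond1 = acc + h[:, n] < 1. - eps     # condition_1: still below 1 - epsilon
    keep = cond1 & running               # rows that keep pondering after this step
    halt_now = running & ~cond1          # rows switching to R(t) at this step
    p[keep, n] = h[keep, n]              # p_t^n = h_t^n while still running
    p[halt_now, n] = 1. - acc[halt_now]  # p_t^N(t) = R(t); halted rows add nothing later
    acc[keep] += h[keep, n]
    running = keep
print(p.sum(axis=1))                     # each row's weights sum to 1

Each row's weights sum to exactly 1, which is what the signal_select_channel multiplication enforces inside the TF while_loop.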
batch_size = 100
in_length = 59
in_width = 464
num_units = 128
outclasses = 3
learning_rate = 0.001
max_grad_norm = 5
input_data = tf.placeholder(shape=[None, in_length, in_width], dtype=tf.float32)
targets = tf.placeholder(shape=[None,outclasses], dtype=tf.float32)

GRU = tf.contrib.rnn.GRUCell(num_units= num_units)
ACT = ACTCell(num_units=num_units, cell=GRU, epsilon=0.01, max_computation=50, batch_size=batch_size)

test = tf.transpose(input_data, [1,0,2])
test = tf.reshape(test, [-1, in_width])
test = tf.split(value=test, num_or_size_splits=in_length, axis=0)

output, state = tf.contrib.rnn.static_rnn(cell=ACT, inputs=test, dtype=tf.float32)

final_output = output[-1]

softmax_W = tf.Variable(tf.truncated_normal(shape=([num_units, outclasses]), dtype=tf.float32, name='softmax_W'))
softmax_b = tf.Variable(tf.truncated_normal(shape=([outclasses]), dtype=tf.float32, name='softmax_b'))

final_output = tf.matmul(final_output, softmax_W) + softmax_b

ponder_loss = ACT.calculate_ponder_cost(time_penalty=.01)
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=final_output, labels=targets))
cost = cost + ponder_loss

tvars = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars), max_grad_norm)
optimizer = tf.train.AdamOptimizer(learning_rate)
optimizer = optimizer.apply_gradients(zip(grads, tvars))

correct_pred = tf.equal(tf.argmax(final_output,1), tf.argmax(targets,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
%%time
loss_list = []
acc_list = []
pon_list = []
with tf.Session() as sess:    
    sess.run(tf.global_variables_initializer())
    for epoch in np.arange(30):
        for i in np.arange(int(len(trainX)/batch_size)):
            batch_x = trainX[i*batch_size: (i+1)*batch_size]
            batch_y = trainY[i*batch_size: (i+1)*batch_size]
            sess.run(optimizer, feed_dict={input_data:batch_x, targets:batch_y})
        pon, acc, loss = sess.run([ponder_loss, accuracy, cost], feed_dict={input_data:batch_x, targets:batch_y})  # metrics from the last training batch of the epoch
        loss_list.append(loss)
        acc_list.append(acc)
        pon_list.append(pon)
Wall time: 1min 20s
tmp = pd.Series(loss_list)
tmp.plot()
tmp = pd.Series(acc_list)
tmp.plot()
tmp = pd.Series(pon_list)
tmp.plot()

Multi-layer ACT GRU

tf.reset_default_graph()  # start from a fresh default graph before building the multi-layer model
(The imports, data loading and feature construction, label preparation, dense_to_one_hot encoding, the _linear helper, and the ACTCell class are identical to the cells above and were re-run here unchanged; trainX again has shape [300, 59, 464] and trainY has shape (300, 3).)
batch_size = 100
in_length = 59
in_width = 464
num_units = 20
outclasses = 3
learning_rate = 0.001
max_grad_norm = 5
dropout = 0.618
input_data = tf.placeholder(shape=[None, in_length, in_width], dtype=tf.float32)
targets = tf.placeholder(shape=[None,outclasses], dtype=tf.float32)
layer_1 = tf.contrib.rnn.GRUCell(num_units=num_units)
layer_1 = tf.contrib.rnn.DropoutWrapper(cell=layer_1, output_keep_prob=1.- dropout)
layer_1 = ACTCell(num_units=num_units, cell=layer_1, epsilon=0.01, max_computation=20, batch_size=batch_size)

layer_2 = tf.contrib.rnn.GRUCell(num_units=num_units)
layer_2 = tf.contrib.rnn.DropoutWrapper(cell=layer_2, output_keep_prob=1.- dropout)
layer_2 = ACTCell(num_units=num_units, cell=layer_2, epsilon=0.01, max_computation=20, batch_size=batch_size)

layer_3 = tf.contrib.rnn.GRUCell(num_units=num_units)
layer_3 = tf.contrib.rnn.DropoutWrapper(cell=layer_3, output_keep_prob=1.- dropout)
layer_3 = ACTCell(num_units=num_units, cell=layer_3, epsilon=0.01, max_computation=20, batch_size=batch_size)

layer_4 = tf.contrib.rnn.GRUCell(num_units=num_units)
layer_4 = tf.contrib.rnn.DropoutWrapper(cell=layer_4, output_keep_prob=1.- dropout)
layer_4 = ACTCell(num_units=num_units, cell=layer_4, epsilon=0.01, max_computation=20, batch_size=batch_size)
MultiGRU = tf.contrib.rnn.MultiRNNCell(cells=[layer_1, layer_2, layer_3, layer_4], state_is_tuple=True)

test = tf.transpose(input_data, [1,0,2])
test = tf.reshape(test, [-1, in_width])
test = tf.split(value=test, num_or_size_splits=in_length, axis=0)

output, state = tf.contrib.rnn.static_rnn(cell=MultiGRU, inputs=test, dtype=tf.float32)

ponder_loss = (layer_1.calculate_ponder_cost(time_penalty=.001) + \
layer_2.calculate_ponder_cost(time_penalty=.001) + \
layer_3.calculate_ponder_cost(time_penalty=.001) + \
layer_4.calculate_ponder_cost(time_penalty=.001))

final_output = output[-1]

softmax_W = tf.Variable(tf.truncated_normal(shape=([num_units, outclasses]), dtype=tf.float32, name='softmax_W'))
softmax_b = tf.Variable(tf.truncated_normal(shape=([outclasses]), dtype=tf.float32, name='softmax_b'))

final_output = tf.matmul(final_output, softmax_W) + softmax_b

cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=final_output, labels=targets))
cost = cost + ponder_loss

tvars = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars), max_grad_norm)
optimizer = tf.train.AdamOptimizer(learning_rate)
optimizer = optimizer.apply_gradients(zip(grads, tvars))

correct_pred = tf.equal(tf.argmax(final_output,1), tf.argmax(targets,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
%%time
loss_list = []
acc_list = []
pon_list = []
with tf.Session() as sess:    
    sess.run(tf.global_variables_initializer())
    for epoch in np.arange(500):
        for i in np.arange(int(len(trainX)/batch_size)):
            batch_x = trainX[i*batch_size: (i+1)*batch_size]
            batch_y = trainY[i*batch_size: (i+1)*batch_size]
            sess.run(optimizer, feed_dict={input_data:batch_x, targets:batch_y})
        pon, acc, loss = sess.run([ponder_loss, accuracy, cost], feed_dict={input_data:batch_x, targets:batch_y})  # metrics from the last training batch of the epoch
        loss_list.append(loss)
        acc_list.append(acc)
        pon_list.append(pon)
Wall time: 15min 31s
tmp = pd.Series(loss_list)
tmp.plot()
tmp = pd.Series(acc_list)
tmp.plot()
tmp = pd.Series(pon_list)
tmp.plot()
