简单的强化学习选股框架,在选股方面训练虚拟交易员选股调仓,实现SH50指数增强。
github
深度强化学习选股
上证50指数增强
简单的强化学习选股框架,在选股方面训练虚拟交易员选股调仓,实现SH50指数增强。
在每个交易日,Agent根据获取的观测数据[Batch, Length, Factor]计算出一个行为向量[Batch],对50只成份股进行调仓:先卖后买,使用开盘价成交;在每个交易日结束时,使用收盘价评估持仓并获得reward。
Agent 推断架构为2层LSTM后接MLP输出。
在与训练环境交互的时候使用gather处理n step折现问题。
注:交互环境-账户Account使用向量的方法并行SH50成份股,所以交互环境中股票的顺序是绑定的,并且股票种类使用Batch固定为50。
trainable neural network
import tensorflow as tf
import numpy as np
from sonnet.python.modules.base import AbstractModule
from sonnet.python.modules.basic import Linear as sntLinear
from sonnet.python.modules.gated_rnn import LSTM as sntLSTM
from sonnet.python.modules.basic_rnn import DeepRNN as sntDeepRNN
from sonnet.python.modules.basic import BatchApply as sntBatchApply
def swich(inputs):
    """Apply the swish activation ``x * sigmoid(x)`` to a tensor.

    :param inputs: tensor to activate
    :return: ``inputs`` multiplied element-wise by its own sigmoid
    """
    gate = tf.nn.sigmoid(inputs)
    return inputs * gate
def Linear(name, output_size):
    """Create a Sonnet linear layer with fixed initializers and L2 penalties.

    Weights start from a truncated normal (stddev 0.1) and biases from the
    constant 0.1; both carry an L2 regularizer with scale 0.1.

    :param name: module name given to the layer
    :param output_size: number of output units
    :return: the configured ``sntLinear`` module
    """
    l2 = tf.contrib.layers.l2_regularizer
    return sntLinear(
        output_size,
        initializers=dict(
            w=tf.truncated_normal_initializer(stddev=0.1),
            b=tf.constant_initializer(value=0.1)),
        regularizers=dict(w=l2(scale=0.1), b=l2(scale=0.1)),
        name=name)
def build_common_network(inputs):
    """Run the shared two-layer LSTM stack over a time-major sequence.

    :param inputs: [Time, Batch, state_size]
    :return: [Time, Batch, hidden_size]
    """
    # A 64-unit LSTM feeding a 32-unit LSTM.
    cells = [sntLSTM(64, name='rnn_first'), sntLSTM(32, name='rnn_second')]
    rnn = sntDeepRNN(cells)
    # The batch size is read from the static shape (axis 1).
    n_batch = inputs.get_shape().as_list()[1]
    # Unroll over time; only the per-step outputs are returned.
    outputs, _ = tf.nn.dynamic_rnn(
        rnn,
        inputs,
        initial_state=rnn.initial_state(n_batch),
        time_major=True)
    return outputs
class ActorNet(AbstractModule):
    """Actor network: shared LSTM stack followed by a two-layer softmax head."""

    def __init__(self, name='Actor'):
        super().__init__(name=name)

    def _build(self, output_size, inputs):
        """Map observations to action probabilities.

        :param output_size: number of actions (size of the last output axis)
        :param inputs: [Time, Batch, state_size]
        :return: softmax probabilities, [Time, Batch, output_size]
        """
        # Recurrent part -> [Time, Batch, hidden_size]; LSTM outputs lie in (-1, 1).
        hidden = build_common_network(inputs)
        # Dense head, applied to every (time, batch) position.
        dense_in = sntBatchApply(Linear('input_layer', 64))
        hidden = swich(dense_in(hidden))
        dense_out = sntBatchApply(Linear('output_layer', output_size))
        logits = dense_out(hidden)
        return tf.nn.softmax(logits)

    def get_regularization(self):
        """Return this module's entries in the REGULARIZATION_LOSSES collection."""
        collection = tf.GraphKeys.REGULARIZATION_LOSSES
        return self.get_variables(collection)
class CriticNet(AbstractModule):
    """Critic network: shared LSTM stack followed by a scalar value head."""

    def __init__(self, name='critic'):
        super().__init__(name=name)

    def _build(self, inputs):
        """Estimate one value per time step.

        :param inputs: [Time, Batch, state_size]
        :return: per-step value averaged over the Batch axis, [Time]
        """
        # Recurrent part -> [Time, Batch, hidden_size]; LSTM outputs lie in (-1, 1).
        hidden = build_common_network(inputs)
        # Dense head, applied to every (time, batch) position.
        dense_in = sntBatchApply(Linear('input_layer', 64))
        hidden = swich(dense_in(hidden))
        dense_out = sntBatchApply(Linear('output_layer', 1))
        # [Time, Batch, 1] -> [Time, Batch]
        per_stock = tf.squeeze(dense_out(hidden), axis=2)
        # Average over the batch axis -> [Time]
        return tf.reduce_mean(per_stock, axis=1)

    def get_regularization(self):
        """Return this module's entries in the REGULARIZATION_LOSSES collection."""
        collection = tf.GraphKeys.REGULARIZATION_LOSSES
        return self.get_variables(collection)
Access
# Learning rate passed to both RMSProp optimizers created in Access.
LEARNING_RATE = 1e-3
# Passed as the second positional argument (decay) of tf.train.RMSPropOptimizer.
DECAY_RATE = 0.99
class Access(object):
    """Shared actor-critic network, its optimizers and checkpointing.

    Holds the actor/critic variables that are saved to and restored from
    checkpoints, plus one RMSProp optimizer for each of the two networks.
    """

    def __init__(self, batch_size, state_size, action_size):
        """Build the shared graph.

        :param batch_size: static size of the Batch axis of ``inputs``
        :param state_size: size of the last axis of ``inputs``
        :param action_size: number of actions the actor outputs
        """
        with tf.variable_scope('Access'):
            # placeholder: [Time, Batch, state_size]
            self.inputs = tf.placeholder(
                tf.float32, [None, batch_size, state_size], 'inputs')
            # network interface
            self.actor = ActorNet('actor')
            self.critic = CriticNet('critic')
            # ActorNet already ends in a softmax; wrapping its output in a
            # second softmax would flatten the action distribution.
            self.policy = self.actor(action_size, self.inputs)
            self.value = self.critic(self.inputs)
        # NOTE(review): SOURCE lost its indentation; the optimizers and saver
        # are placed outside the variable scope, which does not change any
        # variable name - confirm against the original layout.
        # global optimizer
        self.optimizer_actor = tf.train.RMSPropOptimizer(
            LEARNING_RATE, DECAY_RATE, name='optimizer_actor')
        self.optimizer_critic = tf.train.RMSPropOptimizer(
            LEARNING_RATE, DECAY_RATE, name='optimizer_critic')
        # saver over the actor and critic variables only
        self.saver = tf.train.Saver(var_list=self._all_variables())

    def _all_variables(self):
        """Return the actor variables followed by the critic variables."""
        actor_vars, critic_vars = self.get_trainable()
        return list(actor_vars) + list(critic_vars)

    def get_trainable(self):
        """Return ``[actor_variables, critic_variables]``."""
        return [self.actor.get_variables(), self.critic.get_variables()]

    def save(self, sess, path):
        """Write the actor/critic variables of ``sess`` to ``path``."""
        self.saver.save(sess, path)

    def restore(self, sess, path):
        """Load the actor/critic variables from ``path`` into ``sess``."""
        # Reuse the saver built in __init__: it covers the same variables,
        # and building a new one would add ops to the graph on every call
        # (which fails once the graph has been finalized).
        self.saver.restore(sess, path)
Agent
# Lower and upper bounds used when Agent clips the policy probabilities.
CLIP_MIN = 0.01
CLIP_MAX = 0.98
# Weight of the entropy term in the actor loss.
ENTROPY_BETA = 0.01
# Global-norm threshold used when clipping actor and critic gradients.
MAX_GRAD_NORM = 50
def batch_choice(a, p):
    """Sample one action for every row of a batch of probability vectors.

    :param a: population handed to ``np.random.choice`` (an int ``n`` means
        ``range(n)``)
    :param p: [Batch, action_size] non-negative weights; every row is
        renormalized to sum to one before sampling
    :return: np.ndarray of shape [Batch] holding the sampled actions
    """
    # The policy fed in here is float32 and is clipped after the softmax, so
    # its rows need not sum to exactly one; np.random.choice raises
    # ValueError for such input. Renormalize in float64 first.
    probs = np.asarray(p, dtype=np.float64)
    probs = probs / probs.sum(axis=-1, keepdims=True)
    return np.array([np.random.choice(a, p=row) for row in probs])
# Local network for advantage actor-critic, also known as A2C.
class Agent(object):
    """Local actor-critic network that trains the shared ``Access`` network.

    Gradients are taken with respect to this agent's own actor/critic
    variables and applied to the variables returned by
    ``access.get_trainable()``; ``update_local`` then copies those shared
    values back into the local variables.
    """

    def __init__(self, name, access, batch_size, state_size, action_size):
        """Build placeholders, networks, losses and update ops.

        :param name: variable scope under which this agent's graph is built
        :param access: object providing ``get_trainable()``,
            ``optimizer_actor`` and ``optimizer_critic``
        :param batch_size: static size of the Batch axis of the placeholders
        :param state_size: size of the last axis of ``inputs``
        :param action_size: number of discrete actions per batch entry
        """
        self.Access = access
        self.action_size = action_size
        self.batch_size = batch_size
        with tf.variable_scope(name):
            # placeholder
            # [Time, Batch, N]
            self.inputs = tf.placeholder(
                tf.float32, [None, batch_size, state_size], 'inputs')
            # [T_MAX, Batch]
            self.actions = tf.placeholder(
                tf.int32, [None, batch_size], "actions")
            # [T_MAX]
            self.targets = tf.placeholder(tf.float32, [None], "discounted_rewards")
            # Time indices at which policy/value are read out for the loss.
            self.gathers = tf.placeholder(tf.int32, [None], 'gather_list')
            # build network
            self.actor = ActorNet('actor')
            self.critic = CriticNet('critic')
            policy = self.actor(action_size, self.inputs)  # [Time, Batch, action_size]
            value = self.critic(self.inputs)  # [Time]
            # Clip probabilities away from 0 and 1 so the tf.log calls in the
            # losses stay finite; the clipped rows are not renormalized.
            policy = tf.clip_by_value(policy, CLIP_MIN, CLIP_MAX, 'constraint')
            # interface
            self.policy = tf.gather(policy, self.gathers)  # [T_MAX, Batch, action_size]
            self.value = tf.gather(value, self.gathers)  # [T_MAX]
            # Outputs at the last time step, used when acting and bootstrapping.
            self.policy_step = policy[-1]  # [Batch, action_size]
            self.value_step = value[-1]  # 1
        # NOTE(review): SOURCE lost its indentation; the build calls below are
        # placed outside the variable scope - confirm against the original.
        # build function
        self._build_losses()
        self._build_async()
        self._build_interface()
        print('graph %s' % (str(name)))

    def _build_losses(self):
        """Define the critic loss, the actor loss and the monitoring tensors."""
        # value loss
        self.advantage = self.targets - self.value  # [T_MAX]
        value_loss = 0.5 * tf.square(self.advantage)
        # policy loss
        # Probability of the action actually taken, selected with a one-hot mask.
        # [T_MAX, Batch, action_size] -> [T_MAX, Batch]
        policy_action = tf.reduce_sum(
            self.policy * tf.one_hot(self.actions, self.action_size), axis=2)
        # The advantage is treated as a constant here so the actor loss sends
        # no gradient through it.
        # [T_MAX, Batch]
        policy_loss = -tf.log(policy_action) * tf.stop_gradient(
            tf.expand_dims(self.advantage, axis=1))
        # sum(p * log p) is the negative entropy, so adding it to the loss
        # rewards higher-entropy policies.
        # entropy loss [T_MAX, Batch]
        entropy_loss = tf.reduce_sum(self.policy * tf.log(self.policy), axis=2)
        # total loss
        self.critic_loss = tf.reduce_mean(value_loss)
        self.actor_loss = tf.reduce_mean(policy_loss + entropy_loss * ENTROPY_BETA)
        # interface
        self.a_entropy_loss = tf.reduce_mean(entropy_loss)
        self.a_policy_loss = tf.reduce_mean(policy_loss)
        self.a_value_loss = tf.reduce_mean(value_loss)
        self.a_critic_loss = self.critic_loss
        self.a_actor_loss = self.actor_loss
        self.a_advantage = tf.reduce_mean(self.advantage)
        self.a_value_mean = tf.reduce_mean(self.value)
        self.a_policy_mean = tf.reduce_mean(self.policy)

    def _build_async(self):
        """Create the ops that push gradients to Access and pull weights back."""
        global_actor_params, global_critic_params = self.Access.get_trainable()
        local_actor_params, local_critic_params = self.get_trainable()
        # Gradients are computed on the local copies of the variables.
        actor_grads = tf.gradients(self.actor_loss, list(local_actor_params))
        critic_grads = tf.gradients(self.critic_loss, list(local_critic_params))
        # Set up optimizer with global norm clipping.
        # The second return value is the global norm, kept for monitoring.
        actor_grads, self.a_actor_grad = tf.clip_by_global_norm(actor_grads, MAX_GRAD_NORM)
        critic_grads, self.a_critic_grad = tf.clip_by_global_norm(critic_grads, MAX_GRAD_NORM)
        # update Access
        # Local gradients are paired positionally with the Access variables.
        actor_apply = self.Access.optimizer_actor.apply_gradients(
            zip(list(actor_grads), list(global_actor_params)))
        critic_apply = self.Access.optimizer_critic.apply_gradients(
            zip(list(critic_grads), list(global_critic_params)))
        self.update_global = [actor_apply, critic_apply]
        # update ACNet
        # Copy every Access variable into the matching local variable.
        assign_list = []
        for gv, lv in zip(global_actor_params, local_actor_params):
            assign_list.append(tf.assign(lv, gv))
        for gv, lv in zip(global_critic_params, local_critic_params):
            assign_list.append(tf.assign(lv, gv))
        self.update_local = assign_list

    def _build_interface(self):
        """Collect the monitoring tensors in the order ``get_losses`` returns them."""
        self.a_interface = [self.a_actor_loss,
                            self.a_actor_grad,
                            self.a_policy_mean,
                            self.a_policy_loss,
                            self.a_entropy_loss,
                            self.a_critic_loss,
                            self.a_critic_grad,
                            self.a_value_loss,
                            self.a_value_mean,
                            self.a_advantage]

    def get_trainable(self):
        """Return ``[actor_variables, critic_variables]`` of the local networks."""
        return [self.actor.get_variables(), self.critic.get_variables()]

    def init_or_update_local(self, sess):
        """
        init or update local network
        :param sess: session in which the assign ops are run
        :return: None
        """
        sess.run(self.update_local)

    def get_step_policy(self, sess, inputs):
        """Return the clipped policy at the last time step, [Batch, action_size]."""
        return sess.run(self.policy_step, {self.inputs: inputs})

    def get_step_value(self, sess, inputs):
        """Return the critic value at the last time step."""
        return sess.run(self.value_step, {self.inputs: inputs})

    def get_losses(self, sess, inputs, actions, targets, gather_list):
        """
        get all loss functions of network
        :param sess: session used to evaluate the tensors
        :param inputs: fed to the ``inputs`` placeholder, [Time, Batch, N]
        :param actions: fed to the ``actions`` placeholder, [T_MAX, Batch]
        :param targets: fed to the ``targets`` placeholder, [T_MAX]
        :param gather_list: fed to the ``gathers`` placeholder, [T_MAX]
        :return: values of the tensors listed in ``a_interface``, in that order
        """
        feed_dict = {self.inputs: inputs,
                     self.actions: actions,
                     self.targets: targets,
                     self.gathers: gather_list}
        return sess.run(self.a_interface, feed_dict)

    def train_step(self, sess, inputs, actions, targets, gathers):
        """Apply one clipped gradient update to the Access variables."""
        feed_dict = {self.inputs: inputs,
                     self.actions: actions,
                     self.targets: targets,
                     self.gathers: gathers}
        sess.run(self.update_global, feed_dict)

    # get stochastic action for train
    def get_stochastic_action(self, sess, inputs, epsilon=0.9):
        """Sample actions from the policy, or uniformly at random.

        With probability ``epsilon`` one action per batch entry is drawn from
        the last-step policy; otherwise ``batch_size`` actions are drawn
        uniformly from ``range(action_size)``.
        """
        if np.random.uniform() < epsilon:
            policy = sess.run(self.policy_step, {self.inputs: inputs})
            return batch_choice(self.action_size, policy)
        else:
            return np.random.randint(self.action_size, size=self.batch_size)

    # get deterministic action for test
    def get_deterministic_policy_action(self, sess, inputs):
        """Return the most probable action for every batch entry."""
        policy_step = sess.run(self.policy_step, {self.inputs: inputs})
        return np.argmax(policy_step, axis=1)
Framework
from agent.actor_critic import Agent
# Step count above which Framework.run_episode ends the episode.
MAX_EPISODE_LENGTH = 200
# Discount factor applied when folding rewards into n-step targets.
GAMMA = 0.9
def batch_stack(inputs, stride=16):
    """Merge a list of observation windows into one time-major sequence.

    The result is the first window without its last ``stride`` steps,
    followed by the last ``stride`` steps of every window. This presumably
    relies on consecutive windows overlapping in all but their last
    ``stride`` steps - confirm against the environment.

    The gather index of window ``k`` is the position of that window's last
    time step in the stacked sequence. It is derived from the window length
    instead of being hard-coded, so window lengths other than 64 are indexed
    correctly; for length 64 and ``stride`` 16 it is ``63 + 16 * k``.

    :param inputs: non-empty list of arrays [Length, Batch, Factor] that all
        share the same ``Length``
    :param stride: number of trailing time steps taken from every window
    :return: ``(stacked, gather_list)`` where ``stacked`` has
        ``Length - stride + stride * len(inputs)`` rows and ``gather_list``
        holds one index per window
    :raises ValueError: if ``stride`` is smaller than one
    """
    if stride < 1:
        raise ValueError('stride must be a positive integer')
    window = len(inputs[0])
    # Position of each window's final step inside the stacked sequence.
    gather_list = (window - 1) + stride * np.arange(len(inputs))
    head = [inputs[0][:-stride]]
    tails = [w[-stride:] for w in inputs]
    return np.vstack(head + tails), gather_list
class Framework(object):
    """Couples an ``Agent`` with an ``Account`` environment and runs episodes."""

    def __init__(self, name, access, batch_size, state_size, action_size):
        """Create the local agent and its own environment instance.

        :param name: agent scope name; 'W0' additionally prints diagnostics
        :param access: shared network object handed to the ``Agent``
        :param batch_size: forwarded to ``Agent``
        :param state_size: forwarded to ``Agent``
        :param action_size: forwarded to ``Agent``
        """
        self.Access = access
        self.AC = Agent(name, self.Access, batch_size, state_size, action_size)
        self.env = Account()
        self.name = name

    def run(self, sess, max_episodes, t_max=8):
        """Run ``max_episodes`` episodes.

        :return: (list of episode scores, list of per-episode loss tuples)
        """
        buffer_score = []
        buffer_loss = []
        episode = 0
        while episode < max_episodes:
            episode += 1
            episode_score, outputs = self.run_episode(sess, t_max)
            buffer_score.append(episode_score)
            buffer_loss.append(outputs)
        return buffer_score, buffer_loss

    def run_episode(self, sess, t_max=8):
        """Play one episode, training every ``t_max`` steps.

        :param sess: session used for acting and training
        :param t_max: number of steps collected between two updates
        :return: (episode_score, outputs) where ``outputs`` is the tuple of
            values returned by ``get_losses`` for the last trained batch
        """
        t_start = t = 0
        # Compounded multiplicatively from the rewards; reward is presumably a
        # percentage return - confirm against Account.step.
        episode_score = 1
        buffer_state = []
        buffer_action = []
        buffer_reward = []
        self.AC.init_or_update_local(sess)
        state = self.env.reset()
        while True:
            t += 1
            action = self.AC.get_stochastic_action(sess, state)
            next_state, reward, done = self.env.step(action)
            # buffer for loop
            episode_score *= (1 + reward / 100)
            buffer_state.append(state)
            buffer_action.append(action)
            buffer_reward.append(reward)
            state = next_state
            if t - t_start == t_max or done:
                t_start = t
                # Value of the state after the last buffered step (0 if done).
                terminal = self.get_bootstrap(sess, next_state, done)
                # Walk the rewards backwards to build discounted targets.
                buffer_target = []
                for r in buffer_reward[::-1]:
                    terminal = r + GAMMA * terminal
                    buffer_target.append(terminal)
                buffer_target.reverse()
                # stack
                inputs, gather_list = batch_stack(buffer_state)
                actions = np.vstack(buffer_action)
                targets = np.squeeze(np.vstack(buffer_target), axis=1)
                # empty buffer
                buffer_state = []
                buffer_action = []
                buffer_reward = []
                # update Access gradients
                self.AC.train_step(sess, inputs, actions, targets, gather_list)
                # update local network
                self.AC.init_or_update_local(sess)
                # NOTE(review): SOURCE lost its indentation; this check is
                # nested in the update branch because it reads inputs/actions/
                # targets produced there - confirm against the original.
                if done or t > MAX_EPISODE_LENGTH:
                    outputs = self.get_losses(sess, inputs, actions, targets, gather_list)
                    outputs = tuple(outputs)
                    if self.name == 'W0':
                        print('actor: %f, actor_grad: %f, policy mean: %f, policy: %f, entropy: %f, '
                              'critic: %f, critic_grad: %f, value: %f, value_mean: %f, advantage: %f'
                              % outputs)
                    return episode_score, outputs

    def get_bootstrap(self, sess, next_state, done):
        """Return 0 when the episode is done, else the critic's last-step value."""
        if done:
            terminal = 0
        else:
            terminal = self.AC.get_step_value(sess, next_state)
        return terminal

    def get_losses(self, sess, inputs, actions, targets, gather_list):
        """Delegate to ``Agent.get_losses`` with the same arguments."""
        return self.AC.get_losses(sess, inputs, actions, targets, gather_list)
main
import multiprocessing
import threading
from env.env_main import Account
# One Worker is created per CPU reported by multiprocessing.
NUMS_CPU = multiprocessing.cpu_count()
# Size of the last axis of the observation placeholders.
state_size = 58
# Size of the Batch axis of the placeholders.
batch_size = 50
# Number of discrete actions per batch entry.
action_size = 3
# Number of episodes every worker runs.
max_episodes = 1
# Shared results: worker name -> list of episode scores, filled by Worker.run.
GD = {}
class Worker(Framework):
    """Framework variant that publishes its episode scores through ``GD``."""

    def __init__(self, name, access, batch_size, state_size, action_size):
        super().__init__(name, access, batch_size, state_size, action_size)

    def run(self, sess, max_episodes, t_max=8):
        """Run ``max_episodes`` episodes and record the scores in ``GD``.

        The score list is stored under ``str(self.name)`` after every episode;
        the worker named 'W0' also prints its progress. Nothing is returned.
        """
        scores = []
        for episode in range(1, max_episodes + 1):
            score, _ = self.run_episode(sess, t_max)
            scores.append(score)
            GD[str(self.name)] = scores
            if self.name == 'W0':
                print('Episode: %f, score: %f' % (episode, score))
                print('\n')
# Train: build the shared network and one worker per CPU, run every worker in
# its own thread, then checkpoint the shared network.
with tf.Session() as sess:
    with tf.device("/cpu:0"):
        A = Access(batch_size, state_size, action_size)
        F_list = []
        for i in range(NUMS_CPU):
            F_list.append(Worker('W%i' % i, A, batch_size, state_size, action_size))
        COORD = tf.train.Coordinator()
        sess.run(tf.global_variables_initializer())
        # Freeze the graph so no thread can add ops while training.
        sess.graph.finalize()
        threads_list = []
        for ac in F_list:
            # Hand the worker's bound method to the thread directly. A
            # `lambda: ac.run(...)` looks `ac` up only when the thread runs,
            # by which time the loop may already have moved on to another
            # worker.
            t = threading.Thread(target=ac.run, args=(sess, max_episodes))
            t.start()
            threads_list.append(t)
        COORD.join(threads_list)
        A.save(sess, 'model/saver_1.ckpt')
test
# Evaluation: rebuild the graph from scratch, load the checkpoint written by
# the training step and replay the environment with greedy actions.
tf.reset_default_graph()
import pandas as pd
import seaborn as sns
sns.set_style('whitegrid')
%matplotlib inline
# Same network dimensions as used for training.
state_size = 58
batch_size = 50
action_size = 3
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
with tf.Session(config=config) as sess:
    with tf.device("/cpu:0"):
        A = Access(batch_size, state_size, action_size)
        W = Agent('W0', A, batch_size, state_size, action_size)
        A.restore(sess,'model/saver_1.ckpt')
        # Copy the restored Access weights into the local agent.
        W.init_or_update_local(sess)
        env = Account()
        state = env.reset()
        # NOTE(review): `done` is ignored, so the loop always takes 200 steps -
        # confirm the environment tolerates stepping past the end.
        for _ in range(200):
            action = W.get_deterministic_policy_action(sess, state)
            state, reward, done = env.step(action)
# NOTE(review): SOURCE lost its indentation; the plotting is placed after the
# session block - confirm against the original.
value, reward = env.plot_data()
pd.Series(value).plot(figsize=(16,6))
pd.Series(reward).plot(figsize=(16,6))
# Zero reference line in red.
pd.Series(np.zeros_like(reward)).plot(figsize=(16,6), color='r')