I've had this idea for a while, and finally found a weekend to finish it. The results aren't great; I'll optimize it later when I'm in the mood. Here's a write-up.
Main approach:
Summary:
The code is messy and scattered; if anyone is actually interested in reading it, I'll find time to clean it up.
Step one is preparing the stock pool and the feature data. Depending on how you understand the market, the data you pick will differ a lot, and so will the results. I won't go into that here; see my earlier posts. Now assume the data is ready, and load the training data for preprocessing.
TrainDataGenerator does some initial processing: dropna, setting up the prediction target, and splitting the training and test sets. Source code is at the end.
TD = TrainDataGenerator("train_samples_1120")
TD.load()
TD.gen()
print(TD.X.shape)
print(TD.y.shape)
(5437, 600)
(5437,)
MMModel is the core. It does three things:
1. Normalization: scale each feature to zero mean and unit variance.
2. Dimensionality reduction: PCA here. From inspecting the PCA output, an n_pca of 180-200 suits this training set.
3. Training: SVC. This part could be tuned later to reduce overfitting; see the cross-validation sketch after the scores below.
X_train = TD.X_train
y_train = TD.y_train
X_test = TD.X_test
y_test = TD.y_test
print(X_train.shape)
print(X_test.shape)
mmm = MMModel(name="model_test", n_pca=180, C_svr=1.0)
mmm.fit(X_train, y_train)
trains = mmm.score(X_train, y_train)
tests = mmm.score(X_test, y_test)
print("train set (accuracy, F1_score)", trains)
print("test set (accuracy, F1_score)", tests)
(4350, 600)
(1087, 600)
('train set (accuracy, F1_score)', (0.86712643678160917, 0.8361678004535148))
('test set (accuracy, F1_score)', (0.80588776448942046, 0.72632944228274965))
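Not part of my original code, but for reference: the same three stages map directly onto sklearn's Pipeline, which also makes it easy to pick n_pca and C by cross-validation instead of by hand. A minimal sketch; the grid values are placeholders, not something I actually tuned:
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC

# same three stages as MMModel; class_weight='balanced' plays roughly the
# role of gen_svr_w's per-sample weights in the source code below
pipe = Pipeline([
    ('norm', StandardScaler()),
    ('pca', PCA(svd_solver='full')),
    ('svc', SVC(kernel='rbf', class_weight='balanced')),
])
param_grid = {
    'pca__n_components': [160, 180, 200],
    'svc__C': [0.3, 1.0, 3.0],
}
search = GridSearchCV(pipe, param_grid, scoring='f1', cv=3)
search.fit(X_train, y_train)
print(search.best_params_, search.best_score_)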
Visualizing the PCA explained-variance curve: I just eyeball the steepest drop-off (the elbow). There are plenty of techniques for picking this point; I should summarize them when I have time.
plt.figure(1, figsize=(4, 3))
plt.clf()
plt.axes([.2, .2, .7, .7])
plt.plot(mmm.mod_demR.explained_variance_[10:], linewidth=2)
plt.axis('tight')
plt.xlabel('n_components')
plt.ylabel('explained_variance_')
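Besides eyeballing, a common alternative is to take the smallest n that explains a fixed share of the total variance. A sketch of that idea; the 0.95 cutoff is an arbitrary pick of mine:
# fit PCA with all components once, then find where the cumulative
# explained-variance ratio first reaches the cutoff (0.95 is arbitrary)
full_pca = PCA(svd_solver='full').fit(StandardScaler().fit_transform(X_train))
ratios = np.cumsum(full_pca.explained_variance_ratio_)
n_pca = int(np.searchsorted(ratios, 0.95) + 1)
print(n_pca)
sklearn can also do this selection internally: PCA(n_components=0.95, svd_solver='full') keeps just enough components to reach that ratio.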
Source code below. Anyone with a bit of background should be able to follow it, so I won't explain much. The basic ML workflow is much the same everywhere; this can serve as a skeleton that fits different scenarios with small changes.
import matplotlib.pyplot as plt  # needed by the plotting snippet above
import seaborn as sns
import pandas as pd
import numpy as np
import datetime
import time
from jqdata import *
from six import StringIO
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn import metrics
# CSV helpers on top of the research environment's write_file/read_file
def write_csv(data, file_name):
    write_file(file_name, data.to_csv(), append=False)

def read_csv(file_name):
    body = read_file(file_name)
    data = pd.read_csv(StringIO(body), index_col=[0])
    return data
class TrainDataGenerator:
    # loads raw samples, generates labels, and splits train/test sets
    def __init__(self, name="train_samples", need_target=True, train_size_perc=0.8):
        self.samples = None
        self.path = "data/TrainPool/"
        self.fullpath = self.path + name + ".csv"
        self.target_columns = ['down', 'high', 'open']
        self.need_target = need_target
        self.train_size_perc = train_size_perc
        # label thresholds: +1 if max gain > 10% and max drawdown > -5%
        self.high_threshold = 0.1
        self.down_threshold = -0.05

    def save(self):
        self.samples = self.samples.dropna()
        write_csv(self.samples, self.fullpath)

    def load(self):
        self.samples = read_csv(self.fullpath)
        self.samples = self.samples.dropna()

    def gen(self):
        self.permutation()
        if self.need_target:
            self.split_XY()
            self.gen_y()
        else:
            self.X = self.samples.copy()
        if self.train_size_perc < 1:
            self.split_test()

    def permutation(self):
        # shuffle rows before the train/test split
        self.samples = self.samples.iloc[np.random.permutation(len(self.samples))]

    def split_XY(self):
        self.Y = self.samples[self.target_columns]
        self.X = self.samples.copy()
        for c in self.target_columns:
            self.X.pop(c)
        return self.X, self.Y

    def gen_y(self):
        # gain to the period high and drawdown to the period low, both
        # relative to the open; a sample is positive only if it clears the
        # gain threshold without breaching the drawdown threshold
        high_score = (self.Y['high'] - self.Y['open']) / self.Y['open']
        down_score = (self.Y['down'] - self.Y['open']) / self.Y['open']
        pos_mask = (high_score > self.high_threshold) & (down_score > self.down_threshold)
        self.y = pd.Series(-np.ones(self.Y.shape[0]), index=pos_mask.index)
        self.y[pos_mask] = 1
        return self.y

    def split_test(self):
        train_size = int(np.round(self.train_size_perc * self.X.shape[0]))
        self.X_train, self.y_train = self.X.iloc[:train_size, :], self.y.iloc[:train_size]
        self.X_test, self.y_test = self.X.iloc[train_size:, :], self.y.iloc[train_size:]
class MMModel:
    # normalize -> PCA -> SVC, with sample weights against class imbalance
    def __init__(self, name='mmm', path='data/TrainPool/', n_pca='mle', C_svr=1.0):
        self.n_pca = n_pca
        self.C_svr = C_svr
        self.name = name
        self.path = path
        self.fullpath = self.path + self.name + ".pkl"

    def save(self):
        # dump_pickle/load_pickle are assumed helpers; see the sketch
        # after this listing
        dump_pickle(self, self.fullpath)

    def load(self):
        return load_pickle(self.fullpath)

    def fit(self, X, y):
        # 1. normalization: zero mean, unit variance per feature
        self.mod_norm = StandardScaler()
        Xtrans = self.mod_norm.fit_transform(X)
        # 2. dimensionality reduction
        self.mod_demR = PCA(n_components=self.n_pca, svd_solver='full')
        Xtrans = self.mod_demR.fit_transform(Xtrans)
        # 3. training, with per-sample weights to offset class imbalance
        self.mod_train = SVC(kernel='rbf', C=self.C_svr)
        w, weight = self.gen_svr_w(y)
        if weight < 1 or weight > 40:
            print("unbalanced sample, weight: " + str(weight))
        self.mod_train.fit(Xtrans, y, w)

    def gen_svr_w(self, y):
        # weight the minority class by the majority/minority ratio
        tol = y.shape[0]
        pos = y[y == 1].shape[0]
        neg = tol - pos
        w = pd.Series(np.ones(y.shape[0]), y.index)
        if pos == 0 or neg == 0:
            return w, 0
        if pos < neg:
            weight = float(neg) / pos
            w[y == 1] = weight
        else:
            weight = float(pos) / neg
            w[y == -1] = weight
        return w, weight

    def transform(self, X, y=None):
        Xtrans = self.mod_norm.transform(X)
        Xtrans = self.mod_demR.transform(Xtrans)
        return Xtrans

    def predict(self, X, y=None):
        Xtrans = self.transform(X)
        return self.mod_train.predict(Xtrans)

    def report(self, X, y=None):
        Xtrans = self.transform(X)
        p = self.mod_train.predict(Xtrans)
        d = self.mod_train.decision_function(Xtrans)
        return pd.DataFrame({'predict': p, 'dec_func': d})

    def score(self, X, y=None):
        Xtrans = self.transform(X)
        return self.mod_train.score(Xtrans, y), metrics.f1_score(y, self.predict(X))
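dump_pickle and load_pickle aren't defined anywhere in this listing; I assume they are plain pickle wrappers. A minimal stand-in, assuming direct local file access (the names just match the calls in MMModel.save/load above):
import pickle

def dump_pickle(obj, fullpath):
    # hypothetical helper matching MMModel.save(); plain local file I/O
    with open(fullpath, 'wb') as f:
        pickle.dump(obj, f)

def load_pickle(fullpath):
    # hypothetical helper matching MMModel.load()
    with open(fullpath, 'rb') as f:
        return pickle.load(f)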