第一块放因子
第二块是计算用到的所有方程
第三块开始计算
第四块pickle存储
多因子回测框架(下)--检验因子
#【模块一:因子对象】
import datetime
from multiprocessing.dummy import Pool as ThreadPool

import jqdata
from jqfactor import Factor,calc_factors
import numpy as np
import pandas as pd
import scipy.stats as st
import statsmodels.api as sm
class MC(Factor):
    """Market-capitalisation factor, used later for size neutralisation."""

    name = 'MC'
    max_window = 1
    dependencies = ['market_cap']

    def calc(self, data):
        """Return the column-wise mean of the `market_cap` window."""
        return data['market_cap'].mean()
class DSRI(Factor):
    """Receivables-to-revenue ratio relative to the same ratio on the `_1` fields.

    The `_1` suffix presumably denotes the prior reporting period -- confirm
    against the jqfactor field reference.
    """

    name = 'DSRI'
    max_window = 1
    dependencies = [
        'account_receivable',
        'account_receivable_1',
        'operating_revenue',
        'operating_revenue_1',
    ]

    def calc(self, data):
        """Return the column-wise mean of (AR / revenue) / (AR_1 / revenue_1)."""
        ratio_now = data['account_receivable'] / data['operating_revenue']
        ratio_ref = data['account_receivable_1'] / data['operating_revenue_1']
        return (ratio_now / ratio_ref).mean()
class AQI(Factor):
    """Intangible-assets share of total assets relative to the `_1` fields.

    The `_1` suffix presumably denotes the prior reporting period -- confirm
    against the jqfactor field reference.
    """

    name = 'AQI'
    max_window = 1
    dependencies = [
        'intangible_assets',
        'intangible_assets_1',
        'total_assets',
        'total_assets_1',
    ]

    def calc(self, data):
        """Return the column-wise mean of (intangibles / assets) / (same on `_1`)."""
        share_now = data['intangible_assets'] / data['total_assets']
        share_ref = data['intangible_assets_1'] / data['total_assets_1']
        return (share_now / share_ref).mean()
class SGAI(Factor):
    """(Administration + sales expense) / revenue, relative to the `_1` fields.

    The `_1` suffix presumably denotes the prior reporting period -- confirm
    against the jqfactor field reference.
    """

    name = 'SGAI'
    max_window = 1
    dependencies = [
        'administration_expense',
        'sale_expense',
        'operating_revenue',
        'administration_expense_1',
        'sale_expense_1',
        'operating_revenue_1',
    ]

    def calc(self, data):
        """Return the column-wise mean of the expense-ratio quotient."""
        expense_now = data['administration_expense'] + data['sale_expense']
        expense_ref = data['administration_expense_1'] + data['sale_expense_1']
        ratio_now = expense_now / data['operating_revenue']
        ratio_ref = expense_ref / data['operating_revenue_1']
        return (ratio_now / ratio_ref).mean()
class GMI(Factor):
    """Quotient of `gross_profit_margin_1` over `gross_profit_margin`.

    Note the direction: the `_1` field is the numerator here, unlike the
    other ratio factors in this file.
    """

    name = 'GMI'
    max_window = 1
    dependencies = ['gross_profit_margin', 'gross_profit_margin_1']

    def calc(self, data):
        """Return the column-wise mean of margin_1 / margin."""
        margin_ref = data['gross_profit_margin_1']
        margin_now = data['gross_profit_margin']
        return (margin_ref / margin_now).mean()
class SGI(Factor):
    """Quotient of `operating_revenue` over `operating_revenue_1`."""

    name = 'SGI'
    max_window = 1
    dependencies = ['operating_revenue', 'operating_revenue_1']

    def calc(self, data):
        """Return the column-wise mean of revenue / revenue_1."""
        growth = data['operating_revenue'] / data['operating_revenue_1']
        return growth.mean()
class LVGI(Factor):
    """Liability-to-assets ratio relative to the same ratio on the `_1` fields.

    The `_1` suffix presumably denotes the prior reporting period -- confirm
    against the jqfactor field reference.
    """

    name = 'LVGI'
    max_window = 1
    dependencies = [
        'total_liability',
        'total_assets',
        'total_liability_1',
        'total_assets_1',
    ]

    def calc(self, data):
        """Return the column-wise mean of (liability / assets) / (same on `_1`)."""
        leverage_now = data['total_liability'] / data['total_assets']
        leverage_ref = data['total_liability_1'] / data['total_assets_1']
        return (leverage_now / leverage_ref).mean()
class TATA(Factor):
    """Quotient of `accounts_payable` over `total_assets`.

    NOTE(review): the name suggests "total accruals to total assets", but the
    code divides accounts payable by total assets -- confirm this is intended.
    """

    name = 'TATA'
    max_window = 1
    dependencies = ['accounts_payable', 'total_assets']

    def calc(self, data):
        """Return the column-wise mean of accounts_payable / total_assets."""
        payable_share = data['accounts_payable'] / data['total_assets']
        return payable_share.mean()
#【模块二:因子计算-底层方程】
#--------------------Step I: 准备数据--------------------------
def get_trade_dates(end,count=250,interval=20):
    """Sample every `interval`-th day from the last `count` trading days.

    Sampling is anchored at the LAST day returned by
    `jqdata.get_trade_days(end_date=end, count=count)` and steps backwards,
    so that last day is always part of the result.

    Args:
        end: End date passed through to `jqdata.get_trade_days`.
        count: Number of trading days to fetch.
        interval: Step, in trading days, between two sampled dates.

    Returns:
        list: The sampled dates, in the same relative order in which
        `jqdata.get_trade_days` returned them.
    """
    trade_days = list(jqdata.get_trade_days(end_date=end,count=count))
    # Reverse, keep positions 0, interval, 2*interval, ..., then restore the
    # original order. Slicing replaces a per-element `list.index` lookup that
    # made the selection quadratic in `count`.
    return trade_days[::-1][::interval][::-1]
def get_stock_pool(date,index='all'):
    """Return the stock pool as of `date`, excluding recently listed stocks.

    A stock is kept only when its `start_date` is earlier than the first of
    the 60 trading days ending at `date`.

    Args:
        date: Reference date.
        index: 'all' for every stock, otherwise an index code whose
            constituents (via `get_index_stocks`) restrict the pool.

    Returns:
        list: Stock codes in the pool.
    """
    securities = get_all_securities(types=['stock'],date=date)
    # First day of the 60-trading-day window ending at `date`.
    listing_cutoff = jqdata.get_trade_days(end_date=date,count=60)[0]
    securities = securities[securities['start_date']<listing_cutoff]
    seasoned = list(securities.index)
    if index=='all':
        return seasoned
    constituents = get_index_stocks(index,date=date)
    return list(set(constituents)&set(seasoned))
def get_stock_universe(trade_days_list,index='all'):
    """Return one filtered stock pool per date, in the order of `trade_days_list`.

    Args:
        trade_days_list: Dates to build pools for.
        index: Forwarded to `get_stock_pool` ('all' or an index code).

    Returns:
        list[list]: `get_stock_pool(date, index)` for each date.
    """
    return [get_stock_pool(day,index) for day in trade_days_list]
def get_return(trade_date_list,count=250):
    """Build forward close-to-close returns for the full stock pool.

    The pool is `get_stock_pool` evaluated at the latest date in
    `trade_date_list`; close prices cover the `count` rows ending there.

    Args:
        trade_date_list: Sampled dates; must be present in the price index.
        count: Number of price rows to request from `get_price`.

    Returns:
        tuple: `(return_df, all_return_df)`. `return_df` holds the return from
        each sampled date to the next sampled date; `all_return_df` holds the
        return from each price row to the next. Both are shifted by -1, so
        each row carries the FOLLOWING period's return and the last row is NaN.
    """
    last_day = max(trade_date_list)
    stocks = get_stock_pool(last_day,index='all')
    close = get_price(stocks,end_date=last_day,count=count,fields=['close'])['close']
    all_return_df = close.pct_change().shift(-1)
    return_df = close.loc[trade_date_list].pct_change().shift(-1)
    return return_df,all_return_df
def get_factor_by_day(date):
    """Compute every factor in `g_factor_list` for one date.

    The universe is unfiltered: all stocks when `g_index == 'all'`, otherwise
    the constituents of `g_index` on `date`.

    Returns:
        The result of `calc_factors`; per the original author's note this is
        a dict keyed by factor name whose values are DataFrames with dates as
        rows and stocks as columns.
    """
    if g_index == 'all':
        stocks = get_all_securities(types=['stock'],date=date).index.tolist()
    else:
        stocks = get_index_stocks(g_index,date=date)
    return calc_factors(stocks,g_factor_list,date,date)
def get_market_cap_by_day(date):
    """Compute the `MC` (market cap) factor for one date.

    The universe is unfiltered: all stocks when `g_index == 'all'`, otherwise
    the constituents of `g_index` on `date`.

    Returns:
        The result of `calc_factors` for the single factor `MC()`; callers
        read the market-cap frame from its 'MC' entry.
    """
    if g_index == 'all':
        stocks = get_all_securities(types=['stock'], date=date).index.tolist()
    else:
        stocks = get_index_stocks(g_index, date=date)
    return calc_factors(stocks,[MC()],date,date)
def get_Industry_by_day(date):
    """Return a one-row DataFrame mapping each stock to its industry code.

    The universe is unfiltered: all stocks when `g_index == 'all'`, otherwise
    the constituents of `g_index` on `date`. Stocks that belong to none of the
    listed industries keep NaN.

    Returns:
        pandas.DataFrame: index `[date]`, one column per stock, values are
        industry-code strings.
    """
    if g_index == 'all':
        universe = get_all_securities(types=['stock'], date=date).index.tolist()
    else:
        universe = get_index_stocks(g_index, date=date)
    # Industry codes understood by `get_industry_stocks`
    # (presumably Shenwan level-1 -- confirm).
    industry_codes = [
        '801010', '801020', '801030', '801040', '801050', '801080', '801110',
        '801120', '801130', '801140', '801150', '801160', '801170', '801180',
        '801200', '801210', '801230', '801710', '801720', '801730', '801740',
        '801750', '801760', '801770', '801780', '801790', '801880', '801890',
    ]
    universe_set = set(universe)
    industry_df = pd.DataFrame(index=[date],columns=universe)
    for code in industry_codes:
        members = set(get_industry_stocks(code,date = date))
        industry_df.loc[date,list(members&universe_set)] = code
    return industry_df
def dateTransform(date_list):
    """Convert date-like objects to `datetime.datetime` values at midnight.

    Each item is formatted as 'YYYY-MM-DD' and parsed back, which discards
    any time-of-day component.

    Args:
        date_list: Iterable of objects supporting `strftime`.

    Returns:
        list[datetime.datetime]: One midnight datetime per input item.
    """
    day_format = '%Y-%m-%d'
    return [
        datetime.datetime.strptime(day.strftime(day_format), day_format)
        for day in date_list
    ]
## ---------------------------------Step II: 处理数据---------------------------
def get_new_univ_dict(univ_dict,all_industry_df,MC_df):
    """Restrict each date's universe to stocks with industry and market-cap data.

    Args:
        univ_dict: Mapping date -> list of stock codes.
        all_industry_df: Industry codes, dates as rows, stocks as columns.
        MC_df: Market caps, dates as rows, stocks as columns.

    Returns:
        dict: Same keys as `univ_dict`; each value lists the stocks that are
        non-NaN on that date in both frames (order not guaranteed).
    """
    filtered = {}
    for day, stocks in univ_dict.items():
        has_industry = set(all_industry_df.loc[day].dropna().index)
        has_market_cap = set(MC_df.loc[day].dropna().index)
        filtered[day] = list(set(stocks)&has_industry&has_market_cap)
    return filtered
def replace_nan_indu(all_industry_df,factor_df,univ_dict):
    """Fill missing factor values with the same-day industry median.

    For every date in `univ_dict`, stocks of that date's universe that have an
    industry code get their NaN factor value replaced by the median of the
    non-NaN values in the same industry. A value stays NaN when the whole
    industry has no data on that date. Stocks without an industry code are
    left out of that date's row.

    Args:
        all_industry_df: Industry codes, dates as rows, stocks as columns.
        factor_df: Raw factor values, dates as rows, stocks as columns.
        univ_dict: Mapping date -> list of stock codes.

    Returns:
        pandas.DataFrame: One row per date (in `univ_dict` order), stocks as
        columns. Empty when `univ_dict` is empty.
    """
    filled_rows = []
    for date, univ in univ_dict.items():
        industry = all_industry_df.loc[date,univ].dropna()
        # Keep only stocks that have an industry, in universe order.
        values = factor_df.loc[date,industry.index]
        # Broadcast each industry's median back onto its member stocks.
        industry_median = values.groupby(industry).transform('median')
        filled = values.fillna(industry_median)
        filled_rows.append(filled.to_frame(date).T)
    if not filled_rows:
        return pd.DataFrame()
    # A single concat replaces per-iteration DataFrame.append, which newer
    # pandas no longer provides and which re-copied the frame on every date.
    return pd.concat(filled_rows)
def winsorize(se):
    """Clip a Series to its 2.5% and 97.5% quantiles.

    Values below the 2.5% quantile are raised to it and values above the
    97.5% quantile are lowered to it. The input is not modified; a new
    Series is returned.

    Args:
        se: Numeric pandas Series.

    Returns:
        pandas.Series: Clipped copy with the same index as `se`.
    """
    lower, upper = se.quantile([0.025, 0.975])
    # `clip` returns a new object, so the caller's Series is left untouched.
    return se.clip(lower=lower, upper=upper)
def standardize(se):
    """Return the z-score of `se`: subtract the mean, divide by the std.

    Uses pandas' default `std()` (sample standard deviation).
    """
    centre = se.mean()
    spread = se.std()
    return (se - centre)/spread
def neutralize(factor_se,industry_se,market_cap_se):
    """Remove industry and size exposure from a factor by OLS residualisation.

    The factor is regressed on one indicator column per industry plus the
    natural log of market cap (no separate intercept); the residuals are
    returned.

    Args:
        factor_se: Factor values, one per stock.
        industry_se: Industry code per stock, in the same order as `factor_se`.
        market_cap_se: Positive market cap per stock, in the same order.

    Returns:
        pandas.Series: `factor_se` minus the fitted values of the regression.
    """
    # One 0/1 column per distinct industry; no level is dropped.
    # `pd.get_dummies` replaces `sm.categorical`, which newer statsmodels
    # releases no longer provide.
    industry_dummies = pd.get_dummies(industry_se.tolist()).to_numpy(dtype=float)
    # Log-transform market cap before using it as a regressor.
    log_market_cap = np.log(np.asarray(market_cap_se.tolist(), dtype=float))
    design = np.c_[industry_dummies, log_market_cap]
    fitted = sm.OLS(factor_se, design).fit().fittedvalues
    return factor_se - fitted
def pretreat_factor(factor_df,all_industry_df,MC_df,univ_dict,factor_name):
    """Winsorize, neutralize and standardize a factor, date by date.

    For each date in `univ_dict`, the non-NaN factor values of that date's
    universe are passed through `winsorize`, `neutralize` and `standardize`.
    Stocks whose factor value is NaN stay NaN in the output.

    Args:
        factor_df: Factor values, dates as rows, stocks as columns.
        all_industry_df: Industry codes, same layout.
        MC_df: Market caps, same layout.
        univ_dict: Mapping date -> list of stock codes.
        factor_name: Unused; kept so existing callers keep working.

    Returns:
        tuple: `(pretreat_factor_df, danger_list)`. The frame has one row per
        date; `danger_list` holds, per date, the fraction of the universe
        that is still NaN after processing.
    """
    treated_rows = []
    danger_list = []
    for date, univ in univ_dict.items():
        # Work on an explicit copy so the write-back below never touches
        # `factor_df`.
        factor_se_withnan = factor_df.loc[date,univ].copy()
        factor_se = factor_se_withnan.dropna()
        stock_list = factor_se.index
        market_cap_se = MC_df.loc[date,stock_list]
        industry_se = all_industry_df.loc[date,stock_list]
        # Order matters: clip outliers, strip exposures, then rescale.
        factor_se = winsorize(factor_se)
        factor_se = neutralize(factor_se,industry_se,market_cap_se)
        factor_se = standardize(factor_se)
        # Write the processed values back next to the untouched NaNs.
        factor_se_withnan[factor_se.index] = factor_se
        treated_rows.append(factor_se_withnan.to_frame(date).T)
        danger_list.append(factor_se_withnan.isna().mean())
    # A single concat replaces per-iteration DataFrame.append, which newer
    # pandas no longer provides.
    if treated_rows:
        pretreat_factor_df = pd.concat(treated_rows)
    else:
        pretreat_factor_df = pd.DataFrame()
    return pretreat_factor_df,danger_list
# Step III: 处理结果
def ic_calculator(factor,return_df,univ_dict):
    """Compute the Spearman rank IC of a factor against forward returns.

    The last date in `univ_dict` is skipped. A date is also skipped when
    fewer than 10 stocks have both a factor value and a return.

    Args:
        factor: Factor values, dates as rows, stocks as columns.
        return_df: Forward returns, same layout.
        univ_dict: Mapping date -> list of stock codes.

    Returns:
        list: One Spearman correlation per evaluated date, in `univ_dict`
        order. The p-values are not returned.
    """
    ic_list = []
    for date in list(univ_dict)[:-1]:
        has_factor = set(factor.loc[date].dropna().index)
        has_return = set(return_df.loc[date].dropna().index)
        univ = list(set(univ_dict[date])&has_factor&has_return)
        if len(univ)<10:
            continue
        ic, _p_value = st.spearmanr(factor.loc[date,univ],return_df.loc[date,univ])
        ic_list.append(ic)
    return ic_list
# 【模块三:因子计算中层方程】
# Step 1: 初始化准备数据 PrepareData
def prepareData(trade_date_list):
    """Collect universes, returns, factors, market caps and industries.

    Reads the module-level settings `g_index`, `g_count` and `g_factor_list`.
    The per-date factor, market-cap and industry queries are fanned out over
    a pool of 16 threads.

    Args:
        trade_date_list: Sampled dates to evaluate.

    Returns:
        tuple: `(univ_dict, return_df, all_return_df, all_factor_dict, MC_df,
        all_industry_df)`, where `univ_dict` maps each row label of
        `return_df` to that date's stock list, already restricted to stocks
        that have both industry and market-cap data.
    """
    print('1.1正在汇总股票....')
    univ_list = get_stock_universe(trade_date_list,g_index)
    print('1.2正在汇总回报....')
    return_df,all_return_df = get_return(trade_date_list,g_count)

    # One pool serves all three fan-outs and is always released, even when
    # one of the queries raises.
    pool = ThreadPool(processes=16)
    try:
        print('1.3正在汇总因子字典....')
        factor_frames = pool.map(get_factor_by_day,trade_date_list)
        print('1.4正在市值行业....')
        market_cap_frames = pool.map(get_market_cap_by_day,trade_date_list)
        print('1.5正在汇总行业....')
        industry_frames = pool.map(get_Industry_by_day,trade_date_list)
    finally:
        pool.close()
        pool.join()

    # Stack the per-date results into one frame per factor.
    all_factor_dict = {
        fac.name: pd.concat([frame[fac.name] for frame in factor_frames],axis=0)
        for fac in g_factor_list
    }
    MC_df = pd.concat([frame['MC'] for frame in market_cap_frames],axis=0)
    all_industry_df = pd.concat(industry_frames)
    print('完成')

    # Pair each row label of return_df with the universe of the same position.
    univ_dict = dict(zip(return_df.index,univ_list))
    # Later steps need both an industry and a market cap for every stock, so
    # drop the stocks that lack either.
    univ_dict = get_new_univ_dict(univ_dict,all_industry_df,MC_df)
    return univ_dict,return_df,all_return_df,all_factor_dict,MC_df,all_industry_df
# Step 2: 修理数据 all_factor_dict
def TrimData(univ_dict,all_factor_dict,MC_df,all_industry_df):
    """Clean every factor in `g_factor_list` and report progress.

    Each factor is first NaN-filled with industry medians, then winsorized,
    neutralized and standardized. After each factor a progress percentage is
    printed; a factor whose worst per-date NaN share exceeds 5% is flagged
    as "dangerous".

    Returns:
        dict: Factor name -> cleaned factor DataFrame.
    """
    print('修理数据进度\n')
    total = len(g_factor_list)
    cleaned = {}
    for done, factor in enumerate(g_factor_list, start=1):
        raw_df = all_factor_dict[factor.name]
        # 2.1 Fill NaN with the industry median. NaN can remain, e.g. when a
        # whole industry lacks the item or its only stock is NaN.
        filled_df = replace_nan_indu(all_industry_df,raw_df,univ_dict)
        # 2.2 Winsorize, neutralize, standardize; remaining NaN stay NaN.
        treated_df,danger_list = pretreat_factor(filled_df,all_industry_df,MC_df,univ_dict,factor.name)
        cleaned[factor.name] = treated_df
        print("%.2f %%" %(done/total*100))
        if max(danger_list)>0.05:
            print("dangerous factor %s %f %f" % (factor.name,min(danger_list),max(danger_list)),end=',')
    return cleaned
# 【故事开始了......】
# Driver script: configure the run, build the date grid, then collect and
# clean the factor data. The g_* names are module-level settings read by the
# functions above.
global g_index
global g_factor_list
global g_count
global g_univ_dict
g_univ_dict=0  # not read anywhere in the visible code
g_index='000300.XSHG'  # index code that restricts the universe ('all' = every stock)
g_factor_list=[DSRI(),AQI(),SGAI(),GMI(),SGI(),LVGI(),TATA()]
g_count=250  # number of trading days of history to use
# Get the current date
today=datetime.date.today()
# First of the two trading days ending at `today`
yesterday=jqdata.get_trade_days(end_date=today,count=2)[0]
trade_date_list=get_trade_dates(yesterday,g_count,20) # date series used for the calculation (every 20th trading day)
trade_date_list=dateTransform(trade_date_list)
# Step 1: initialise and collect the data (prepareData)
univ_dict,return_df,all_return_df,all_factor_dict,MC_df,all_industry_df=prepareData(trade_date_list)
# Step 2: clean the data
all_factor_dict=TrimData(univ_dict,all_factor_dict,MC_df,all_industry_df)
# 运行输出:
# 1.1正在汇总股票....
# 1.2正在汇总回报....
# 1.3正在汇总因子字典....
# 1.4正在市值行业....
# 1.5正在汇总行业....
# 完成
# 修理数据进度
# 14.29 %
# 28.57 %
# 42.86 %
# dangerous factor SGAI 0.070000 0.070000,57.14 %
# dangerous factor GMI 0.070000 0.070000,71.43 %
# 85.71 %
# 100.00 %
# dangerous factor TATA 0.070000 0.070000,
# 【将结果写入Package并Pickle序列化】
# Bundle the results, write them to 'Z1Package.pkl', then read them back.
Package=[univ_dict,return_df,all_return_df,all_factor_dict,MC_df,all_industry_df]
import pickle
# Save the bundle with pickle protocol 0. `with` closes the file even when
# dumping fails.
with open('Z1Package.pkl', 'wb') as pkl_file:
    pickle.dump(Package, pkl_file, 0)
# Reload the bundle. NOTE: only unpickle files you wrote yourself;
# pickle.load can execute arbitrary code from an untrusted file.
with open('Z1Package.pkl', 'rb') as pkl_file:
    load_Package = pickle.load(pkl_file)
univ_dict,return_df,all_return_df,all_factor_dict,MC_df,all_industry_df=load_Package
本社区仅针对特定人员开放
查看需注册登录并通过风险意识测评
5秒后跳转登录页面...