请 [注册] 或 [登录]  | 返回主站

量化交易吧 /  数理科学 帖子:3364712 新帖:0

多因子回测框架(上)--生成因子

专门亏损发表于:5 月 10 日 07:09回复(1)

第一块:定义因子类
第二块:计算用到的所有函数
第三块:执行计算
第四块:用 pickle 存储结果

多因子回测框架(下)--检验因子

#【模块一:因子对象】
import jqdata
import datetime
from multiprocessing.dummy import Pool as ThreadPool
from jqfactor import Factor,calc_factors
import pandas as pd
import statsmodels.api as sm
import scipy.stats as st

class MC(Factor):
    """Market-capitalisation factor.

    Reads the ``market_cap`` field and returns its ``.mean()``.
    """
    name = 'MC'
    max_window = 1
    dependencies = ['market_cap']

    def calc(self, data):
        """Return ``data['market_cap'].mean()``."""
        # presumably one row per day in the window (max_window=1) -- TODO confirm
        market_cap = data['market_cap']
        return market_cap.mean()

class DSRI(Factor):
    """Receivables-to-revenue ratio relative to its ``_1``-suffixed counterpart.

    NOTE(review): the ``_1`` fields are presumably the prior reporting
    period -- confirm against the jqfactor documentation.
    """
    name = 'DSRI'
    max_window = 1
    dependencies = ['account_receivable', 'account_receivable_1',
                    'operating_revenue', 'operating_revenue_1']

    def calc(self, data):
        """Return the mean of (receivable / revenue) over (receivable_1 / revenue_1)."""
        ratio_now = data['account_receivable'] / data['operating_revenue']
        ratio_prev = data['account_receivable_1'] / data['operating_revenue_1']
        return (ratio_now / ratio_prev).mean()

class AQI(Factor):
    """Intangible-assets share of total assets relative to its ``_1`` counterpart.

    NOTE(review): the ``_1`` fields are presumably the prior reporting
    period -- confirm against the jqfactor documentation.
    """
    name = 'AQI'
    max_window = 1
    dependencies = ['intangible_assets', 'intangible_assets_1',
                    'total_assets', 'total_assets_1']

    def calc(self, data):
        """Return the mean of (intangible / total assets) over the ``_1`` version."""
        share_now = data['intangible_assets'] / data['total_assets']
        share_prev = data['intangible_assets_1'] / data['total_assets_1']
        return (share_now / share_prev).mean()
    
class SGAI(Factor):
    """(Administration + sales expense) / revenue relative to its ``_1`` counterpart.

    NOTE(review): the ``_1`` fields are presumably the prior reporting
    period -- confirm against the jqfactor documentation.
    """
    name = 'SGAI'
    max_window = 1
    dependencies = ['administration_expense', 'sale_expense', 'operating_revenue',
                    'administration_expense_1', 'sale_expense_1', 'operating_revenue_1']

    def calc(self, data):
        """Return the mean of the expense-to-revenue ratio over its ``_1`` version."""
        expense_now = data['administration_expense'] + data['sale_expense']
        ratio_now = expense_now / data['operating_revenue']
        expense_prev = data['administration_expense_1'] + data['sale_expense_1']
        ratio_prev = expense_prev / data['operating_revenue_1']
        return (ratio_now / ratio_prev).mean()
        
class GMI(Factor):
    """Gross-margin index: ``gross_profit_margin_1`` divided by ``gross_profit_margin``.

    Unlike the other ratio factors in this file, the ``_1`` field is the
    numerator here.
    """
    name = 'GMI'
    max_window = 1
    dependencies = ['gross_profit_margin', 'gross_profit_margin_1']

    def calc(self, data):
        """Return the mean of ``gross_profit_margin_1 / gross_profit_margin``."""
        margin_prev = data['gross_profit_margin_1']
        margin_now = data['gross_profit_margin']
        return (margin_prev / margin_now).mean()
    
class SGI(Factor):
    """Revenue growth index: ``operating_revenue`` over ``operating_revenue_1``."""
    name = 'SGI'
    max_window = 1
    dependencies = ['operating_revenue', 'operating_revenue_1']

    def calc(self, data):
        """Return the mean of ``operating_revenue / operating_revenue_1``."""
        revenue_now = data['operating_revenue']
        revenue_prev = data['operating_revenue_1']
        return (revenue_now / revenue_prev).mean()
    
class LVGI(Factor):
    """Liabilities-to-assets ratio relative to its ``_1``-suffixed counterpart.

    NOTE(review): the ``_1`` fields are presumably the prior reporting
    period -- confirm against the jqfactor documentation.
    """
    name = 'LVGI'
    max_window = 1
    dependencies = ['total_liability', 'total_assets',
                    'total_liability_1', 'total_assets_1']

    def calc(self, data):
        """Return the mean of (liability / assets) over (liability_1 / assets_1)."""
        leverage_now = data['total_liability'] / data['total_assets']
        leverage_prev = data['total_liability_1'] / data['total_assets_1']
        return (leverage_now / leverage_prev).mean()

class TATA(Factor):
    """Accounts payable divided by total assets.

    NOTE(review): the name suggests "total accruals to total assets", but the
    computation uses ``accounts_payable`` -- confirm this proxy is intended.
    """
    name = 'TATA'
    max_window = 1
    dependencies = ['accounts_payable', 'total_assets']

    def calc(self, data):
        """Return the mean of ``accounts_payable / total_assets``."""
        payable = data['accounts_payable']
        assets = data['total_assets']
        return (payable / assets).mean()
#【模块二:因子计算-底层函数】
#--------------------Step I: 准备数据--------------------------
def get_trade_dates(end, count=250, interval=20):
    """Return every ``interval``-th trading day, counted back from ``end``.

    Args:
        end: last date of the window, passed to ``jqdata.get_trade_days``.
        count: number of trading days requested from ``jqdata.get_trade_days``.
        interval: sampling step, in trading days.

    Returns:
        A list in chronological order. The most recent trading day is always
        included, and each earlier entry is ``interval`` trading days before
        the next one.
    """
    trade_days = list(jqdata.get_trade_days(end_date=end, count=count))
    # Reverse so index 0 is the latest day, take every `interval`-th entry,
    # then restore chronological order. Stride slicing replaces the former
    # filter(list.index(...)) lookup, which was O(n^2).
    return trade_days[::-1][::interval][::-1]


def get_stock_pool(date, index='all'):
    """Return the stock pool as of ``date``, excluding recently listed stocks.

    A stock is kept only if its ``start_date`` is earlier than the first of
    the 60 trading days ending at ``date``.

    Args:
        date: as-of date.
        index: ``'all'`` for every qualifying stock, otherwise an index code
            whose members (as of ``date``) are intersected with them.

    Returns:
        A list of stock codes.
    """
    securities = get_all_securities(types=['stock'], date=date)
    # First day of the 60-trading-day window ending at `date`.
    listing_cutoff = jqdata.get_trade_days(end_date=date, count=60)[0]
    seasoned = securities[securities['start_date'] < listing_cutoff]
    universe_pool = list(seasoned.index)
    if index != 'all':
        index_members = get_index_stocks(index, date=date)
        return list(set(index_members) & set(universe_pool))
    return universe_pool

def get_stock_universe(trade_days_list, index='all'):
    """Return one filtered stock pool per date, in the order of ``trade_days_list``.

    Each entry is the result of ``get_stock_pool(date, index)``.
    """
    return [get_stock_pool(day, index) for day in trade_days_list]

def get_return(trade_date_list, count=250):
    """Build forward returns from close prices for the full stock pool.

    The pool is ``get_stock_pool`` on the latest date in ``trade_date_list``.

    Args:
        trade_date_list: sampled dates; must be present in the price index.
        count: number of daily bars requested from ``get_price``.

    Returns:
        ``(return_df, all_return_df)``. ``return_df`` is the percentage change
        between consecutive sampled dates, shifted so each row holds the change
        to the *next* sampled date. ``all_return_df`` is the same computation
        on every daily bar.
    """
    last_date = max(trade_date_list)
    all_stocks = get_stock_pool(last_date, index='all')
    close = get_price(all_stocks, end_date=last_date, count=count, fields=['close'])['close']
    # shift(-1) aligns each row with the return realised after that row's date.
    all_return_df = close.pct_change().shift(-1)
    sampled_close = close.loc[trade_date_list]
    return_df = sampled_close.pct_change().shift(-1)
    return return_df, all_return_df

def get_factor_by_day(date):
    """Compute every factor in ``g_factor_list`` for a single date.

    The universe is not filtered: all stocks when ``g_index == 'all'``,
    otherwise the members of ``g_index`` as of ``date``.

    Returns:
        The result of ``calc_factors``. Per the original author's note this is
        a dict keyed by factor name, whose values are DataFrames with dates as
        rows and stocks as columns.
    """
    if g_index != 'all':
        universe = get_index_stocks(g_index, date=date)
    else:
        universe = get_all_securities(types=['stock'], date=date).index.tolist()
    return calc_factors(universe, g_factor_list, date, date)

def get_market_cap_by_day(date):
    """Compute the ``MC`` factor for a single date.

    The universe is not filtered: all stocks when ``g_index == 'all'``,
    otherwise the members of ``g_index`` as of ``date``.

    Returns:
        The result of ``calc_factors`` for ``[MC()]``; callers read the
        ``'MC'`` entry from it.
    """
    if g_index != 'all':
        universe = get_index_stocks(g_index, date=date)
    else:
        universe = get_all_securities(types=['stock'], date=date).index.tolist()
    return calc_factors(universe, [MC()], date, date)

def get_Industry_by_day(date):
    """Return a one-row DataFrame mapping each stock to its industry code.

    The universe is not filtered: all stocks when ``g_index == 'all'``,
    otherwise the members of ``g_index`` as of ``date``.

    Returns:
        A DataFrame with index ``[date]`` and one column per stock. Stocks
        found in none of the listed industries stay NaN.
    """
    if g_index != 'all':
        universe = get_index_stocks(g_index, date=date)
    else:
        universe = get_all_securities(types=['stock'], date=date).index.tolist()

    # presumably Shenwan level-1 industry codes -- TODO confirm
    industry_codes = [
        '801010', '801020', '801030', '801040', '801050', '801080', '801110',
        '801120', '801130', '801140', '801150', '801160', '801170', '801180',
        '801200', '801210', '801230', '801710', '801720', '801730', '801740',
        '801750', '801760', '801770', '801780', '801790', '801880', '801890',
    ]
    industry_df = pd.DataFrame(index=[date], columns=universe)
    for code in industry_codes:
        members = set(get_industry_stocks(code, date=date)) & set(universe)
        industry_df.loc[date, list(members)] = code
    return industry_df

def dateTransform(date_list):
    """Convert date-like objects to midnight ``datetime.datetime`` values.

    Each element is formatted as ``YYYY-MM-DD`` and parsed back, so any
    time-of-day component is dropped.

    Returns:
        A new list of ``datetime.datetime`` objects, in input order.
    """
    day_format = '%Y-%m-%d'
    return [
        datetime.datetime.strptime(day.strftime(day_format), day_format)
        for day in date_list
    ]

## ---------------------------------Step II: 处理数据--------------------------- 
def get_new_univ_dict(univ_dict, all_industry_df, MC_df):
    """Restrict each date's universe to stocks with industry and market-cap data.

    Args:
        univ_dict: mapping of date to a list of stock codes.
        all_industry_df: industry codes, dates as rows and stocks as columns.
        MC_df: market caps, dates as rows and stocks as columns.

    Returns:
        A new dict with the same keys. Each value lists the stocks from the
        original universe that are non-NaN in both frames on that date, in
        unspecified order.
    """
    new_univ_dict = {}
    for date, candidates in univ_dict.items():
        with_industry = set(all_industry_df.loc[date].dropna().index)
        with_market_cap = set(MC_df.loc[date].dropna().index)
        new_univ_dict[date] = list(set(candidates) & with_industry & with_market_cap)
    return new_univ_dict

def replace_nan_indu(all_industry_df, factor_df, univ_dict):
    """Fill missing factor values with the same-day median of the stock's industry.

    Args:
        all_industry_df: industry codes, dates as rows and stocks as columns.
        factor_df: raw factor values, dates as rows and stocks as columns.
        univ_dict: mapping of date to the list of stocks to process.

    Returns:
        A DataFrame with one row per date in ``univ_dict``. Stocks without an
        industry code on a date are dropped from that row. A value stays NaN
        when its whole industry has no data that day. An empty ``univ_dict``
        yields an empty DataFrame.
    """
    filled_rows = []
    for date, univ in univ_dict.items():
        factor_by_day = factor_df.loc[date, univ].to_frame('values')
        industry_by_day = all_industry_df.loc[date, univ].dropna().to_frame('industry')
        factor_by_day = factor_by_day.merge(
            industry_by_day, left_index=True, right_index=True, how='inner')
        industry_median = factor_by_day.groupby('industry').median()
        # Both frames carry a 'values' column, so the merge suffixes them:
        # values_x is the stock's own value, values_y its industry median.
        factor_by_day = factor_by_day.merge(
            industry_median, left_on='industry', right_index=True, how='left')
        missing = factor_by_day['values_x'].isnull()
        factor_by_day.loc[missing, 'values_x'] = factor_by_day.loc[missing, 'values_y']
        filled_rows.append(factor_by_day['values_x'].to_frame(date).T)
    if not filled_rows:
        return pd.DataFrame()
    # A single concat replaces the per-date DataFrame.append, which was removed
    # in pandas 2.0 and copied the accumulated frame on every iteration.
    return pd.concat(filled_rows)

def winsorize(se):
    """Clip a Series to its 2.5% and 97.5% quantiles.

    Args:
        se: numeric pandas Series.

    Returns:
        A new Series with values below the 2.5% quantile raised to it and
        values above the 97.5% quantile lowered to it. NaN entries are left as
        NaN. The input Series is not modified.
    """
    bounds = se.quantile([0.025, 0.975])
    if not (isinstance(bounds, pd.Series) and len(bounds) == 2):
        return se
    if bounds.isnull().any():
        # No usable quantiles (e.g. empty or all-NaN input): nothing to clip.
        return se.copy()
    # clip() returns a new object, so the caller's Series is left untouched.
    return se.clip(lower=bounds.iloc[0], upper=bounds.iloc[1])

def standardize(se):
    """Return the z-score of ``se``: (value - mean) / standard deviation.

    Uses the ``mean()`` and ``std()`` of ``se`` itself.
    """
    centre = se.mean()
    spread = se.std()
    return (se - centre) / spread

def neutralize(factor_se, industry_se, market_cap_se):
    """Remove industry and size effects from a factor via an OLS regression.

    The factor is regressed on one dummy column per industry plus the log of
    market cap; the residual is returned.

    Args:
        factor_se: factor values for one date (no NaN).
        industry_se: industry code per stock, in the same order as ``factor_se``.
        market_cap_se: market cap per stock, in the same order as ``factor_se``.

    Returns:
        ``factor_se`` minus the fitted values of the regression.
    """
    # Local import: the file uses `np` and a bare `array` here but never
    # imports numpy at module level.
    import numpy as np

    # One 0/1 column per distinct industry code. pd.get_dummies stands in for
    # sm.categorical(..., drop=True), which newer statsmodels no longer ships.
    industry_dummies = pd.get_dummies(industry_se.tolist()).values.astype(float)
    # Log-transform market cap.
    log_market_cap = np.log(np.asarray(market_cap_se.tolist(), dtype=float))
    # Regressors: industry dummies followed by log market cap.
    X = np.c_[industry_dummies, log_market_cap]
    results = sm.OLS(factor_se, X).fit()
    return factor_se - results.fittedvalues

def pretreat_factor(factor_df, all_industry_df, MC_df, univ_dict, factor_name):
    """Winsorize, neutralize and standardize a factor, one date at a time.

    Args:
        factor_df: factor values, dates as rows and stocks as columns.
        all_industry_df: industry codes with the same layout.
        MC_df: market caps with the same layout.
        univ_dict: mapping of date to the list of stocks to process.
        factor_name: unused; kept so existing callers keep working.

    Returns:
        ``(pretreat_factor_df, danger_list)``. The frame has one row per date,
        and stocks whose factor was NaN stay NaN. ``danger_list`` holds, per
        date, the fraction of the universe that is still NaN.
    """
    treated_rows = []
    danger_list = []
    for date, univ in univ_dict.items():
        # Factor values for the day; copied so the write-back below never
        # touches factor_df.
        factor_se_withnan = factor_df.loc[date, univ].copy()
        factor_se = factor_se_withnan.dropna()
        stock_list = factor_se.index
        market_cap_se = MC_df.loc[date, stock_list]
        industry_se = all_industry_df.loc[date, stock_list]
        # Pre-treatment pipeline.
        factor_se = winsorize(factor_se)
        factor_se = neutralize(factor_se, industry_se, market_cap_se)
        factor_se = standardize(factor_se)
        # Write the treated values back over the non-NaN slots.
        factor_se_withnan.loc[factor_se.index] = factor_se
        treated_rows.append(factor_se_withnan.to_frame(date).T)
        # Share of NaN; replaces the bare `isnan`, which the file never imports.
        danger_list.append(factor_se_withnan.isnull().mean())
    # A single concat replaces DataFrame.append, which pandas 2.0 removed.
    pretreat_factor_df = pd.concat(treated_rows) if treated_rows else pd.DataFrame()
    return pretreat_factor_df, danger_list

# Step III: 处理结果
def ic_calculator(factor, return_df, univ_dict):
    """Compute the Spearman rank IC of a factor against returns, per date.

    The last date in ``univ_dict`` is skipped. A date is also skipped when
    fewer than 10 stocks have both a factor value and a return, so the result
    may be shorter than the number of dates.

    Args:
        factor: factor values, dates as rows and stocks as columns.
        return_df: returns with the same layout.
        univ_dict: mapping of date to the list of eligible stocks.

    Returns:
        A list of Spearman correlation coefficients in date order.
    """
    ic_list = []
    for date in list(univ_dict)[:-1]:
        usable = (set(univ_dict[date])
                  & set(factor.loc[date].dropna().index)
                  & set(return_df.loc[date].dropna().index))
        if len(usable) < 10:
            continue
        # Sorted so the stock order is deterministic across runs.
        univ = sorted(usable)
        # The p-value is discarded: only the IC series is returned.
        ic, _ = st.spearmanr(factor.loc[date, univ], return_df.loc[date, univ])
        ic_list.append(ic)
    return ic_list
# 【模块三:因子计算中层函数】
# Step 1: 初始化准备数据  PrepareData
def _map_over_dates(func, trade_date_list, processes=16):
    """Run ``func`` once per date on a thread pool; return results in date order."""
    pool = ThreadPool(processes=processes)
    try:
        return pool.map(func, trade_date_list)
    finally:
        # Always release the worker threads, even if a worker raised.
        pool.close()
        pool.join()


def prepareData(trade_date_list):
    """Collect universes, returns, factors, market caps and industries.

    Reads the module-level settings ``g_index``, ``g_count`` and
    ``g_factor_list``, and prints progress messages as it goes.

    Args:
        trade_date_list: sampled dates to compute everything on.

    Returns:
        ``(univ_dict, return_df, all_return_df, all_factor_dict, MC_df,
        all_industry_df)``. ``univ_dict`` maps each date to the stocks that
        have both industry and market-cap data. ``all_factor_dict`` maps each
        factor name to its per-date frames concatenated along the rows.
    """
    print('1.1正在汇总股票....')
    univ_list = get_stock_universe(trade_date_list, g_index)

    print('1.2正在汇总回报....')
    return_df, all_return_df = get_return(trade_date_list, g_count)

    print('1.3正在汇总因子字典....')
    factor_frames = _map_over_dates(get_factor_by_day, trade_date_list)
    all_factor_dict = {
        fac.name: pd.concat([frame[fac.name] for frame in factor_frames], axis=0)
        for fac in g_factor_list
    }

    print('1.4正在市值行业....')
    cap_frames = _map_over_dates(get_market_cap_by_day, trade_date_list)
    MC_df = pd.concat([frame['MC'] for frame in cap_frames], axis=0)

    print('1.5正在汇总行业....')
    industry_frames = _map_over_dates(get_Industry_by_day, trade_date_list)
    all_industry_df = pd.concat(industry_frames)
    print('完成')

    # return_df rows and univ_list entries are both built from
    # trade_date_list, so they pair up positionally.
    univ_dict = dict(zip(return_df.index, univ_list))

    # Later steps need both industry and market-cap data for every stock, so
    # drop the stocks that lack either.
    univ_dict = get_new_univ_dict(univ_dict, all_industry_df, MC_df)

    return univ_dict, return_df, all_return_df, all_factor_dict, MC_df, all_industry_df

# Step 2: 修理数据  all_factor_dict 
def TrimData(univ_dict, all_factor_dict, MC_df, all_industry_df):
    """Clean every factor in ``g_factor_list`` and report progress.

    For each factor: fill NaN with the industry median, then winsorize,
    neutralize and standardize. Prints a progress percentage after each
    factor, and a warning when more than 5% of values are still NaN on any
    date.

    Args:
        univ_dict: mapping of date to the list of stocks to process.
        all_factor_dict: mapping of factor name to its raw factor frame.
        MC_df: market caps, dates as rows and stocks as columns.
        all_industry_df: industry codes with the same layout.

    Returns:
        A new dict mapping each factor name to its cleaned frame.
    """
    new_all_factor_dict = {}
    total = len(g_factor_list)
    print('修理数据进度\n')
    for done, factor in enumerate(g_factor_list, start=1):
        factor_df = all_factor_dict[factor.name]
        # 2.1 Fill NaN with the industry median. Some NaN can remain, e.g.
        # when the whole industry lacks the data or its only stock is NaN.
        factor_df = replace_nan_indu(all_industry_df, factor_df, univ_dict)
        # 2.2 Winsorize, neutralize, standardize. Remaining NaN stay NaN.
        factor_df, danger_list = pretreat_factor(
            factor_df, all_industry_df, MC_df, univ_dict, factor.name)
        new_all_factor_dict[factor.name] = factor_df
        print("%.2f %%" % (done / total * 100))
        # Guard against an empty univ_dict, where max() would raise.
        if danger_list and max(danger_list) > 0.05:
            print("dangerous factor %s %f %f" % (factor.name, min(danger_list), max(danger_list)), end=',')
    return new_all_factor_dict
# 【故事开始了......】
# Run configuration. The helper functions above read these as module-level
# globals. The `global` statements that used to precede the assignments were
# removed: at module level they have no effect.
g_univ_dict = 0                # not read anywhere in the code shown here
g_index = '000300.XSHG'        # index code passed to get_index_stocks
g_factor_list = [DSRI(), AQI(), SGAI(), GMI(), SGI(), LVGI(), TATA()]
g_count = 250                  # number of trading days to look back

# Resolve the dates to compute on.
today = datetime.date.today()
# First of the two most recent trading days ending today.
yesterday = jqdata.get_trade_days(end_date=today, count=2)[0]
trade_date_list = get_trade_dates(yesterday, g_count, 20)   # sampled every 20 trading days
trade_date_list = dateTransform(trade_date_list)

# Step 1: gather the raw data.
univ_dict, return_df, all_return_df, all_factor_dict, MC_df, all_industry_df = prepareData(trade_date_list)

# Step 2: clean the factor data.
all_factor_dict = TrimData(univ_dict, all_factor_dict, MC_df, all_industry_df)
1.1正在汇总股票....
1.2正在汇总回报....
1.3正在汇总因子字典....
1.4正在市值行业....
1.5正在汇总行业....
完成
修理数据进度

14.29 %
28.57 %
42.86 %
dangerous factor SGAI 0.070000 0.070000,57.14 %
dangerous factor GMI 0.070000 0.070000,71.43 %
85.71 %
100.00 %
dangerous factor TATA 0.070000 0.070000,
# 【将结果写入Package并Pickle序列化】
Package = [univ_dict, return_df, all_return_df, all_factor_dict, MC_df, all_industry_df]

# Serialise the results to disk with pickle.
import pickle

# `with` guarantees the handle is closed even if dump() raises.
# Protocol 0 is kept so the on-disk format is unchanged.
with open('Z1Package.pkl', 'wb') as pkl_file:
    pickle.dump(Package, pkl_file, 0)

# Read the package back. NOTE: pickle.load can execute arbitrary code, so only
# load files this notebook wrote itself. The original never closed this handle.
with open('Z1Package.pkl', 'rb') as pkl_file:
    load_Package = pickle.load(pkl_file)
univ_dict, return_df, all_return_df, all_factor_dict, MC_df, all_industry_df = load_Package
 
 

全部回复

0/140

量化课程

    移动端课程