MongoDB是由C++语言编写的一个基于分布式文件存储的开源NoSQL数据库系统,它提供了可扩展的高性能数据存储解决方案。
由于MongoDB是一个面向文档存储的数据库,所以操作起来比较简单和容易。MongoDB支持丰富的查询表达式,高性能的插入与查询操作,并在负载调节方面提供了较好的支持。MongoDB的这些特性很好地适应了量化策略开发项目对证券数据存储的需求。本文利用聚宽平台的本地化API获取证券数据,并以K线数据为例,演示了如何将获取到的K线数据存储在本地。
我们首先建立一个访问数据库的配置文件(这是一个json文件),这是为了方便在项目中同时访问多个数据源。
// 注意:标准JSON不支持注释,Python的json.loads无法解析带有“//”注释的文件;
// 下面的注释仅用于说明,实际保存配置文件时请将它们全部删除。
{
    "default": {
        // 这是一个常规的配置方案
        // 把这个配置命名为"default"是有专门考量的,后文会提到这个问题
        "host": "127.0.0.1:57017",
        "dbname": "jq",
        "username": "user1",
        "password": "12345",
        "dbauth": "admin"
    },
    "server1": {
        // 当你的数据库没有配置用户角色的时候
        // 你只需要填写下面这两个参数即可
        "host": "127.0.0.1:57016",
        "dbname": "ttt"
    },
    "server2": {
        // 这实际上是配置了一个复制集
        // 关于复制集的概念,就不展开了
        "host": "127.0.0.1",
        "replicaset": "test_rs"
    },
    "shard_a": {
        // 这是配置了一个分片服务器
        // 看起来和配置一个常规的数据库没什么区别
        "host": "127.0.0.1:40000",
        "dbname": "ttt"
    }
}
创建好数据库的访问配置文件后,紧接着就是添加一个连接数据库的db.py文件了:
# -*- coding: utf-8 -*-
"""MongoDB connection bootstrap.

Reads the JSON connection profiles stored in ``config_file``, opens the
``default`` profile once at import time (exposed as ``connection`` and
``mongodb``) and offers ``MongoDB.db_connection`` for any other profile.
"""
# Star import kept on purpose: sibling modules re-import ASCENDING and
# DESCENDING from this module.
from pymongo import *  # noqa: F401,F403
import json

# Path of the JSON file that holds the connection profiles.
config_file = 'E:/db_config.json'


def _load_profile(location):
    """Return the connection profile stored under ``location`` in ``config_file``."""
    with open(config_file) as cf:
        return json.loads(cf.read())[location]


def _build_uri(profile):
    """Assemble a ``mongodb://`` connection string from one profile dict.

    :param profile: dict with a mandatory ``host`` key (the value may carry
        a ``:port`` suffix) and optional ``username``/``password``,
        ``replicaset`` and ``dbauth`` keys
    :return: the connection string
    :raises KeyError: when ``host`` is missing
    """
    uri = 'mongodb://'
    # Credentials are only added when both halves are configured.
    # NOTE(review): the values are inserted verbatim, so reserved characters
    # (':', '@', '/', '%') must already be percent-escaped in the config file.
    if 'username' in profile and 'password' in profile:
        uri += '{username}:{password}@'.format(username=profile['username'],
                                               password=profile['password'])
    uri += profile['host']
    uri += '/?connectTimeoutMS=2000'
    if 'replicaset' in profile:
        uri += ';replicaSet=%s' % profile['replicaset']
    if 'dbauth' in profile:
        uri += ';authSource=%s' % profile['dbauth']
    return uri


# Build one default client at import time and keep it in memory so the
# project does not create MongoClient objects over and over.
try:
    db = _load_profile('default')  # the profile used when no location is given
    uri = _build_uri(db)
    # NOTE(review): this prints the password when credentials are configured.
    print('db-uri:', uri)
    connection = MongoClient(uri)
    # A profile without "dbname" yields a client but no default database.
    mongodb = connection[db['dbname']] if 'dbname' in db else None
except Exception as err:
    raise Exception('connection MongoDB raise a error') from err


class MongoDB:
    """Access to databases other than the default one.

    Only the most recently used client is kept (``location``,
    ``connection``, ``db_name``), so consecutive requests for the same
    profile reuse it instead of opening a new client.
    """

    connection_count = 0
    location = None    # profile name of the cached client
    connection = None  # cached MongoClient
    db_name = None     # database name selected when the client was cached

    @classmethod
    def db_connection(cls, location, db_name=None):
        """Return a database handle for the profile named ``location``.

        :param location: key of the profile in the JSON config file
        :param db_name: database to open; defaults to the profile's
            ``dbname`` (or to the cached name when the client is reused)
        :return: a pymongo database object
        :raises Exception: on any failure; the original error is chained
        """
        try:
            if location == cls.location and cls.connection is not None:
                # Same profile as last time: reuse the cached client.
                name = db_name if db_name is not None else cls.db_name
                return cls.connection[name]

            profile = _load_profile(location)
            uri = _build_uri(profile)
            # Resolve the database name first so that a profile without
            # "dbname" fails before a client is created.
            tn = db_name if db_name is not None else profile['dbname']
            print('new db-uri:', uri)
            connection = MongoClient(uri)
            cls.location = location
            cls.connection = connection
            cls.db_name = tn
            return connection[tn]
        except Exception as err:
            raise Exception('connection MongoDB raise a error') from err
有了上面的步骤,我们就获得了一个既能访问单点服务器,又能访问复制集、分片服务器的连接对象了。下面就是访问数据库表的数据了,在这里我们创建一个base_model.py文件,它扮演的角色类似于controller,可以完成对数据库中所有表的映射,这其实更多地得益于NoSQL的特性。
# -*- coding: utf-8 -*-
import logging

import pymongo
from bson import ObjectId

from .db import MongoDB, ASCENDING, DESCENDING, mongodb, connection

logger = logging.getLogger(__name__)


class BaseModel(object):
    """Thin wrapper around a single MongoDB collection.

    ``_id`` is supplied by MongoDB itself and is always present. Further
    fixed attributes may be added to ``__fields__`` later. ``classtype`` is
    written into every inserted document and holds the collection name.
    """

    __fields__ = [
        '_id',
        # (field name, type)
        # ('classtype', str),
    ]

    def __init__(self, tn=None, location=None, dbname=None):
        """Bind the model to a collection.

        :param tn: collection name; defaults to the lower-cased class name
        :param location: profile name in the connection config; ``None``
            selects the default client
        :param dbname: database name; ``None`` selects the default database
            of the chosen client
        :raises Exception: when neither argument is given and the default
            profile has no database configured
        """
        name = self.__class__.__name__
        self.tablename = tn.strip() if tn is not None and len(tn) else name.lower()
        self.location = location
        self.dbname = dbname
        if location is not None:
            self.mc = MongoDB.db_connection(location, dbname)[self.tablename]
        elif dbname is not None:
            # Default client, explicitly chosen database.
            self.mc = connection[dbname][self.tablename]
        elif mongodb is not None:
            self.mc = mongodb[self.tablename]
        else:
            raise Exception('Unable to find available dbname')

    def insert(self, *args, **kwargs):
        """Insert one document, given as keyword arguments or as a dict.

        The document is modified in place: ``classtype`` is set and an
        existing ``_id`` is replaced by a fresh ObjectId.

        :return: the result of ``insert_one``
        """
        doc = kwargs if len(kwargs) else args[0]
        doc['classtype'] = self.tablename
        if '_id' in doc:
            doc['_id'] = ObjectId()
        return self.mc.insert_one(doc)

    def insert_batch(self, *args):
        """Insert several documents at once.

        Accepts a single list of dicts, a single dict, or several dicts as
        positional arguments. The documents are modified in place, as in
        ``insert``. A ``BulkWriteError`` is logged rather than raised
        (best-effort insert).

        :return: the result of ``insert_many``, or ``[]`` when nothing was
            inserted or the bulk write failed
        """
        if len(args) == 1:
            docs = args[0] if isinstance(args[0], list) else [args[0]]
        else:
            docs = list(args)
        for doc in docs:
            if '_id' in doc:
                doc['_id'] = ObjectId()
            doc['classtype'] = self.tablename.lower()

        result = []
        if not docs:
            # insert_many rejects an empty batch, so skip the call.
            return result
        try:
            result = self.mc.insert_many(docs)
        except pymongo.errors.BulkWriteError as e:
            logger.error('insert_batch into %s failed: %s',
                         self.tablename, e.details)
        return result

    def query(self, sql=None, field=None):
        """Run ``find`` and return the cursor.

        :param sql: filter document
        :param field: projection
        """
        return self.mc.find(sql, projection=field)

    def aggregate(self, pipeline, allowDiskUse=True):
        """Run an aggregation pipeline.

        :param pipeline: list of aggregation stages
        :param allowDiskUse: forwarded to the server; lets stages that
            exceed the in-memory limit spill to disk
        :return: the cursor returned by ``aggregate``
        """
        return self.mc.aggregate(pipeline, allowDiskUse=allowDiskUse)

    def query_one(self, sql=None, field=None):
        """Return the first matching document, or ``None`` if there is none."""
        return self.mc.find_one(sql, projection=field)

    def update(self, cond, form):
        """Apply ``$set: form`` to the first document matching ``cond``."""
        self.mc.find_one_and_update(cond, {"$set": form}, upsert=False)

    def update_batch(self, condition, form):
        """Apply ``$set: form`` to every document matching ``condition``.

        :return: the result of ``update_many``
        """
        # The operator must be "$set"; a plain "set" key is rejected by
        # pymongo as a non-operator update document.
        return self.mc.update_many(condition, {"$set": form})

    def distinct(self, field, sql=None):
        """Return the distinct values of ``field`` among documents matching ``sql``."""
        return self.query(sql=sql).distinct(field)

    def remove(self, *args, **kwargs):
        """Delete every document matching the filter.

        The filter is given as keyword arguments or as a single dict.

        :return: the result of ``delete_many``
        """
        cond = kwargs if len(kwargs) else args[0]
        return self.mc.delete_many(cond)
我们进一步对BaseModel进行封装,使其更适合数据分析人员,尤其是那些非计算机专业的编程人员,因为他们希望看到的数据就是一张表,类似于Excel,而不是一堆字典或数组。我们再创建一个文件modeldata.py:
import datetime
import logging

import pandas as pd
import numpy as np

from ..db import BaseModel

logger = logging.getLogger(__name__)

# The project's own exception helpers are optional. When they cannot be
# imported, minimal stand-ins keep every error path below working instead
# of failing with NameError.
try:
    from Calf.exception import MongoIOError, ExceptionInfo
except ImportError:
    class MongoIOError(Exception):
        """Raised when a MongoDB read or write fails."""

    def ExceptionInfo(e):
        """Report an exception without raising it."""
        logger.error('%s', e, exc_info=e)


class ModelData(object):
    """DataFrame-oriented I/O facade over ``BaseModel``.

    All database access of the shared models goes through this class.
    """

    def __init__(self, location=None, dbname=None):
        """Remember which connection profile and database to use.

        :param location: profile name, ``None`` for the default connection
        :param dbname: database name, ``None`` for the profile's default
        """
        self.location = location
        self.dbname = dbname

    def _model(self, table_name):
        """Return a ``BaseModel`` bound to ``table_name`` on this target."""
        return BaseModel(table_name, self.location, self.dbname)

    def _extreme(self, table_name, field, kw, how):
        """Return the ``how`` ('max' or 'min') of ``field`` over rows matching ``kw``."""
        if not isinstance(field, str):
            raise TypeError('field must be an instance of str')
        cursor = self._model(table_name).query(sql=kw, field={field: True})
        try:
            # Materialise once; Cursor.count() no longer exists in PyMongo 4.
            rows = list(cursor)
        finally:
            cursor.close()
        if not rows:
            return None
        column = pd.DataFrame(rows).loc[:, [field]]
        return getattr(column, how)()[field]

    def field(self, table_name, field_name, filter=None):
        """Return the distinct values of one field.

        :param table_name: the collection name
        :param field_name: the field whose values are wanted
        :param filter: optional filter document
        :return: the distinct values found
        :raises MongoIOError: when the query fails
        """
        try:
            return self._model(table_name).distinct(field_name, filter)
        except Exception as err:
            raise MongoIOError('query the field raise a error') from err

    def max(self, table_name, field='_id', **kw):
        """Return the largest value of ``field`` among rows matching ``kw``.

        :return: the maximum, or ``None`` when no row matches
        :raises TypeError: when ``field`` is not a string
        """
        return self._extreme(table_name, field, kw, 'max')

    def min(self, table_name, field='_id', **kw):
        """Return the smallest value of ``field`` among rows matching ``kw``.

        :return: the minimum, or ``None`` when no row matches
        :raises TypeError: when ``field`` is not a string
        """
        return self._extreme(table_name, field, kw, 'min')

    def insert_data(self, table_name, data):
        """Insert every row of a DataFrame; an empty frame is a no-op.

        :param table_name: the collection name
        :param data: a pandas DataFrame
        :raises MongoIOError: when the insert fails
        """
        try:
            if len(data):
                d = data.to_dict(orient='records')
                self._model(table_name).insert_batch(d)
        except Exception as err:
            raise MongoIOError('Failed with insert data by MongoDB') from err

    def insert_one(self, table_name, data):
        """Insert a single record.

        :param table_name: the collection name
        :param data: a dict
        :raises MongoIOError: when the insert fails
        """
        try:
            self._model(table_name).insert(data)
        except Exception as err:
            raise MongoIOError('Failed with insert data by MongoDB') from err

    def read_one(self, table_name, field=None, **kw):
        """Read a single document.

        Errors are reported through ``ExceptionInfo`` and not raised.

        :param table_name: the collection name
        :param field: projection
        :param kw: filter conditions
        :return: a dict, or ``None`` when nothing matches or the read fails
        """
        try:
            return self._model(table_name).query_one(kw, field)
        except Exception as e:
            ExceptionInfo(e)
            return None

    def read_data(self, table_name, field=None, **kw):
        """Read all matching documents into a DataFrame.

        Errors are reported through ``ExceptionInfo`` and not raised.

        :param table_name: the collection name
        :param field: projection
        :param kw: filter conditions
        :return: a DataFrame, empty when nothing matches or the read fails
        """
        data = pd.DataFrame()
        cursor = None
        try:
            cursor = self._model(table_name).query(kw, field)
            rows = list(cursor)
            if rows:
                data = pd.DataFrame(rows)
        except Exception as e:
            ExceptionInfo(e)
        finally:
            # The cursor stays None when the query itself failed.
            if cursor is not None:
                cursor.close()
        return data

    def aggregate(self, table_name, pipeline):
        """Run an aggregation pipeline and return the result as a DataFrame.

        Errors are reported through ``ExceptionInfo`` and not raised.

        :param table_name: the collection name
        :param pipeline: list of aggregation stages
        :return: a DataFrame, empty when the aggregation fails
        """
        data = pd.DataFrame()
        cursor = None
        try:
            cursor = self._model(table_name).aggregate(pipeline)
            data = pd.DataFrame(list(cursor))
        except Exception as e:
            ExceptionInfo(e)
        finally:
            if cursor is not None:
                cursor.close()
        return data

    def update_data(self, table_name, condition, **kw):
        """Update every row matching ``condition``.

        :param table_name: the collection name
        :param condition: filter dict such as
            ``{'date': datetime.datetime(2018, 1, 1)}``
        :param kw: new field values, e.g. ``close=0``
        :return: the result of the batch update
        :raises MongoIOError: when the update fails
        """
        try:
            return self._model(table_name).update_batch(condition, kw)
        except Exception as err:
            raise MongoIOError('Failed with update by MongoDB') from err

    def remove_data(self, table_name, **kw):
        """Delete every row matching ``kw``.

        :param table_name: the collection name
        :param kw: filter conditions
        :return: whatever ``BaseModel.remove`` returns
        :raises MongoIOError: when the delete fails
        """
        try:
            return self._model(table_name).remove(kw)
        except Exception as err:
            raise MongoIOError('Failed with delete data by MongoDB') from err
有了ModelData类,数据库中的数据对我们来说全都是DataFrame了,对于数据分析人员来说,再好不过了。但如果你对pandas还不是很熟悉,老兄,对不起,你是时候改变了。
下面我们就利用聚宽的本地API接口把K线数据下载下来存在我们本地吧。
import datetime as dt

import pandas as pd
import jqdatasdk as jq
# NOTE(review): `md` is called like a class below, so it must resolve to the
# ModelData class. If `ModelData` is a module in your layout, import the
# class from it instead -- confirm against the project structure.
import ModelData as md

jq.auth('***', '***')

# One row per security; the security code is the index of this frame.
data = jq.get_all_securities(types=[], date=None)

# Without arguments the data goes to the "default" profile of the connection
# config. To write to another profile, pass its name, for example
# md(location='server1').
store = md()

for code in data.index:
    d = jq.get_price(code,
                     start_date=dt.datetime(2018, 1, 1),
                     end_date=dt.datetime(2018, 10, 30),
                     frequency='daily', fields=None,
                     skip_paused=False, fq='pre', count=None)
    # insert_data stores the rows without the frame's index, so move the
    # trading date into a column and tag every row with its security code.
    # Otherwise the stored bars cannot be told apart.
    d = d.rename_axis('date').reset_index()
    d['code'] = code
    store.insert_data(table_name='kline_day', data=d)
本社区仅针对特定人员开放
查看需注册登录并通过风险意识测评
5秒后跳转登录页面...