# encoding: UTF-8

"""

本模块中主要包含:

  1. 从通联数据下载历史行情的引擎

  2. 用来把MultiCharts导出的历史数据载入到MongoDB中用的函数

  3. 增加从通达信导出的历史数据载入到MongoDB中的函数

"""

from datetime import datetime, timedelta

import pymongo

from time import time

from multiprocessing.pool import ThreadPool

from ctaBase import *

from vtConstant import *

from vtFunction import loadMongoSetting

from datayesClient import DatayesClient

# Exchange-code mapping between vn.trader's exchange constants and the
# exchange codes used by Datayes (通联数据).
VT_TO_DATAYES_EXCHANGE = {
    EXCHANGE_CFFEX: 'CCFX',     # China Financial Futures Exchange
    EXCHANGE_SHFE: 'XSGE',      # Shanghai Futures Exchange
    EXCHANGE_CZCE: 'XZCE',      # Zhengzhou Commodity Exchange
    EXCHANGE_DCE: 'XDCE',       # Dalian Commodity Exchange
}

# Reverse lookup: Datayes exchange code -> vn.trader exchange constant.
DATAYES_TO_VT_EXCHANGE = dict(
    (datayesCode, vtCode) for vtCode, datayesCode in VT_TO_DATAYES_EXCHANGE.items()
)

########################################################################

class HistoryDataEngine(object):
    """History data engine used by the CTA module.

    Downloads futures/equity symbol lists and bars from Datayes (through
    DatayesClient) and from tushare, and upserts them into MongoDB. Bar
    collections are named after the symbol and carry a unique index on
    ``datetime``.
    """

    # Exceptions that mark one downloaded record as unusable (missing or
    # malformed date/time fields). Such records are printed and skipped
    # rather than upserted without a valid ``datetime``.
    _BAD_RECORD_ERRORS = (KeyError, TypeError, ValueError, AttributeError)

    #----------------------------------------------------------------------
    def __init__(self):
        """Connect to MongoDB (settings from loadMongoSetting) and create the Datayes client."""
        # The third setting (named 'logging' in the original code) is not used here.
        host, port, _ = loadMongoSetting()

        self.dbClient = pymongo.MongoClient(host, port)
        self.datayesClient = DatayesClient()

    #----------------------------------------------------------------------
    def lastTradeDate(self, today=None):
        """Return the most recent weekday as a 'YYYYMMDD' string.

        Only weekends are considered; Chinese public holidays are NOT checked.

        today: reference datetime/date; defaults to datetime.now().
        A Saturday or Sunday is rolled back to the preceding Friday.
        """
        if today is None:
            today = datetime.now()

        weekday = today.weekday()
        if weekday >= 5:
            # Saturday (5) -> back 1 day, Sunday (6) -> back 2 days.
            today = today - timedelta(days=weekday - 4)

        return today.strftime('%Y%m%d')

    #----------------------------------------------------------------------
    def _getLastBar(self, dbName, symbol):
        """Return the stored bar with the latest datetime in dbName[symbol], or None if empty."""
        return self.dbClient[dbName][symbol].find_one(
            sort=[('datetime', pymongo.DESCENDING)])

    #----------------------------------------------------------------------
    def _ensureDatetimeIndex(self, dbName, symbol):
        """Create the unique ascending 'datetime' index on dbName[symbol]."""
        self.dbClient[dbName][symbol].create_index(
            [('datetime', pymongo.ASCENDING)], unique=True)

    #----------------------------------------------------------------------
    def _saveBar(self, dbName, symbol, bar):
        """Upsert bar into dbName[symbol], matching an existing document on bar.datetime."""
        self.dbClient[dbName][symbol].update_one(
            {'datetime': bar.datetime}, {'$set': bar.__dict__}, upsert=True)

    #----------------------------------------------------------------------
    @staticmethod
    def _datayesDailyBar(symbol, d):
        """Build a daily CtaBarData for symbol from one Datayes daily record d.

        Missing price/volume fields default to 0. Raises ValueError (or
        AttributeError) when 'tradeDate' is missing or malformed.
        """
        bar = CtaBarData()
        bar.vtSymbol = symbol
        bar.symbol = symbol
        bar.exchange = DATAYES_TO_VT_EXCHANGE.get(d.get('exchangeCD', ''), '')
        bar.open = d.get('openPrice', 0)
        bar.high = d.get('highestPrice', 0)
        bar.low = d.get('lowestPrice', 0)
        bar.close = d.get('closePrice', 0)
        # Dashes are stripped so the date parses as %Y%m%d.
        bar.date = d.get('tradeDate', '').replace('-', '')
        bar.time = ''
        bar.datetime = datetime.strptime(bar.date, '%Y%m%d')
        bar.volume = d.get('turnoverVol', 0)
        return bar

    #----------------------------------------------------------------------
    def readFuturesProductSymbol(self):
        """Return the set of all futures product codes stored in FuturesSymbol.

        A set is returned because many contracts share one product code.
        """
        cx = self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].find()
        return {d['productSymbol'] for d in cx}

    #----------------------------------------------------------------------
    def readFuturesSymbol(self):
        """Return a list of all futures contract codes stored in FuturesSymbol."""
        cx = self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].find()
        return [d['symbol'] for d in cx]

    #----------------------------------------------------------------------
    def downloadFuturesSymbol(self, tradeDate=''):
        """Download futures contract codes from Datayes into the FuturesSymbol collection.

        tradeDate: 'YYYYMMDD' to query; defaults to lastTradeDate().
        Each document holds 'symbol' (Datayes 'ticker') and 'productSymbol'
        (Datayes 'contractObject') and is upserted on 'symbol'.
        """
        if not tradeDate:
            tradeDate = self.lastTradeDate()

        collection = self.dbClient[SETTING_DB_NAME]['FuturesSymbol']
        collection.create_index([('symbol', pymongo.ASCENDING)], unique=True)

        path = 'api/market/getMktMFutd.json'
        params = {'tradeDate': tradeDate}
        data = self.datayesClient.downloadData(path, params)

        if not data:
            print(u'期货合约代码下载失败')
            return

        for d in data:
            symbolDict = {
                'symbol': d['ticker'],
                'productSymbol': d['contractObject'],
            }
            collection.update_one({'symbol': d['ticker']}, {'$set': symbolDict},
                                  upsert=True)
        print(u'期货合约代码下载完成')

    #----------------------------------------------------------------------
    def downloadFuturesDailyBar(self, symbol):
        """Download daily bars for a futures contract from Datayes into DAILY_DB_NAME.

        symbol: contract code. A code containing '0000' (e.g. IF0000) requests
        the main (dominant) continuous contract of product 'IF'.
        If bars are already stored, the download resumes from the latest
        stored date. Re-fetching that day is harmless because bars are upserted.
        """
        print(u'开始下载%s日行情' % symbol)

        last = self._getLastBar(DAILY_DB_NAME, symbol)

        if '0000' in symbol:
            # Main continuous contract
            path = 'api/market/getMktMFutd.json'
            params = {'contractObject': symbol.replace('0000', ''), 'mainCon': 1}
        else:
            # Concrete tradable contract
            path = 'api/market/getMktFutd.json'
            params = {'ticker': symbol}

        if last:
            # NOTE(review): the equity download below passes 'beginDate' to
            # Datayes. Confirm which name these futures endpoints accept.
            params['startDate'] = last['date']

        data = self.datayesClient.downloadData(path, params)
        if not data:
            print(u'找不到合约%s' % symbol)
            return

        self._ensureDatetimeIndex(DAILY_DB_NAME, symbol)

        for d in data:
            try:
                bar = self._datayesDailyBar(symbol, d)
            except self._BAD_RECORD_ERRORS:
                print(d)
                continue
            bar.openInterest = d.get('openInt', 0)
            self._saveBar(DAILY_DB_NAME, symbol, bar)

        # Reported once after all bars are saved (it was previously printed per bar).
        print(u'%s下载完成' % symbol)

    #----------------------------------------------------------------------
    def downloadAllFuturesDailyBar(self):
        """Download daily bars of the main continuous contract for every known futures product."""
        start = time()
        print(u'开始下载所有期货的主力合约日行情')

        productSymbolSet = self.readFuturesProductSymbol()

        print(u'代码列表读取成功,产品代码:%s' % productSymbolSet)

        # A ThreadPool (ThreadPool(10).map(...)) was also tried here. There was
        # no noticeable speed-up, possibly because the per-record data
        # conversion in the download function is CPU-heavy.
        for productSymbol in productSymbolSet:
            self.downloadFuturesDailyBar(productSymbol + '0000')

        print(u'所有期货的主力合约日行情已经全部下载完成, 耗时%s秒' % (time() - start))

    #----------------------------------------------------------------------
    def downloadFuturesIntradayBar(self, symbol):
        """Download 1-minute intraday bars of a futures contract from Datayes into MINUTE_DB_NAME.

        Only concrete contracts are available for intraday bars. Each bar's
        date is the local date at call time (datetime.now()), combined with
        the record's 'barTime' (HH:MM).
        NOTE(review): bars fetched after midnight (e.g. a night session)
        would be stamped with the wrong date. Confirm whether that matters.
        """
        print(u'开始下载%s日内分钟行情' % symbol)

        path = 'api/market/getFutureBarRTIntraDay.json'
        params = {'instrumentID': symbol, 'unit': 1}

        data = self.datayesClient.downloadData(path, params)
        if not data:
            print(u'找不到合约%s' % symbol)
            return

        today = datetime.now().strftime('%Y%m%d')
        self._ensureDatetimeIndex(MINUTE_DB_NAME, symbol)

        for d in data:
            bar = CtaBarData()
            bar.vtSymbol = symbol
            bar.symbol = symbol
            try:
                bar.exchange = DATAYES_TO_VT_EXCHANGE.get(d.get('exchangeCD', ''), '')
                bar.open = d.get('openPrice', 0)
                bar.high = d.get('highestPrice', 0)
                bar.low = d.get('lowestPrice', 0)
                bar.close = d.get('closePrice', 0)
                bar.date = today
                bar.time = d.get('barTime', '')
                bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M')
                bar.volume = d.get('totalVolume', 0)
                bar.openInterest = 0
            except self._BAD_RECORD_ERRORS:
                # Skip records whose time cannot be parsed instead of saving a bar without a datetime.
                print(d)
                continue
            self._saveBar(MINUTE_DB_NAME, symbol, bar)

        print(u'%s下载完成' % symbol)

    #----------------------------------------------------------------------
    def downloadEquitySymbol(self, tradeDate=''):
        """Download stock codes from Datayes into the EquitySymbol collection.

        tradeDate: 'YYYYMMDD' to query; defaults to lastTradeDate().
        Every 'ticker' in the getMktEqud result for that date is upserted as 'symbol'.
        """
        if not tradeDate:
            tradeDate = self.lastTradeDate()

        collection = self.dbClient[SETTING_DB_NAME]['EquitySymbol']
        collection.create_index([('symbol', pymongo.ASCENDING)], unique=True)

        path = 'api/market/getMktEqud.json'
        params = {'tradeDate': tradeDate}
        data = self.datayesClient.downloadData(path, params)

        if not data:
            print(u'股票代码下载失败')
            return

        for d in data:
            collection.update_one({'symbol': d['ticker']},
                                  {'$set': {'symbol': d['ticker']}},
                                  upsert=True)
        print(u'股票代码下载完成')

    #----------------------------------------------------------------------
    def downloadEquityDailyBar(self, symbol):
        """Download daily bars of a stock from Datayes into DAILY_DB_NAME.

        symbol: stock code. If bars are already stored, the download resumes
        from the latest stored date (bars are upserted, so overlap is harmless).
        """
        print(u'开始下载%s日行情' % symbol)

        last = self._getLastBar(DAILY_DB_NAME, symbol)

        path = 'api/market/getMktEqud.json'
        params = {'ticker': symbol}
        if last:
            params['beginDate'] = last['date']

        data = self.datayesClient.downloadData(path, params)
        if not data:
            print(u'找不到合约%s' % symbol)
            return

        self._ensureDatetimeIndex(DAILY_DB_NAME, symbol)

        for d in data:
            try:
                bar = self._datayesDailyBar(symbol, d)
            except self._BAD_RECORD_ERRORS:
                print(d)
                continue
            self._saveBar(DAILY_DB_NAME, symbol, bar)

        print(u'%s下载完成' % symbol)

    #----------------------------------------------------------------------
    def downloadEquityDailyBarts(self, symbol):
        """Download daily bars of a stock from tushare (ts.get_k_data) into DAILY_DB_NAME.

        symbol: stock code. If bars are already stored, the download starts
        at the latest stored date. Otherwise start is '' and tushare chooses
        the range (previously this path raised UnboundLocalError).
        """
        print(u'开始下载%s日行情' % symbol)

        last = self._getLastBar(DAILY_DB_NAME, symbol)

        # tushare is an optional dependency, needed only by this method.
        import tushare as ts

        start = ''
        if last:
            # Stored dates are 'YYYYMMDD'; get_k_data takes 'YYYY-MM-DD'.
            lastDate = last['date']
            start = '%s-%s-%s' % (lastDate[:4], lastDate[4:6], lastDate[6:])

        data = ts.get_k_data(symbol, start)
        if data is None or data.empty:
            print(u'找不到合约%s' % symbol)
            return

        self._ensureDatetimeIndex(DAILY_DB_NAME, symbol)

        for _, row in data.iterrows():
            bar = CtaBarData()
            bar.vtSymbol = symbol
            bar.symbol = symbol
            try:
                bar.open = row.get('open')
                bar.high = row.get('high')
                bar.low = row.get('low')
                bar.close = row.get('close')
                bar.date = row.get('date').replace('-', '')
                bar.time = ''
                bar.datetime = datetime.strptime(bar.date, '%Y%m%d')
                bar.volume = row.get('volume')
            except self._BAD_RECORD_ERRORS:
                print(row)
                continue
            self._saveBar(DAILY_DB_NAME, symbol, bar)

        print(u'%s下载完成' % symbol)

#----------------------------------------------------------------------

def loadMcCsv(fileName, dbName, symbol):
    """Insert historical bars exported by MultiCharts as CSV into MongoDB.

    fileName: CSV path. Its header must contain the columns Date (YYYY/MM/DD),
              Time (HH:MM:SS), Open, High, Low, Close and TotalVolume.
    dbName:   target MongoDB database.
    symbol:   target collection name; also stored as bar.symbol/vtSymbol.

    Rows are upserted on 'datetime', so re-importing the same file does not
    create duplicates. The file and the MongoDB connection are always closed.
    """
    import csv

    start = time()
    print(u'开始读取CSV文件%s中的数据插入到%s的%s中' % (fileName, dbName, symbol))

    # Open the target collection and create the unique datetime index.
    host, port, _ = loadMongoSetting()
    client = pymongo.MongoClient(host, port)
    try:
        collection = client[dbName][symbol]
        collection.create_index([('datetime', pymongo.ASCENDING)], unique=True)

        # Read the rows and upsert them into the database.
        with open(fileName, 'r') as f:
            for d in csv.DictReader(f):
                bar = CtaBarData()
                bar.vtSymbol = symbol
                bar.symbol = symbol
                bar.open = float(d['Open'])
                bar.high = float(d['High'])
                bar.low = float(d['Low'])
                bar.close = float(d['Close'])
                bar.date = datetime.strptime(d['Date'], '%Y/%m/%d').strftime('%Y%m%d')
                bar.time = d['Time']
                bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M:%S')
                # Stored as a number, consistent with the prices and downloaded bars.
                bar.volume = float(d['TotalVolume'])

                collection.update_one({'datetime': bar.datetime},
                                      {'$set': bar.__dict__}, upsert=True)
                print(bar.date + ' ' + bar.time)
    finally:
        client.close()

    print(u'插入完毕,耗时:%s' % (time() - start))

#----------------------------------------------------------------------

def loadTdxCsv(fileName, dbName, symbol):

"""将通达信导出的csv格式的历史分钟数据插入到Mongo数据库中"""

import csv



start = time\(\)

print u'开始读取CSV文件%s中的数据插入到%s的%s中' %\(fileName, dbName, symbol\)



\# 锁定集合,并创建索引

host, port, logging = loadMongoSetting\(\)



client = pymongo.MongoClient\(host, port\)    

collection = client\[dbName\]\[symbol\]

collection.ensure\_index\(\[\('datetime', pymongo.ASCENDING\)\], unique=True\)   



\# 读取数据和插入到数据库

reader = csv.reader\(file\(fileName, 'r'\)\)

for d in reader:

    bar = CtaBarData\(\)

    bar.vtSymbol = symbol

    bar.symbol = symbol

    bar.open = float\(d\[2\]\)

    bar.high = float\(d\[3\]\)

    bar.low = float\(d\[4\]\)

    bar.close = float\(d\[5\]\)

    bar.date = datetime.strptime\(d\[0\], '%Y/%m/%d'\).strftime\('%Y%m%d'\)

    bar.time = d\[1\]\[:2\]+':'+d\[1\]\[2:4\]+':00'

    bar.datetime = datetime.strptime\(bar.date + ' ' + bar.time, '%Y%m%d %H:%M:%S'\)

    bar.volume = d\[6\]

    bar.openInterest = d\[7\]



    flt = {'datetime': bar.datetime}

    collection.update\_one\(flt, {'$set':bar.\_\_dict\_\_}, upsert=True\)  

    print bar.date, bar.time



print u'插入完毕,耗时:%s' % \(time\(\)-start\)

if __name__ == '__main__':
    # Simple ad-hoc test scripts can go here:
    # from time import sleep
    # e = HistoryDataEngine()
    # sleep(1)
    # e.downloadEquityDailyBar('000001')
    # e.downloadEquityDailyBarts('000001')

    # Load the stock-index intraday 1-minute CSV shipped with the project into
    # MongoDB (took about 3 minutes on the author's machine).
    loadMcCsv('IF0000_1min.csv', MINUTE_DB_NAME, 'IF0000')

    # Load TongDaXin historical minute data:
    # loadTdxCsv('CL8.csv', MINUTE_DB_NAME, 'c0000')