# encoding: UTF-8
"""
本模块中主要包含:
从通联数据下载历史行情的引擎
用来把MultiCharts导出的历史数据载入到MongoDB中用的函数
增加从通达信导出的历史数据载入到MongoDB中的函数
"""
from datetime import datetime, timedelta
import pymongo
from time import time
from multiprocessing.pool import ThreadPool
from ctaBase import *
from vtConstant import *
from vtFunction import loadMongoSetting
from datayesClient import DatayesClient
# Exchange-code mapping between vn.trader exchange constants and Datayes (通联数据) codes.
VT_TO_DATAYES_EXCHANGE = {
    EXCHANGE_CFFEX: 'CCFX',  # China Financial Futures Exchange (中金所)
    EXCHANGE_SHFE: 'XSGE',   # Shanghai Futures Exchange (上期所)
    EXCHANGE_CZCE: 'XZCE',   # Zhengzhou Commodity Exchange (郑商所)
    EXCHANGE_DCE: 'XDCE',    # Dalian Commodity Exchange (大商所)
}
# Reverse lookup: Datayes exchange code -> vn.trader exchange constant.
DATAYES_TO_VT_EXCHANGE = dict((datayesCode, vtCode)
                              for vtCode, datayesCode in VT_TO_DATAYES_EXCHANGE.items())
########################################################################
class HistoryDataEngine(object):
    """History-data engine used by the CTA module.

    Downloads futures/equity symbol lists and daily/minute bars (from Datayes,
    or from tushare in ``downloadEquityDailyBarts``) and upserts them into
    MongoDB. Bar collections are named after the symbol, carry a unique index
    on ``datetime``, and use ``datetime`` as the upsert key.
    """

    #----------------------------------------------------------------------
    def __init__(self):
        """Connect to MongoDB (settings from loadMongoSetting) and create a Datayes client."""
        # The third value returned by loadMongoSetting() is not needed here.
        host, port, _ = loadMongoSetting()
        self.dbClient = pymongo.MongoClient(host, port)
        self.datayesClient = DatayesClient()

    #----------------------------------------------------------------------
    def lastTradeDate(self):
        """Return the most recent weekday as a 'YYYYMMDD' string.

        Saturday and Sunday roll back to the preceding Friday. Domestic
        exchange holidays are NOT taken into account.
        """
        today = datetime.now()
        # weekday(): Monday=0 ... Friday=4, Saturday=5, Sunday=6
        daysPastFriday = today.weekday() - 4
        if daysPastFriday > 0:
            today -= timedelta(days=daysPastFriday)
        return today.strftime('%Y%m%d')

    #----------------------------------------------------------------------
    def _lastBarDate(self, dbName, symbol):
        """Return the 'date' field of the newest stored bar for symbol, or '' if none."""
        # find_one(sort=...) replaces the deprecated Cursor.count() + cx[0] pattern.
        last = self.dbClient[dbName][symbol].find_one(
            sort=[('datetime', pymongo.DESCENDING)])
        return last['date'] if last else ''

    #----------------------------------------------------------------------
    def _barCollection(self, dbName, symbol):
        """Return the bar collection for symbol, ensuring a unique index on 'datetime'."""
        collection = self.dbClient[dbName][symbol]
        # create_index replaces ensure_index (deprecated in PyMongo 3, removed in 4).
        collection.create_index([('datetime', pymongo.ASCENDING)], unique=True)
        return collection

    #----------------------------------------------------------------------
    @staticmethod
    def _upsertBar(collection, bar):
        """Insert bar, or overwrite the stored bar that has the same datetime."""
        collection.update_one({'datetime': bar.datetime}, {'$set': bar.__dict__},
                              upsert=True)

    #----------------------------------------------------------------------
    @staticmethod
    def _tushareStartDate(lastDate):
        """Convert a stored 'YYYYMMDD' date to tushare's 'YYYY-MM-DD' format.

        Returns '' when lastDate is empty, i.e. nothing is stored yet; '' is
        tushare's own default for the start argument.
        """
        if not lastDate:
            return ''
        return '%s-%s-%s' % (lastDate[:4], lastDate[4:6], lastDate[6:])

    #----------------------------------------------------------------------
    def _saveDatayesDailyBars(self, symbol, data, withOpenInterest):
        """Convert Datayes daily-bar records to CtaBarData and upsert them into DAILY_DB_NAME.

        :param symbol: symbol used as both vtSymbol/symbol and the collection name.
        :param data: iterable of Datayes record dicts.
        :param withOpenInterest: also copy 'openInt' into bar.openInterest (futures only).

        Records whose 'tradeDate' is missing or malformed are printed and skipped.
        Previously such a record aborted the whole download with a ValueError.
        """
        collection = self._barCollection(DAILY_DB_NAME, symbol)
        for d in data:
            bar = CtaBarData()
            bar.vtSymbol = symbol
            bar.symbol = symbol
            try:
                bar.exchange = DATAYES_TO_VT_EXCHANGE.get(d.get('exchangeCD', ''), '')
                bar.open = d.get('openPrice', 0)
                bar.high = d.get('highestPrice', 0)
                bar.low = d.get('lowestPrice', 0)
                bar.close = d.get('closePrice', 0)
                bar.date = d.get('tradeDate', '').replace('-', '')
                bar.time = ''
                bar.datetime = datetime.strptime(bar.date, '%Y%m%d')
                bar.volume = d.get('turnoverVol', 0)
                if withOpenInterest:
                    bar.openInterest = d.get('openInt', 0)
            except (KeyError, ValueError):
                # Do not upsert a bar whose datetime could not be built.
                print(d)
                continue
            self._upsertBar(collection, bar)

    #----------------------------------------------------------------------
    def readFuturesProductSymbol(self):
        """Return the set of distinct futures product symbols stored in the settings DB."""
        cx = self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].find()
        # A set, because many contracts share one product symbol.
        return set(d['productSymbol'] for d in cx)

    #----------------------------------------------------------------------
    def readFuturesSymbol(self):
        """Return the list of all futures contract symbols stored in the settings DB."""
        cx = self.dbClient[SETTING_DB_NAME]['FuturesSymbol'].find()
        return [d['symbol'] for d in cx]

    #----------------------------------------------------------------------
    def downloadFuturesSymbol(self, tradeDate=''):
        """Download all futures contract codes from Datayes into SETTING_DB_NAME.FuturesSymbol.

        :param tradeDate: 'YYYYMMDD'; defaults to lastTradeDate().
        """
        if not tradeDate:
            tradeDate = self.lastTradeDate()
        collection = self.dbClient[SETTING_DB_NAME]['FuturesSymbol']
        collection.create_index([('symbol', pymongo.ASCENDING)], unique=True)

        data = self.datayesClient.downloadData('api/market/getMktMFutd.json',
                                               {'tradeDate': tradeDate})
        if not data:
            print(u'期货合约代码下载失败')
            return
        for d in data:
            symbolDict = {'symbol': d['ticker'], 'productSymbol': d['contractObject']}
            collection.update_one({'symbol': d['ticker']}, {'$set': symbolDict},
                                  upsert=True)
        print(u'期货合约代码下载完成')

    #----------------------------------------------------------------------
    def downloadFuturesDailyBar(self, symbol):
        """Download daily bars for a futures contract into DAILY_DB_NAME.

        A symbol containing '0000' (e.g. IF0000) requests the continuous
        main contract of that product. Only bars from the last stored date
        onwards are requested.
        """
        print(u'开始下载%s日行情' % symbol)
        lastDate = self._lastBarDate(DAILY_DB_NAME, symbol)

        if '0000' in symbol:
            # Continuous main contract
            path = 'api/market/getMktMFutd.json'
            params = {'contractObject': symbol.replace('0000', ''), 'mainCon': 1}
        else:
            # Concrete traded contract
            path = 'api/market/getMktFutd.json'
            params = {'ticker': symbol}
        if lastDate:
            params['startDate'] = lastDate

        data = self.datayesClient.downloadData(path, params)
        if data:
            self._saveDatayesDailyBars(symbol, data, withOpenInterest=True)
            print(u'%s下载完成' % symbol)
        else:
            print(u'找不到合约%s' % symbol)

    #----------------------------------------------------------------------
    def downloadAllFuturesDailyBar(self):
        """Download daily bars of the main contract for every stored futures product."""
        start = time()
        print(u'开始下载所有期货的主力合约日行情')

        productSymbolSet = self.readFuturesProductSymbol()
        print(u'代码列表读取成功,产品代码:%s' % productSymbolSet)

        # A ThreadPool was tried here, but the per-record data conversion is
        # CPU-heavy, so multithreading gave no significant speed-up.
        for productSymbol in productSymbolSet:
            self.downloadFuturesDailyBar(productSymbol + '0000')

        print(u'所有期货的主力合约日行情已经全部下载完成, 耗时%s秒' % (time() - start))

    #----------------------------------------------------------------------
    def downloadFuturesIntradayBar(self, symbol):
        """Download today's 1-minute bars for a concrete futures contract into MINUTE_DB_NAME.

        Each bar's date is taken from the local clock (today), because only
        'barTime' (HH:MM) is read from the records.
        """
        print(u'开始下载%s日内分钟行情' % symbol)

        # Intraday minute bars exist only for concrete contracts.
        params = {'instrumentID': symbol, 'unit': 1}
        data = self.datayesClient.downloadData('api/market/getFutureBarRTIntraDay.json',
                                               params)
        if not data:
            print(u'找不到合约%s' % symbol)
            return

        today = datetime.now().strftime('%Y%m%d')
        collection = self._barCollection(MINUTE_DB_NAME, symbol)
        for d in data:
            bar = CtaBarData()
            bar.vtSymbol = symbol
            bar.symbol = symbol
            try:
                bar.exchange = DATAYES_TO_VT_EXCHANGE.get(d.get('exchangeCD', ''), '')
                bar.open = d.get('openPrice', 0)
                bar.high = d.get('highestPrice', 0)
                bar.low = d.get('lowestPrice', 0)
                bar.close = d.get('closePrice', 0)
                bar.date = today
                bar.time = d.get('barTime', '')
                bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M')
                bar.volume = d.get('totalVolume', 0)
                bar.openInterest = 0
            except (KeyError, ValueError):
                # Skip records whose barTime cannot be parsed.
                print(d)
                continue
            self._upsertBar(collection, bar)
        print(u'%s下载完成' % symbol)

    #----------------------------------------------------------------------
    def downloadEquitySymbol(self, tradeDate=''):
        """Download all stock codes from Datayes into SETTING_DB_NAME.EquitySymbol.

        :param tradeDate: 'YYYYMMDD'; defaults to lastTradeDate().
        """
        if not tradeDate:
            tradeDate = self.lastTradeDate()
        collection = self.dbClient[SETTING_DB_NAME]['EquitySymbol']
        collection.create_index([('symbol', pymongo.ASCENDING)], unique=True)

        data = self.datayesClient.downloadData('api/market/getMktEqud.json',
                                               {'tradeDate': tradeDate})
        if not data:
            print(u'股票代码下载失败')
            return
        for d in data:
            collection.update_one({'symbol': d['ticker']},
                                  {'$set': {'symbol': d['ticker']}}, upsert=True)
        print(u'股票代码下载完成')

    #----------------------------------------------------------------------
    def downloadEquityDailyBar(self, symbol):
        """Download daily bars for a stock from Datayes into DAILY_DB_NAME.

        Only bars from the last stored date onwards are requested.
        """
        print(u'开始下载%s日行情' % symbol)
        lastDate = self._lastBarDate(DAILY_DB_NAME, symbol)

        params = {'ticker': symbol}
        if lastDate:
            params['beginDate'] = lastDate

        data = self.datayesClient.downloadData('api/market/getMktEqud.json', params)
        if data:
            self._saveDatayesDailyBars(symbol, data, withOpenInterest=False)
            print(u'%s下载完成' % symbol)
        else:
            print(u'找不到合约%s' % symbol)

    #----------------------------------------------------------------------
    def downloadEquityDailyBarts(self, symbol):
        """Download daily bars for a stock via tushare.get_k_data into DAILY_DB_NAME.

        When nothing is stored yet, tushare's default start date is used.
        Previously this case raised UnboundLocalError because ``start`` was
        only assigned when data already existed.
        """
        print(u'开始下载%s日行情' % symbol)
        lastDate = self._lastBarDate(DAILY_DB_NAME, symbol)

        import tushare as ts
        data = ts.get_k_data(symbol, self._tushareStartDate(lastDate))
        if data is None or data.empty:
            print(u'找不到合约%s' % symbol)
            return

        collection = self._barCollection(DAILY_DB_NAME, symbol)
        for _, row in data.iterrows():
            bar = CtaBarData()
            bar.vtSymbol = symbol
            bar.symbol = symbol
            try:
                bar.open = row.get('open')
                bar.high = row.get('high')
                bar.low = row.get('low')
                bar.close = row.get('close')
                bar.date = row.get('date').replace('-', '')
                bar.time = ''
                bar.datetime = datetime.strptime(bar.date, '%Y%m%d')
                bar.volume = row.get('volume')
            except (AttributeError, KeyError, ValueError):
                # Missing or malformed 'date' column: report and skip the row.
                print(row)
                continue
            self._upsertBar(collection, bar)
        print(u'%s下载完成' % symbol)
#----------------------------------------------------------------------
def loadMcCsv(fileName, dbName, symbol):
    """Load a MultiCharts-exported CSV of bars into MongoDB collection dbName[symbol].

    The CSV must have a header row with at least the columns Date (YYYY/MM/DD),
    Time (HH:MM:SS), Open, High, Low, Close and TotalVolume. Each row is upserted
    keyed by its bar datetime, and each inserted bar's date/time is printed as
    progress.
    """
    import csv
    start = time()
    print(u'开始读取CSV文件%s中的数据插入到%s的%s中' % (fileName, dbName, symbol))

    # Connect and make sure the collection has a unique datetime index.
    host, port, _ = loadMongoSetting()
    client = pymongo.MongoClient(host, port)
    try:
        collection = client[dbName][symbol]
        collection.create_index([('datetime', pymongo.ASCENDING)], unique=True)

        # Read the rows and upsert them; the file is closed even on a bad row.
        with open(fileName, 'r') as f:
            for d in csv.DictReader(f):
                bar = CtaBarData()
                bar.vtSymbol = symbol
                bar.symbol = symbol
                bar.open = float(d['Open'])
                bar.high = float(d['High'])
                bar.low = float(d['Low'])
                bar.close = float(d['Close'])
                bar.date = datetime.strptime(d['Date'], '%Y/%m/%d').strftime('%Y%m%d')
                bar.time = d['Time']
                bar.datetime = datetime.strptime(bar.date + ' ' + bar.time,
                                                 '%Y%m%d %H:%M:%S')
                bar.volume = d['TotalVolume']

                collection.update_one({'datetime': bar.datetime},
                                      {'$set': bar.__dict__}, upsert=True)
                print(bar.date + ' ' + bar.time)
    finally:
        client.close()

    print(u'插入完毕,耗时:%s' % (time() - start))
#----------------------------------------------------------------------
def loadTdxCsv(fileName, dbName, symbol):
    """Load a TDX (通达信)-exported CSV of minute bars into MongoDB collection dbName[symbol].

    Expected headerless columns: date (YYYY/MM/DD), time (HHMM, possibly
    without a leading zero), open, high, low, close, volume, open interest.
    Each row is upserted keyed by its bar datetime, and each inserted bar's
    date/time is printed as progress.
    """
    import csv
    start = time()
    print(u'开始读取CSV文件%s中的数据插入到%s的%s中' % (fileName, dbName, symbol))

    # Connect and make sure the collection has a unique datetime index.
    host, port, _ = loadMongoSetting()
    client = pymongo.MongoClient(host, port)
    try:
        collection = client[dbName][symbol]
        collection.create_index([('datetime', pymongo.ASCENDING)], unique=True)

        with open(fileName, 'r') as f:
            for d in csv.reader(f):
                # Rows with fewer than 8 fields would raise IndexError below;
                # these are e.g. blank lines or a trailing footer line
                # (NOTE(review): assumed TDX export layout).
                if len(d) < 8:
                    continue

                bar = CtaBarData()
                bar.vtSymbol = symbol
                bar.symbol = symbol
                bar.open = float(d[2])
                bar.high = float(d[3])
                bar.low = float(d[4])
                bar.close = float(d[5])
                bar.date = datetime.strptime(d[0], '%Y/%m/%d').strftime('%Y%m%d')
                # Pad e.g. '931' to '0931' so it becomes '09:31:00', not '93:1:00'.
                rawTime = d[1].zfill(4)
                bar.time = rawTime[:2] + ':' + rawTime[2:4] + ':00'
                bar.datetime = datetime.strptime(bar.date + ' ' + bar.time,
                                                 '%Y%m%d %H:%M:%S')
                bar.volume = d[6]
                bar.openInterest = d[7]

                collection.update_one({'datetime': bar.datetime},
                                      {'$set': bar.__dict__}, upsert=True)
                print(bar.date + ' ' + bar.time)
    finally:
        client.close()

    print(u'插入完毕,耗时:%s' % (time() - start))
if __name__ == '__main__':
    # Ad-hoc manual tests can go here, for example:
    #   from time import sleep
    #   e = HistoryDataEngine()
    #   sleep(1)
    #   e.downloadEquityDailyBar('000001')
    #   e.downloadEquityDailyBarts('000001')
    # Import TDX (通达信) minute history:
    #   loadTdxCsv('CL8.csv', MINUTE_DB_NAME, 'c0000')

    # Import the bundled index-futures 1-minute CSV into MongoDB
    # (took about 3 minutes on the original author's machine).
    csvFile, targetSymbol = 'IF0000_1min.csv', 'IF0000'
    loadMcCsv(csvFile, MINUTE_DB_NAME, targetSymbol)