第3章


vn.py基础


  
  本章将对vn.py基础进行介绍,首先从vn.py整体架构入手,再分别介绍vn.py的交易链路中的部分,最后结合VeighNa Trader的界面对该软件的各部分功能进行说明。通过本章的学习,读者将会对vn.py的整个数据链路有一定的了解,对于量化交易软件的构成部分有深刻的理解。
3.1?vn.py的整体架构
  vn.py的架构主要分为三层,首先是底层接口,其负责与各交易所通信并提供各品种交易的接口;在底层接口之上,vn.py提供了一套事件或时间驱动的中层引擎,其用于完成事件的分发、订单的路由与数据的分发等;最上层的接口用于实现不同的应用,例如CTA策略模块、GUI界面、行情记录等,接下来将分别介绍这三层架构的内容。
   3.1.1?底层接口
  vn.py的底层接口主要提供行情与交易的API,将行情数据发送到中层的引擎并接收来自引擎的订单数据并将其发送。对于目前vn.py 3.x版本而言,其对不同厂商的接口进行了分离,表3-1列出了部分vn.py 3.x版本支持的部分交易接口与品种。
表3-1?vn.py 3.x版本支持的部分交易接口与品种
接??口
品??种
接??口
品??种
CTP
国内期货、期权
中泰XTP
A股、ETF期权
CTP Mini
国内期货、期权
华鑫奇点
A股、ETF
CTP证券
ETF期权
国泰君安
A股、两融
飞马
国内期货
东证OST
A股
恒生UFT
国内期货、ETF期权
东方财富EMT
A股
易盛
国内期货、黄金TD
飞鼠
黄金TD、国内期货
顶点飞创
ETF期权
金仕达黄金
黄金TD
顶点HTS
ETF期权
融航
期货资管
                                                                                    续表    
接??口
品??种
接??口
品??种
杰宜斯
期货资管
恒生云UF
A股仿真
中汇亿达
银行间市场
TTS
国内期货仿真
掘金
A股仿真
火象
国内期货仿真
     
  从表3-1不难看出,vn.py支持许多交易接口,当用户需要交易特定的品种时,可以自由选择对应的接口完成交易等功能。得益于好的抽象,用户在切换交易接口时是无感的,因为vn.py已经对不同的交易接口进行了统一的封装。
   3.1.2?中层引擎
  vn.py文件中的中层引擎包括事件引擎、订单路由及数据引擎等,其中事件引擎负责将底层接口的行情、交易执行情况等消息推送到订阅了该信息的上层应用,而订单路由则负责将上层应用的下单请求推送到底层接口的路由;数据引擎则负责处理上层应用对数据库数据的读取与处理。
  中层引擎将程序中的各组件(不同的底层接口、数据库接口等)处理为统一的数据格式,以便上层应用来进行调用。
  1. 主引擎MainEngine
  主引擎MainEngine中包含交易中所需要的一系列方法,方法及其用途如表3-2所示。
表3-2?MainEngine提供的方法及其用途
方??法
用??途
add_engine
添加引擎(如日志引擎、订单管理引擎、邮件引擎等)
add_gateway
添加底层交易接口(如CTP接口、UFT接口等)
add_app
添加上层应用(如实盘交易应用、回测应用等)
init_engines
初始化所有已添加的引擎(默认包含日志引擎、订单管理引擎、邮件引擎)
write_log
写日志,向日志引擎的队列中放入日志消息
get_gateway
根据名称获取底层交易接口实例
get_engine
根据名称获取引擎实例
get_default_setting
根据接口名称获取底层接口的默认配置信息
get_all_gateway_names
获取所有底层接口的名称
get_all_apps
获取所有应用的实例
get_all_exchanges
获取所有交易所
connect
根据名称完成底层接口的连接
subscribe
根据名称订阅品种行情信息
send_order
根据名称使用底层接口发送订单
cancel_order
根据名称使用底层接口取消订单
                                                                                    续表     
方??法
用??途
send_quote
根据名称使用底层接口发送双边报价请求
cancel_quote
根据名称使用底层接口取消双边报价请求
query_history
根据名称使用底层接口查询历史K线信息
close
停止各引擎和各底层接口
     
  从表3-2中不难看出,MainEngine是一个聚合了各底层接口(gateway)和上层应用(app)的类,起到了承上启下的作用,并且由于其支持添加多个底层接口与上层应用,MainEngine支持的交互过程变得十分灵活,读者可以很方便地在MainEngine中定制自己的应用与接口的交互逻辑。
  2. 事件引擎EventEngine
  事件引擎EventEngine负责根据事件的类型将其分发到对应的处理函数(handler)上处理,目前EventEngine支持的事件类型如表3-3所示。
表3-3?EventEngine支持的事件类型
类??型
说??明
EVENT_TIMER
定时任务
EVENT_TICK
收到tick数据的事件
EVENT_TRADE
收到成交回报的事件
EVENT_ORDER
收到下单回报的事件
EVENT_POSITION
收到持仓更改消息的事件
EVENT_QUOTE
收到双边报价回报的事件
EVENT_CONTRACT
收到合约信息查询回报的事件
EVENT_LOG
收到写日志请求的事件
     
  在订单管理引擎OmsEngine中(主引擎MainEngine默认注册的引擎之一)完成了大部分不同类型事件与其处理函数的绑定,代码如下:
     
     //ch3/trader_engine.py
     def register_event(self) -> None:
         """"""
         self.event_engine.register(EVENT_TICK, self.process_tick_event)
         self.event_engine.register(EVENT_ORDER, self.process_order_event)
         self.event_engine.register(EVENT_TRADE, self.process_trade_event)
         self.event_engine.register(EVENT_POSITION, self.process_position_event)
         self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)
         self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
         self.event_engine.register(EVENT_QUOTE, self.process_quote_event)
     
  当上层应用或底层接口产生了表3-3中的类型事件时,通过事件引擎EventEngine的            put方法将待处理事件Event放入EventEngine的队列_queue中,与此同时,EventEngine中有一个启动的单独线程不断地从队列_queue中获取待处理事件并传给handler进行处理,代码如下:
     
     //ch3/event_engine.py
     …
     #将待处理事件放入队列
     def put(self, event: Event) -> None:
         """
         Put an event object into event queue.
         """
         self._queue.put(event)
     …
     …
     #不断地从队列中获取事件
     def _run(self) -> None:
         """
         Get event from queue and then process it.
         """
         while self._active:
             try:
                 event: Event = self._queue.get(block=True, timeout=1)
                 self._process(event)
             except Empty:
                 pass
     …
     #将不同类型的事件分发给不同的handler
     def _process(self, event: Event) -> None:
         """
         First distribute event to those handlers registered listening
         to this type.
     
         Then distribute event to those general handlers which listens
         to all types.
         """
         if event.type in self._handlers:
             [handler(event) for handler in self._handlers[event.type]]
     
         if self._general_handlers:
             [handler(event) for handler in self._general_handlers]
     …
     
  3. 数据引擎BaseDatabase
  vn.py支持很多数据库作为后端,其默认使用SQLite,除此之外目前其还支持MySQL、PostgreSQL、DolphinDB、Arctic、TDengine、TimescaleDB、MongoDB、InfluxDB和LevelDB。由于vn.py目前以分模块设计,所以具体数据库对应的模块都位于具体的名为vnpy_[数据库名]的包中(如vnpy_mysql)。
  本节以数据引擎的基类BaseDatabase为例讲解其包含的方法,不涉及具体某个数据库的执行逻辑。表3-4列出了BaseDatabase中包含的方法及其说明。
表3-4  BaseDatabase包含的方法及说明
方??法
说??明
save_bar_data
保存K线数据
save_tick_data
保存tick数据
load_bar_data
读取某品种特定时间与周期内的K线数据
load_tick_data
读取某品种特定时间内的tick数据
delete_bar_data
删除某品种特定周期的K线数据
delete_tick_data
删除某品种的tick数据
get_bar_overview
获取所有品种的K线统计数据
get_tick_overview
获取所有品种的tick统计数据
     
  从表3-4可以看出,数据引擎主要负责对K线与tick数据进行增、删、查、改操作。
   3.1.3?上层应用
  vn.py文件中的上层应用仅与中层引擎进行交互,通过引擎调用底层接口的具体方法完成交易链路与回报链路的通信。vn.py文件中的上层应用众多,本节不举例进行讲解,本章的3.4节~3.14节将对不同的上层应用进行更为详细的讲解。
3.2?vn.py文件中的交易接口
  如3.1.1节中所述,vn.py支持众多交易接口,本节以CTP和UFT接口为例进行讲解。
   3.2.1?CTP接口
  综合交易平台(ComprehensiveTransactionPlatform,CTP)是上海期货信息技术有限公司开发的一套柜面系统。交易者通过CTP可以将报文发送给各个交易所,在vnpy_ctp模块中,可以看到其包含一个api文件夹和一个gateway文件夹,其中api文件夹中包含CTP系统的一些库文件,其用C++编写或以dll或lib文件出现。
  以api文件夹下的include/ctp文件夹中的文件为例,其中的C++头文件ThostFtdcMdApi.h包含获取与行情相关的指令,C++头文件ThostFtdcTraderApi.h包含与交易相关的指令,C++头文件ThostFtdcUserApiDataType.h包含所有用到的数据类型,C++头文件ThostFtdcUserApiStruct.h包含了所有用到的数据结构。
  在api/libs下包含两个lib文件thostmduserapi_se.lib和thosttraderapi_se.lib,其分别是行情部分和交易部分的静态链接库,由于lib文件无法直接打开,因此用户在进行CTP编程时只需关注C++头文件中定义的方法。
  在gateway文件夹下有一个名为ctp_gateway.py的文件,这个文件对CTP的C++ API进行了Python层面的接口封装,通过pybind11使Python与C++之间的数据对象进行互相转换。在此不过多分析C++层面的编程细节,主要分析ctp_gateway.py文件中的Python??API封装。
  下面选取了部分ctp_gateway.py文件中的代码:
     
     //ch3/ctp_gateway.py
     #委托状态映射
     STATUS_CTP2VT: Dict[str, Status] = {
         THOST_FTDC_OST_NoTradeQueueing: Status.NOTTRADED,
         THOST_FTDC_OST_PartTradedQueueing: Status.PARTTRADED,
         THOST_FTDC_OST_AllTraded: Status.ALLTRADED,
         THOST_FTDC_OST_Canceled: Status.CANCELLED,
         THOST_FTDC_OST_Unknown: Status.SUBMITTING
     }
     
     #多空方向映射
     DIRECTION_VT2CTP: Dict[Direction, str] = {
         Direction.LONG: THOST_FTDC_D_Buy,
         Direction.SHORT: THOST_FTDC_D_Sell
     }
     DIRECTION_CTP2VT: Dict[str, Direction] = {v: k for k, v in DIRECTION_VT2CTP.items()}
     DIRECTION_CTP2VT[THOST_FTDC_PD_Long] = Direction.LONG
     DIRECTION_CTP2VT[THOST_FTDC_PD_Short] = Direction.SHORT
     
     #委托类型映射
     ORDERTYPE_VT2CTP: Dict[OrderType, tuple] = {
         OrderType.LIMIT: (THOST_FTDC_OPT_LimitPrice, THOST_FTDC_TC_GFD, THOST_
FTDC_VC_AV),
         OrderType.MARKET: (THOST_FTDC_OPT_AnyPrice, THOST_FTDC_TC_GFD, THOST_
FTDC_VC_AV),
         OrderType.FAK: (THOST_FTDC_OPT_LimitPrice, THOST_FTDC_TC_IOC, THOST_
FTDC_VC_AV),
         OrderType.FOK: (THOST_FTDC_OPT_LimitPrice, THOST_FTDC_TC_IOC, THOST_
FTDC_VC_CV),
     }
     ORDERTYPE_CTP2VT: Dict[Tuple, OrderType] = {v: k for k, v in ORDERTYPE_
VT2CTP.items()}
     
     #开平方向映射
     OFFSET_VT2CTP: Dict[Offset, str] = {
         Offset.OPEN: THOST_FTDC_OF_Open,
         Offset.CLOSE: THOST_FTDC_OFEN_Close,
         Offset.CLOSETODAY: THOST_FTDC_OFEN_CloseToday,
         Offset.CLOSEYESTERDAY: THOST_FTDC_OFEN_CloseYesterday,
     }
     OFFSET_CTP2VT: Dict[str, Offset] = {v: k for k, v in OFFSET_VT2CTP.items()}
     
     #交易所映射
     EXCHANGE_CTP2VT: Dict[str, Exchange] = {
         "CFFEX": Exchange.CFFEX,
         "SHFE": Exchange.SHFE,
         "CZCE": Exchange.CZCE,
         "DCE": Exchange.DCE,
         "INE": Exchange.INE,
         "GFEX": Exchange.GFEX
     }
     
     #产品类型映射
     PRODUCT_CTP2VT: Dict[str, Product] = {
         THOST_FTDC_PC_Futures: Product.FUTURES,
         THOST_FTDC_PC_Options: Product.OPTION,
         THOST_FTDC_PC_SpotOption: Product.OPTION,
         THOST_FTDC_PC_Combination: Product.SPREAD
     }
     
     #期权类型映射
     OPTIONTYPE_CTP2VT: Dict[str, OptionType] = {
         THOST_FTDC_CP_CallOptions: OptionType.CALL,
         THOST_FTDC_CP_PutOptions: OptionType.PUT
     }
     
  以上代码主要定义了CTP接口使用的数据字典到vn.py中使用的数据字典之间的映射,包含下单、行情、产品类型等的映射。经过vn.py的映射,不同接口的行情或从vn.py下单到不同接口的数据类型都会被转换为vn.py内部使用的统一格式。
  在ctp_gateway.py文件中包含了一个核心类CtpGateway,其封装了CTP接口与vn.py之间交互的各种方法,代码如下:
     
     //ch3/ctp_gateway.py
     class CtpGateway(BaseGateway):
         """
         VeighNa用于对接期货CTP柜台的交易接口
         """
     
         default_name: str = "CTP"
     
         default_setting: Dict[str, str] = {
             "用户名": "",
             "密码": "",
             "经纪商代码": "",
             "交易服务器": "",
             "行情服务器": "",
             "产品名称": "",
             "授权编码": ""
         }
     
         exchanges: List[str] = list(EXCHANGE_CTP2VT.values())
     
         def __init__(self, event_engine: EventEngine, gateway_name: str) -> None:
             """构造函数"""
             super().__init__(event_engine, gateway_name)
     
             self.td_api: "CtpTdApi" = CtpTdApi(self)
             self.md_api: "CtpMdApi" = CtpMdApi(self)
     
         def connect(self, setting: dict) -> None:
             """连接交易接口"""
             userid: str = setting["用户名"]
             password: str = setting["密码"]
             brokerid: str = setting["经纪商代码"]
             td_address: str = setting["交易服务器"]
             md_address: str = setting["行情服务器"]
             appid: str = setting["产品名称"]
             auth_code: str = setting["授权编码"]
     
             if (
                 (not td_address.startswith("tcp://"))
                 and (not td_address.startswith("ssl://"))
                 and (not td_address.startswith("socks"))
             ):
                 td_address = "tcp://" + td_address
     
             if (
                 (not md_address.startswith("tcp://"))
                 and (not md_address.startswith("ssl://"))
                 and (not md_address.startswith("socks"))
             ):
                 md_address = "tcp://" + md_address
     
             self.td_api.connect(td_address, userid, password, brokerid, auth_
code, appid)
             self.md_api.connect(md_address, userid, password, brokerid)
     
             self.init_query()
     
         def subscribe(self, req: SubscribeRequest) -> None:
             """订阅行情"""
             self.md_api.subscribe(req)
     
         def send_order(self, req: OrderRequest) -> str:
             """委托下单"""
             return self.td_api.send_order(req)
     
         def cancel_order(self, req: CancelRequest) -> None:
             """委托撤单"""
             self.td_api.cancel_order(req)
     
         def query_account(self) -> None:
             """查询资金"""
             self.td_api.query_account()
     
         def query_position(self) -> None:
             """查询持仓"""
             self.td_api.query_position()
     
         def close(self) -> None:
             """关闭接口"""
             self.td_api.close()
             self.md_api.close()
     
         def write_error(self, msg: str, error: dict) -> None:
             """输出错误信息日志"""
             error_id: int = error["ErrorID"]
             error_msg: str = error["ErrorMsg"]
             msg: str = f"{msg},代码:{error_id},信息:{error_msg}"
             self.write_log(msg)
     
         def process_timer_event(self, event) -> None:
             """定时事件处理"""
             self.count += 1
             if self.count < 2:
                 return
             self.count = 0
     
             func = self.query_functions.pop(0)
             func()
             self.query_functions.append(func)
     
             self.md_api.update_date()
     
         def init_query(self) -> None:
             """初始化查询任务"""
             self.count: int = 0
             self.query_functions: list = [self.query_account, self.query_position]
             self.event_engine.register(EVENT_TIMER, self.process_timer_event)
     
  代码中包含连接CTP柜台的配置,包括行情、交易服务器地址(以TCP进行连接)、登录账号和密码等信息。不同的行为,例如订阅行情(subscribe)、委托下单(send_order)须调用行情API或交易API的具体接口完成。
  读者容易在ctp_gateway.py文件中看到CtpMdApi和CtpTdApi类,其分别对应着CTP接口的行情和交易类,以CtpMdApi为例,代码如下:
     
     //ch3/ctp_gateway.py
     class CtpMdApi(MdApi):
         """"""
     
         def __init__(self, gateway: CtpGateway) -> None:
             """构造函数"""
             super().__init__()
     
             self.gateway: CtpGateway = gateway
             self.gateway_name: str = gateway.gateway_name
     
             self.reqid: int = 0
     
             self.connect_status: bool = False
             self.login_status: bool = False
             self.subscribed: set = set()
     
             self.userid: str = ""
             self.password: str = ""
             self.brokerid: str = ""
     
             self.current_date: str = datetime.now().strftime("%Y%m%d")
     
         def onFrontConnected(self) -> None:
             """服务器连接成功回报"""
             self.gateway.write_log("行情服务器连接成功")
             self.login()
     
         def onFrontDisconnected(self, reason: int) -> None:
             """服务器连接断开回报"""
             self.login_status = False
             self.gateway.write_log(f"行情服务器连接断开,原因{reason}")
     
         def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool) -> None:
             """用户登录请求回报"""
             if not error["ErrorID"]:
                 self.login_status = True
                 self.gateway.write_log("行情服务器登录成功")
     
                 for symbol in self.subscribed:
                     self.subscribeMarketData(symbol)
             else:
                 self.gateway.write_error("行情服务器登录失败", error)
     
         def onRspError(self, error: dict, reqid: int, last: bool) -> None:
             """请求报错回报"""
             self.gateway.write_error("行情接口报错", error)
     
         def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool) -> None:
             """订阅行情回报"""
             if not error or not error["ErrorID"]:
                 return
     
             self.gateway.write_error("行情订阅失败", error)
     
         def onRtnDepthMarketData(self, data: dict) -> None:
             """行情数据推送"""
             #过滤没有时间戳的异常行情数据
             if not data["UpdateTime"]:
                 return
     
             #过滤还没有收到合约数据前的行情推送
             symbol: str = data["InstrumentID"]
             contract: ContractData = symbol_contract_map.get(symbol, None)
             if not contract:
                 return
     
             #对大商所的交易日字段取本地日期
             if not data["ActionDay"] or contract.exchange == Exchange.DCE:
                 date_str: str = self.current_date
             else:
                 date_str: str = data["ActionDay"]
     
             timestamp: str = f"{date_str} {data['UpdateTime']}.{int(data
['UpdateMillisec']/100)}"
             dt: datetime = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
             dt: datetime = dt.replace(tzinfo=CHINA_TZ)
     
             tick: TickData = TickData(
                 symbol=symbol,
                 exchange=contract.exchange,
                 datetime=dt,
                 name=contract.name,
                 volume=data["Volume"],
                 turnover=data["Turnover"],
                 open_interest=data["OpenInterest"],
                 last_price=adjust_price(data["LastPrice"]),
                 limit_up=data["UpperLimitPrice"],
                 limit_down=data["LowerLimitPrice"],
                 open_price=adjust_price(data["OpenPrice"]),
                 high_price=adjust_price(data["HighestPrice"]),
                 low_price=adjust_price(data["LowestPrice"]),
                 pre_close=adjust_price(data["PreClosePrice"]),
                 bid_price_1=adjust_price(data["BidPrice1"]),
                 ask_price_1=adjust_price(data["AskPrice1"]),
                 bid_volume_1=data["BidVolume1"],
                 ask_volume_1=data["AskVolume1"],
                 gateway_name=self.gateway_name
             )
     
             if data["BidVolume2"] or data["AskVolume2"]:
                 tick.bid_price_2 = adjust_price(data["BidPrice2"])
                 tick.bid_price_3 = adjust_price(data["BidPrice3"])
                 tick.bid_price_4 = adjust_price(data["BidPrice4"])
                 tick.bid_price_5 = adjust_price(data["BidPrice5"])
     
                 tick.ask_price_2 = adjust_price(data["AskPrice2"])
                 tick.ask_price_3 = adjust_price(data["AskPrice3"])
                 tick.ask_price_4 = adjust_price(data["AskPrice4"])
                 tick.ask_price_5 = adjust_price(data["AskPrice5"])
     
                 tick.bid_volume_2 = data["BidVolume2"]
                 tick.bid_volume_3 = data["BidVolume3"]
                 tick.bid_volume_4 = data["BidVolume4"]
                 tick.bid_volume_5 = data["BidVolume5"]
     
                 tick.ask_volume_2 = data["AskVolume2"]
                 tick.ask_volume_3 = data["AskVolume3"]
                 tick.ask_volume_4 = data["AskVolume4"]
                 tick.ask_volume_5 = data["AskVolume5"]
     
             self.gateway.on_tick(tick)
     
         def connect(self, address: str, userid: str, password: str, brokerid: str) -> None:
             """连接服务器"""
             self.userid = userid
             self.password = password
             self.brokerid = brokerid
     
             #禁止重复发起连接,会导致异常,从而崩溃
             if not self.connect_status:
                 path: Path = get_folder_path(self.gateway_name.lower())
                 self.createFtdcMdApi((str(path) + "\\Md").encode("GBK"))
     
                 self.registerFront(address)
                 self.init()
     
                 self.connect_status = True
     
         def login(self) -> None:
             """用户登录"""
             ctp_req: dict = {
                 "UserID": self.userid,
                 "Password": self.password,
                 "BrokerID": self.brokerid
             }
     
             self.reqid += 1
             self.reqUserLogin(ctp_req, self.reqid)
     
         def subscribe(self, req: SubscribeRequest) -> None:
             """订阅行情"""
             if self.login_status:
                 self.subscribeMarketData(req.symbol)
             self.subscribed.add(req.symbol)
     
         def close(self) -> None:
             """关闭连接"""
             if self.connect_status:
                 self.exit()
     
         def update_date(self) -> None:
             """更新当前日期"""
             self.current_date = datetime.now().strftime("%Y%m%d")
     
  CtpMdApi类中主要包含三类方法(除构造函数__init__以外),一类是如上代码所示的login、connect、close等方法的CTP行情接口连接状态管理函数;其次是像代码中的subscribe、update_date等方法,用于管理类中的成员变量值;剩下的以on开头的函数则是回调函数,其继承了MdApi中的方法,在行情接口推送了已订阅的标的行情后,以函数onRtnDepthMarketData为例,对于底层C++接口推送来的逐笔行情,CtpMdApi将其转换为vn.py文件中定义的TickData对象并最终通过BaseGateway类(CtpGateway的父类)中的on_tick方法把tick行情通过引擎进行推送。其他的回调函数的执行逻辑与onRtnDepthMarketData方法类似,读者可以自行学习。
   3.2.2?UFT接口
  UFT是一个由恒生开发的极速交易系统,其依托于统一接入的系统UFX,可以认为UFT是一个快速交易的订单子系统,其专门负责进行极速交易,而UFX的后台业务系统可以基于UFT实现。
  vn.py的UFT接口相关文件位于vnpy_uft项目中。与CTP接口类似,UFT接口同样也维护了一套自身的行情与交易服务器,与CTP接口不同的是,UFT接口有一套不同的登录所需的字段,代码如下:
     
     //ch3/uft_gateway.py
     def connect(self, setting: dict) -> None:
         """连接交易接口"""
         userid: str = setting["用户名"]
         password: str = setting["密码"]
         md_address: str = setting["行情服务器"]
         td_address: str = setting["交易服务器"]
         self.server: str = setting["服务器类型"]
         appid: str = setting["产品名称"]
         auth_code: str = setting["授权编码"]
         application_type: str = setting["委托类型"]
     
         if not md_address.startswith("tcp://"):
             md_address = "tcp://" + md_address
     
         if not td_address.startswith("tcp://"):
             td_address = "tcp://" + td_address
     
         license_path: Path = TRADER_DIR.joinpath("license.dat")
     
         if license_path.exists():
             server_license: str = str(license_path)
         else:
             if self.server == "期货":
                 server_license: str = FUTURES_LICENSE
             else:
                 server_license: str = OPTION_LICENSE
     
         …
     )
     
  从代码中可以看出,与CTP接口连接不同,UFT接口的connect函数需要额外的服务器类型、委托类型字段,并需要根据不同的服务器类型验证证书。剩余代码与处理逻辑与CTP接口基本类似,在本节不再赘述。
3.3?vn.py文件中的数据库
  在3.1.2节中的第3部分已经简要介绍了vn.py支持的数据库后端,其默认的数据库为SQLite,基于文件系统的SQLite单文件最大支持128TB数据,并且其在索引列上的读写性能并不差,因此本节将讲解SQLite作为vn.py数据库后端的应用。
  vn.py文件中与SQLite数据库交互的模块叫作vnpy_sqlite,使用vnpy_sqlite会在用户目录下的.vntrader文件夹内生成一个database.db文件,vn.py产生的K线和tick数据都存储于其中。核心类为SqliteDatabase,SQLite中的ORM继承了peewee包中的Model进行实现。它的基类为3.1.2节中所介绍的BaseDatabase,因此其实现了表3-4中方法的具体逻辑,下面将逐一分析这些方法的实现。
  1. save_bar_data
  save_bar_data的具体逻辑代码如下:
     
     //ch3/sqlite_database.py
     def save_bar_data(self, bars: List[BarData], stream: bool = False) -> bool:
         """保存K线数据"""
         #读取主键参数
         bar: BarData = bars[0]
         symbol: str = bar.symbol
         exchange: Exchange = bar.exchange
         interval: Interval = bar.interval
     
         #将BarData数据转换为字典,并调整时区
         data: list = []
     
         for bar in bars:
             bar.datetime = convert_tz(bar.datetime)
     
             d: dict = bar.__dict__
             d["exchange"] = d["exchange"].value
             d["interval"] = d["interval"].value
             d.pop("gateway_name")
             d.pop("vt_symbol")3
             data.append(d)
     
         #使用upsert操作将数据更新到数据库中
         with self.db.atomic():
             for c in chunked(data, 50):
                 DbBarData.insert_many(c).on_conflict_replace().execute()
     
         #更新K线汇总数据
         overview: DbBarOverview = DbBarOverview.get_or_none(
             DbBarOverview.symbol == symbol,
             DbBarOverview.exchange == exchange.value,
             DbBarOverview.interval == interval.value,
         )
     
         if not overview:
             overview = DbBarOverview()
             overview.symbol = symbol
             overview.exchange = exchange.value
             overview.interval = interval.value
             overview.start = bars[0].datetime
             overview.end = bars[-1].datetime
             overview.count = len(bars)
         elif stream:
             overview.end = bars[-1].datetime
             overview.count += len(bars)
         else:
             overview.start = min(bars[0].datetime, overview.start)
             overview.end = max(bars[-1].datetime, overview.end)
     
             s: ModelSelect = DbBarData.select().where(
                 (DbBarData.symbol == symbol)
                 & (DbBarData.exchange == exchange.value)
                 & (DbBarData.interval == interval.value)
             )
             overview.count = s.count()
     
         overview.save()
     
         return True
     
  其中,DbBarData类的定义如下:
     
     //ch3/sqlite_database.py
     class DbBarData(Model):
         """K线数据表映射对象"""
     
         id: AutoField = AutoField()
     
         symbol: str = CharField()
         exchange: str = CharField()
         datetime: datetime = DateTimeField()
         interval: str = CharField()
     
         volume: float = FloatField()
         turnover: float = FloatField()
         open_interest: float = FloatField()
         open_price: float = FloatField()
         high_price: float = FloatField()
         low_price: float = FloatField()
         close_price: float = FloatField()
     
         class Meta:
             database: PeeweeSqliteDatabase = db
             indexes: tuple = ((("symbol", "exchange", "interval", "datetime"), True),)
     
  从DbBarData的代码可以看出,其对于一个K线对象不同的数据域进行了定义,例如标的代码、时间、OHLC等。在save_bar_data中,其取出一个待存储的K线对象,并获取了待存储对象的相同数据(save_bar_data接收的一批K线对象是按照同一标的进行聚合的)。
  接着将每个K线对象转换为字典对象,并使用upsert方法将字典对象按照列名存入SQLite数据库。除此之外,save_bar_data还在数据库中维护了一个K线汇总对象,便于获取与K线相关的统计信息。
  2. save_tick_data
  与save_bar_data类似,save_tick_data的代码如下:
     
     //ch3/sqlite_database.py
     def save_tick_data(self, ticks: List[TickData], stream: bool = False) -> bool:
         """保存tick数据"""
         #读取主键参数
         tick: TickData = ticks[0]
         symbol: str = tick.symbol
         exchange: Exchange = tick.exchange
     
         #将TickData数据转换为字典,并调整时区
         data: list = []
     
         for tick in ticks:
             tick.datetime = convert_tz(tick.datetime)
     
             d: dict = tick.__dict__
             d["exchange"] = d["exchange"].value
             d.pop("gateway_name")
             d.pop("vt_symbol")
             data.append(d)
     
         #使用upsert操作将数据更新到数据库中
         with self.db.atomic():
             for c in chunked(data, 10):
                 DbTickData.insert_many(c).on_conflict_replace().execute()
     
         #更新tick汇总数据
         overview: DbTickOverview = DbTickOverview.get_or_none(
             DbTickOverview.symbol == symbol,
             DbTickOverview.exchange == exchange.value,
         )
     
         if not overview:
             overview: DbTickOverview = DbTickOverview()
             overview.symbol = symbol
             overview.exchange = exchange.value
             overview.start = ticks[0].datetime
             overview.end = ticks[-1].datetime
             overview.count = len(ticks)
         elif stream:
             overview.end = ticks[-1].datetime
             overview.count += len(ticks)
         else:
             overview.start = min(ticks[0].datetime, overview.start)
             overview.end = max(ticks[-1].datetime, overview.end)
     
             s: ModelSelect = DbTickData.select().where(
                 (DbTickData.symbol == symbol)
                 & (DbTickData.exchange == exchange.value)
             )
             overview.count = s.count()
     
         overview.save()
     
         return True
     
  save_tick_data的代码结构与save_bar_data相同,大致也分为获取tick数据的共同部分数据、存储tick数据(tick数据的域定义与K线定义自然不同)与修改或创建tick数据对象。
  3. load_bar_data
  load_bar_data根据传入的筛选条件(标的代码、交易所、K线周期、起止日期)返回数据库中的K线数据,其代码如下:
     
     //ch3/sqlite_database.py
     
     def load_bar_data(
         self,
         symbol: str,
         exchange: Exchange,
         interval: Interval,
         start: datetime,
         end: datetime
     ) -> List[BarData]:
         """读取K线数据"""
         s: ModelSelect = (
             DbBarData.select().where(
                 (DbBarData.symbol == symbol)
                 & (DbBarData.exchange == exchange.value)
                 & (DbBarData.interval == interval.value)
                 & (DbBarData.datetime >= start)
                 & (DbBarData.datetime <= end)
             ).order_by(DbBarData.datetime)
         )
     
         bars: List[BarData] = []
         for db_bar in s:
             bar: BarData = BarData(
                 symbol=db_bar.symbol,
                 exchange=Exchange(db_bar.exchange),
                 datetime=datetime.fromtimestamp(db_bar.datetime.timestamp(), DB_TZ),
                 interval=Interval(db_bar.interval),
                 volume=db_bar.volume,
                 turnover=db_bar.turnover,
                 open_interest=db_bar.open_interest,
                 open_price=db_bar.open_price,
                 high_price=db_bar.high_price,
                 low_price=db_bar.low_price,
                 close_price=db_bar.close_price,
                 gateway_name="DB"
             )
             bars.append(bar)
     
         return bars
     
  使用ORM对象SELECT后,其还对筛选出的数据进行了数据类型转换,使其返回的结果为vn.py文件中的BarData对象列表,这么做的好处是能统一不同数据库同一API的返回值,便于上层应用的进一步处理。
  4. load_tick_data
  与load_bar_data类似,load_tick_data对数据库中的tick数据按照筛选条件进行获取并返回一个TickData对象的列表,代码如下:
     
     //ch3/sqlite_database.py
     def load_tick_data(
         self,
         symbol: str,
         exchange: Exchange,
         start: datetime,
         end: datetime