前言
本篇文章將會以HbdmGateway為入口,帶你打通框架底層的執行流程。對底層程式碼沒興趣的小夥伴可以直接關閉了,他不會對以後的上層策略開發有大的影響。
連線Gateway
在run.py-》main_engine.add_gateway(HbdmGateway),主引擎新增gateway內部實現:
def add_gateway(self, gateway_class: Type[BaseGateway]) -> BaseGateway: """ Add gateway. """ gateway = gateway_class(self.event_engine) self.gateways[gateway.gateway_name] = gateway # Add gateway supported exchanges into engine for exchange in gateway.exchanges: if exchange not in self.exchanges: self.exchanges.append(exchange) return gateway
介面新增選單的時候會取得self.gateways,並設定到選單:
def init_menu(self) -> None: """""" bar = self.menuBar() # System menu sys_menu = bar.addMenu("系統") gateway_names = self.main_engine.get_all_gateway_names() for name in gateway_names: func = partial(self.connect, name) self.add_menu_action(sys_menu, f"連線{name}", "connect.ico", func) sys_menu.addSeparator() self.add_menu_action(sys_menu, "退出", "exit.ico", self.close) # App menu app_menu = bar.addMenu("功能") all_apps = self.main_engine.get_all_apps()
如下圖所示:
def connect(self, gateway_name: str) -> None: """ Open connect dialog for gateway connection. """ dialog = ConnectDialog(self.main_engine, gateway_name) dialog.exec_()
如下圖所示:
def connect(self) -> None: """ Get setting value from line edits and connect the gateway. """ setting = {} for field_name, tp in self.widgets.items(): widget, field_type = tp if field_type == list: field_value = str(widget.currentText()) else: field_value = field_type(widget.text()) setting[field_name] = field_value save_json(self.filename, setting) self.main_engine.connect(setting, self.gateway_name) self.accept()
分析上述程式碼可知,首先它會從widgets獲取引數值設定到setting,然後這句程式碼self.main_engine.connect(setting, self.gateway_name) 會呼叫對應gateway的connect方法
實現如下,首先獲取gateway例項,然後呼叫gateway的connect方法:
def connect(self, setting: dict, gateway_name: str) -> None: """ Start connection of a specific gateway. """ gateway = self.get_gateway(gateway_name) if gateway: gateway.connect(setting)
連線gateway的內部實現流程已經分析完了,下面我們介紹gateway本身內部實現。
Gateway實現
我們已經知道,連線gateway需要呼叫gateway的connect函式,這也表示了如果你需要開發一個框架現在沒有的交易所,那麼首先必須實現此函式。我們來看一下它的具體實現程式碼:
def connect(self, setting: dict): """""" key = setting["API Key"] secret = setting["Secret Key"] session_number = setting["會話數"] proxy_host = setting["代理地址"] proxy_port = setting["代理埠"] if proxy_port.isdigit(): proxy_port = int(proxy_port) else: proxy_port = 0 self.rest_api.connect(key, secret, session_number, proxy_host, proxy_port) self.trade_ws_api.connect(key, secret, proxy_host, proxy_port) self.market_ws_api.connect(key, secret, proxy_host, proxy_port) self.init_query()
分析它的程式碼我們可以看出,首先setting引數就是我們要設定的key等引數,由外部傳入。之後程式碼會分別初始化rest介面、trade介面和market介面。這三個介面接下來會詳細說,這裡我們先分析下主Gateway需要實現哪些函式。
訂閱,在介面上輸入要訂閱的symbol,按回車會呼叫到這裡的函式,程式碼如下:
def subscribe(self, req: SubscribeRequest): """""" self.market_ws_api.subscribe(req)
引數很簡單,是一個由symbol和exchange合成的vt_symbol:
@dataclassclass SubscribeRequest: """ Request sending to specific gateway for subscribing tick data update. """ symbol: str exchange: Exchange def __post_init__(self): """""" self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
之後會呼叫行情介面market_ws_api的訂閱函式,具體實現如下:
def subscribe_data(self, ws_symbol: str): """""" # Subscribe to market depth update self.req_id += 1 req = { "sub": f"market.{ws_symbol}.depth.step0", "id": str(self.req_id) } self.send_packet(req) # Subscribe to market detail update self.req_id += 1 req = { "sub": f"market.{ws_symbol}.detail", "id": str(self.req_id) } self.send_packet(req)
分析上述程式碼可知,行情訂閱實現了兩個功能,depth(掛單),detail(tick)。當交易所掛單資料有變更或者tick資料有變更的時候,交易所會自動推送資料過來,回撥函式如下:
def on_data(self, packet): # type: (dict)->None """""" channel = packet.get("ch", None) if channel: if "depth.step" in channel: self.on_market_depth(packet) elif "detail" in channel: self.on_market_detail(packet) elif "err_code" in packet: code = packet["err_code"] msg = packet["err_msg"] self.gateway.write_log(f"錯誤程式碼:{code}, 錯誤資訊:{msg}") def on_market_depth(self, data): """行情深度推送 """ ws_symbol = data["ch"].split(".")[1] tick = self.ticks[ws_symbol] tick.datetime = datetime.fromtimestamp(data["ts"] / 1000) tick_data = data["tick"] if "bids" not in tick_data or "asks" not in tick_data: return bids = tick_data["bids"] for n in range(5): price, volume = bids[n] tick.__setattr__("bid_price_" + str(n + 1), float(price)) tick.__setattr__("bid_volume_" + str(n + 1), float(volume)) asks = tick_data["asks"] for n in range(5): price, volume = asks[n] tick.__setattr__("ask_price_" + str(n + 1), float(price)) tick.__setattr__("ask_volume_" + str(n + 1), float(volume)) if tick.last_price: self.gateway.on_tick(copy(tick)) def on_market_detail(self, data): """市場細節推送""" ws_symbol = data["ch"].split(".")[1] tick = self.ticks[ws_symbol] tick.datetime = datetime.fromtimestamp(data["ts"] / 1000) tick_data = data["tick"] tick.open_price = tick_data["open"] tick.high_price = tick_data["high"] tick.low_price = tick_data["low"] tick.last_price = tick_data["close"] tick.volume = tick_data["vol"] if tick.bid_price_1: self.gateway.on_tick(copy(tick))
訂閱的程式碼分析完了,下面我們分析下委託。
def send_order(self, req: OrderRequest): """""" return self.rest_api.send_order(req)
引數如下,主要包括交易對程式碼symbol,交易方向direction,交易數量volume,交易價格price等等:
@dataclassclass OrderRequest: """ Request sending to specific gateway for creating a new order. """ symbol: str exchange: Exchange direction: Direction type: OrderType volume: float price: float = 0 offset: Offset = Offset.NONE reference: str = "" def __post_init__(self): """""" self.vt_symbol = f"{self.symbol}.{self.exchange.value}" def create_order_data(self, orderid: str, gateway_name: str) -> OrderData: """ Create order data from request. """ order = OrderData( symbol=self.symbol, exchange=self.exchange, orderid=orderid, type=self.type, direction=self.direction, offset=self.offset, price=self.price, volume=self.volume, gateway_name=gateway_name, ) return order
之後會呼叫rest介面具體的發單函式實現:
def send_order(self, req: OrderRequest): """""" local_orderid = self.new_local_orderid() order = req.create_order_data( local_orderid, self.gateway_name ) order.time = datetime.now().strftime("%H:%M:%S") # 新增 symbol, contract_type = req.symbol.split("_") contract_type = CONTRACT_TYPE_2MAP[contract_type] data = { # 註釋 # "contract_code": req.symbol, # 新增 "symbol": symbol, "contract_type": contract_type, "client_order_id": int(local_orderid), "price": req.price, "volume": int(req.volume), "direction": DIRECTION_VT2HBDM.get(req.direction, ""), "offset": OFFSET_VT2HBDM.get(req.offset, ""), "order_price_type": ORDERTYPE_VT2HBDM.get(req.type, ""), "lever_rate": 20 } self.add_request( method="POST", path="/api/v1/contract_order", callback=self.on_send_order, data=data, extra=order, on_error=self.on_send_order_error, on_failed=self.on_send_order_failed ) self.gateway.on_order(order) return order.vt_orderid
分析上述程式碼,發現程式碼很簡單,首先生成本地訂單ID建立訂單物件,然後解析下傳入的symbol以適應交易所需要的規範,之後就是post傳送請求到交易所,特別注意的是有幾個回撥函式需要設定下:callback、on_error、on_failed,用來處理髮單成功失敗等結果。
其他還有一些介面,比如查詢賬戶、查詢持倉、查詢歷史或者當前訂單等等,這些函式不是必須的。因為大部分的資料都通過了訂閱由交易所主動推送過來了。當然,如果你想主動去查詢也是可以的,這裡就不一一分析了。接下來我們再分析下訂單的推送,這個是很重要的,因為大部分的交易邏輯都是基於訂單推送的。
HbdmTradeWebsocketApi,負責處理交易資料的推送,首先還是分析訂閱:
def subscribe(self): """""" self.req_id += 1 req = { "op": "sub", "cid": str(self.req_id), "topic": f"orders.*" } self.send_packet(req) # 新增 # 訂閱資產變更事件 self.req_id += 1 req = { "op": "sub", "cid": str(self.req_id), "topic": f"accounts.*" } self.send_packet(req) # 新增 # 訂閱持倉變更事件 self.req_id += 1 req = { "op": "sub", "cid": str(self.req_id), "topic": f"positions.*" } self.send_packet(req)
分析上述程式碼我們可以看到,訂閱了三種資料,訂單資料推送,賬戶資產變更推送,持倉資料變更推送。下面的兩個新增的推送,是我另外實現的,這個不是必要的,之前已經說過了,大部分的交易邏輯其實不需要持倉和資產的資料,只需要訂單資料的推送就可以了。
我們重點關注訂單資料推送,看下火幣的說明:
訂閱訂單更新API Key 許可權:讀取訂單的更新推送由任一以下事件觸發: - 訂單建立(eventType=creation) - 訂單成交(eventType=trade) - 訂單撤銷(eventType=cancellation)不同事件型別所推送的訊息中,欄位列表略有不同。開發者可以採取以下兩種方式設計返回的資料結構: - 定義一個包含所有欄位的資料結構,並允許某些欄位為空 - 定義三個資料結構,分別包含各自的欄位,並繼承自一個包含公共資料欄位的資料結構
建立、成交、撤銷三種事件都會推送過來,足夠了。那麼推送過來的資料到底是啥樣的呢?
{ "action":"push", "ch":"orders#btcusdt", "data": { "orderSize":"2.000000000000000000", "orderCreateTime":1583853365586, "orderPrice":"77.000000000000000000", "type":"sell-limit", "orderId":27163533, "clientOrderId":"abc123", "orderStatus":"submitted", "symbol":"btcusdt", "eventType":"creation" }}
上面就是一個推送過來的資料例子,可以看到包含了訂單大小、建立時間、價格、訂單ID、訂單狀態、訂單方向(買或者賣)、訂單的交易對。
那麼我們的訂單回撥程式碼裡面只要把這些資料推送給各個註冊了事件的引擎就OK了,具體程式碼如下:
def on_order(self, data: dict): """""" dt = datetime.fromtimestamp(data["created_at"] / 1000) time = dt.strftime("%H:%M:%S") if data["client_order_id"]: orderid = data["client_order_id"] else: orderid = data["order_id"] order = OrderData( # 註釋 # symbol=data["contract_code"], # 新增 symbol=data["symbol"] + "_" + CONTRACT_TYPE_MAP[data["contract_type"]], exchange=Exchange.HUOBI, orderid=orderid, type=ORDERTYPE_HBDM2VT[data["order_price_type"]], direction=DIRECTION_HBDM2VT[data["direction"]], offset=OFFSET_HBDM2VT[data["offset"]], price=data["price"], volume=data["volume"], traded=data["trade_volume"], status=STATUS_HBDM2VT[data["status"]], time=time, gateway_name=self.gateway_name ) self.gateway.on_order(order) # Push trade event trades = data["trade"] if not trades: return for d in trades: dt = datetime.fromtimestamp(d["created_at"] / 1000) time = dt.strftime("%H:%M:%S") trade = TradeData( symbol=order.symbol, exchange=Exchange.HUOBI, orderid=order.orderid, tradeid=str(d["trade_id"]), direction=order.direction, offset=order.offset, price=d["trade_price"], volume=d["trade_volume"], time=time, gateway_name=self.gateway_name, ) self.gateway.on_trade(trade)
可以看到,當我們收到資料之後,會構建資料結構比如OrderData、TradeData。然後由事件引擎負責推送給所有註冊了指定事件型別的物件。以CTA策略引擎為例,它註冊了EVENT_ORDER和、EVENT_TRADE事件,那麼資料就會推送到他註冊的事件對應處理函式裡面。
結語
gateway主動查詢資料或者訂閱資料接收交易所的資料推送,產生資料之後通過事件引擎又推送給所有註冊了對應事件型別的模組。比如介面模組、CTA策略模組、價差模組等等。
以介面模組為例,它想要顯示資料,那麼他註冊了事件,gateway收到資料之後會put事件,介面模組收到事件訊息,得到資料,然後顯示到介面。比如訂單資料,資產資料、行情資料等等。
下一篇文章會對事件引擎詳細的介紹,希望大家繼續關注。