Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
271 views
in Technique[技术] by (71.8m points)

How to make non-blocking while loop in Python Tornado websocket to stream data

I am trying to pull data from Deribit's API, feed it into a Queue, then send it out in a while loop in a Python Tornado Websocket Server. The problem I'm facing is the while loop in the on_message method blocks the execution flow. I'm not sure how to create a way so that the websocket stays open and streams the data without blocking the the queue producing function. Is this possible to do? The code for each task is below.

Websocket code:

class web_socket_handler(ws.WebSocketHandler):
    @classmethod
    def route_urls(cls):
        return [(r'/',cls, {}),]
    
    def simple_init(self):
        self.last = time.time()
        self.stop = False
    
    def open(self):
        self.simple_init()
        print("New client connected")
        
    async def on_message(self, message):
        if message == 'BTC':
            while True:
                try:
                    reply = wsQueue.get(block=False)
                    self.write_message(reply)
                except:
                    pass
        else:
            self.write_message('bad request - closing connection!')
            self.close()
        self.last = time.time()
    
    def on_close(self):
        print("connection is closed")
        try:
            self.loop.stop()
        except:
            pass
    def close(self):
        self.ws_connection.close()
    def check_origin(self, origin):
        return True

API Code:

class Deribit:
    def __init__(self, contract, currency, q):
        self.baseUrl = "wss://www.deribit.com/ws/api/v2"
        self.contract = contract
        self.currency = currency
        self.q = q

    async def getIndex(self):
        msg = {
            "jsonrpc": "2.0",
            "method": "public/get_index",
            "params": {f"currency": "{self.currency}"},
        }
        async with websockets.connect(self.baseUrl) as websocket:
            await websocket.send(json.dumps(msg))
            while websocket.open:
                if flag == 'stop':
                    await websocket.close()
                    return
                elif not websocket.open:
                    await self.getIndex()
                else:        
                    response = await websocket.recv()
                    response = json.loads(response)
                    response['type'] = 'getIndex'
                    self.q.put(response)

    async def flowSubscribe(self):
        msg = {
            "jsonrpc": "2.0",
            "method": "public/subscribe",
            "params": {"channels": [f"trades.option.{self.currency}.raw"]},
        }
        async with websockets.connect(self.baseUrl) as websocket:
            await websocket.send(json.dumps(msg))
            while websocket.open:
                if flag == 'stop':
                    await websocket.close()
                    return
                else:        
                    response = await websocket.recv()
                    response = json.loads(response)
                    response['type'] = 'flowSubscribe'
                    self.q.put(response)
            '''if not websocket.open:
                print('flowSubscribe closed')
                await self.flowSubscribe()'''

    async def tradeSubscribe(self):
        msg = {
            "jsonrpc": "2.0",
            "method": "public/subscribe",
            "params": {"channels": [f"trades.{self.contract}.raw"]},
        }
        async with websockets.connect(self.baseUrl) as websocket:
            await websocket.send(json.dumps(msg))
            while websocket.open:
                if flag == 'stop':
                    await websocket.close()
                    return
                elif not websocket.open:
                    await self.tradeSubscribe()
                else:        
                    response = await websocket.recv()
                    response = json.loads(response)
                    response['type'] = 'tradeSubscribe'
                    self.q.put(response)

    async def orderbookSubscribe(self):
        msg = {
            "jsonrpc": "2.0",
            "method": "public/subscribe",
            "params": {"channels": [f"book.{self.contract}.100ms"]},
        }
        async with websockets.connect(self.baseUrl) as websocket:
            await websocket.send(json.dumps(msg))
            while websocket.open:
                if flag == 'stop':
                    await websocket.close()
                    return
                elif not websocket.open:
                    await self.orderbookSubscribe()
                else:        
                    response = await websocket.recv()
                    response = json.loads(response)
                    response['type'] = 'orderbookSubscribe'
                    self.q.put(response)

    async def orderbookUpdateSubscribe(self):
        msg = {
            "jsonrpc": "2.0",
            "method": "public/subscribe",
            "params": {"channels": [f"book.{self.contract}.1.raw"]},
        }
        async with websockets.connect(self.baseUrl) as websocket:
            await websocket.send(json.dumps(msg))
            while websocket.open:
                if flag == 'stop':
                    await websocket.close()
                    return 
                else:        
                    response = await websocket.recv()
                    response = json.loads(response)
                    response['type'] = 'orderbookUpdateSubscribe'
                    self.q.put(response)
            if not websocket.open:
                print('orderbookUpdateSubscribe Closed')
                await self.orderbookUpdateSubscribe()

    async def tickerSubscribe(self, i):
        msg = {
            "jsonrpc": "2.0",
            "method": "public/subscribe",
            "params": {"channels": i},
        }
        async with websockets.connect(self.baseUrl) as websocket:
            await websocket.send(json.dumps(msg))
            while websocket.open:
                if flag == 'stop':
                    await websocket.close()
                    return
                elif not websocket.open:
                    await self.tickerSubscribe(i)
                else:
                    response = await websocket.recv()
                    response = json.loads(response)
                    response['type'] = 'tickerSubscribe'
                    self.q.put(response)

    def getInstruments(self, expired):
        msg = f"https://www.deribit.com/api/v2/public/get_instruments?currency={self.currency}&expired={expired}&kind=option"
        response = requests.get(msg)
        return response.json()
    
    def getLastTradesByCurrency(self, count, expired):
        msg = f"https://www.deribit.com/api/v2/public/get_last_trades_by_currency?count={count}&currency={self.currency}"
        response = requests.get(msg)
        return response.json()

Queue Producer Function:

def threadLoop():
    global flag
    q = queue.Queue()
    flag = 'go'
    count = 0
    loop = asyncio.get_event_loop()
    time = datetime.datetime.now().time().strftime("%H:%M:%S")
    time = datetime.datetime.strptime(time, '%H:%M:%S').time()
    startTime = datetime.time(11, 58, 0)
    endTime = datetime.time(11, 58, 2)
    while True:
        time = datetime.datetime.now().time().strftime("%H:%M:%S")
        time = datetime.datetime.strptime(time, '%H:%M:%S').time()
        if time <= startTime or time >= endTime:
            t1 = threading.Thread(target=loop.run_until_complete, args=(main(q),), daemon=True)
            t1.start()
            break
    while True:
        try:
            if flag == 'stop':
                for i in range(len(tasks)):
                    tasks[i].cancel()
                t1.join()
                if t1.is_alive() is False:
                    print('threads stopped')
                    threadLoop()
                else:
                    break
            elif flag != 'stop':
                message = q.get(block = False)
                if message != None:
                    if message['type'] == 'tickerSubscribe':
                        instrument_name = message['params']['data']['instrument_name']
                        timestamp = message['params']['data']['timestamp']
                        bid_price = message['params']['data']['best_bid_price']
                        bid_iv = message['params']['data']['bid_iv']
                        ask_price = message['params']['data']['best_ask_price']
                        ask_iv = message['params']['data']['ask_iv']
                        open_interest = message['params']['data']['open_interest']
                        try:
                            old_open_interest = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 1]['open_interest']
                            orderbooks[instrument_name].append({'timestamp': timestamp, 'bid_price': bid_price, 'bid_iv': bid_iv, 'ask_price': ask_price, 'ask_iv': ask_iv, 'open_interest': open_interest, 'old_open_interest': old_open_interest})
                        except:
                            orderbooks[instrument_name].append({'timestamp': timestamp, 'bid_price': bid_price, 'bid_iv': bid_iv, 'ask_price': ask_price, 'ask_iv': ask_iv, 'open_interest': open_interest})
                    elif message['type'] == 'flowSubscribe':
                        instrument_name = message['params']['data'][0]['instrument_name']
                        trade_price = message['params']['data'][0]['price']
                        bid_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 2]['bid_price']
                        ask_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 2]['ask_price']
                        bid_trade_difference = str(bid_price - trade_price)
                        ask_trade_difference = str(ask_price - trade_price)
                        if bid_trade_difference[0] == '-':
                            bid_trade_difference = bid_trade_difference[1:]
                        if ask_trade_difference[0] == '-':
                            ask_trade_difference = ask_trade_difference[1:]
                        bid_trade_difference = float(bid_trade_difference)
                        ask_trade_difference = float(ask_trade_difference)
                        if bid_price or ask_price != message['params']['data'][0]['price']:
                            bid_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 1]['bid_price']
                            ask_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 1]['ask_price']
                            if bid_price or ask_price != message['params']['data'][0]['price']:
                                bid_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 3]['bid_price']
                                ask_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 3]['ask_price']
                                if bid_price == message['params']['data'][0]['price']:
                                    message['params']['data'][0]['direction'] == 'sell'
           

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

The while-loop keeps running and, so, the ioloop doesn't get chance to run any other code.

Since, self.write_message returns an awaitable object, you can use the await statement to suspend the loop until it's resolved:

...
while True:
    await self.write_message(reply)
...

The await statement submits control back to the ioloop.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...