I have a script to subscribe to websockets of over 1000 instruments on Deribit's API and feed the results to a queue and runs in a separate thread, however after a certain amount of websockets are opened within 1 second, I get rate limited. My solution is to open a websocket and then sleep for 100ms before awaiting the next websocket task, however I cannot figure out how to do this with asyncio. Here is the websocket code:
class Deribit:
def __init__(self, contract, currency, q):
self.baseUrl = "wss://www.deribit.com/ws/api/v2"
self.contract = contract
self.currency = currency
self.q = q
async def tickerSubscribe(self):
msg = {
"jsonrpc": "2.0",
"method": "public/subscribe",
"params": {"channels": [f"ticker.{self.contract}.raw"]},
}
async with websockets.connect(self.baseUrl) as websocket:
await websocket.send(json.dumps(msg))
while websocket.open:
if flag == 'stop':
await websocket.close()
return
else:
response = await websocket.recv()
response = json.loads(response)
response['type'] = 'tickerSubscribe'
self.q.put(response)
if not websocket.open:
print('tickerSubscribe Closed')
await self.tickerSubscribe()
async def flowSubscribe(self):
msg = {
"jsonrpc": "2.0",
"method": "public/subscribe",
"params": {"channels": [f"trades.option.{self.currency}.raw"]},
}
async with websockets.connect(self.baseUrl) as websocket:
await websocket.send(json.dumps(msg))
while websocket.open:
if flag == 'stop':
await websocket.close()
return
else:
response = await websocket.recv()
response = json.loads(response)
response['type'] = 'flowSubscribe'
self.q.put(response)
if not websocket.open:
print('flowSubscribe closed')
await self.flowSubscribe()
Then here is the code to await the tasks:
async def main():
tasks = []
q = queue.Queue()
coins = []
print('Getting instruments..')
while True:
btc_instruments = Deribit('BTC', 'BTC', '').getInstruments('false')
if btc_instruments != None:
eth_instruments = Deribit('ETH', 'ETH', '').getInstruments('false')
break
print('Adding instruments to list..')
for instrument in range(len(btc_instruments['result'])):
orderbooks[btc_instruments['result'][instrument]['instrument_name']] = []
coins.append(btc_instruments['result'][instrument]['instrument_name'])
for instrument in range(len(eth_instruments['result'])):
orderbooks[eth_instruments['result'][instrument]['instrument_name']] = []
coins.append(eth_instruments['result'][instrument]['instrument_name'])
for coin in range(len(coins)):
tasks.append(asyncio.create_task(Deribit(coins[coin], '', q).tickerSubscribe()))
tasks.append(asyncio.create_task(Deribit('', 'BTC', q).flowSubscribe()))
tasks.append(asyncio.create_task(Deribit('', 'ETH', q).flowSubscribe()))
print(f'Length of Tasks list : {len(tasks)}')
print('Subscribing to instruments..')
for t in range(len(tasks)):
await tasks[t]
await asyncio.sleep(0.1)
Here is the traceback:
self.run()
File "/usr/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "ws_correct_trades.py", line 199, in main
await tasks[t]
File "ws_correct_trades.py", line 133, in tickerSubscribe
async with websockets.connect(self.baseUrl) as websocket:
File "/usr/local/lib/python3.8/dist-packages/websockets/client.py", line 517, in __aenter__
return await self
File "/usr/local/lib/python3.8/dist-packages/websockets/client.py", line 535, in __await_impl__
transport, protocol = await self._create_connection()
File "/usr/lib/python3.8/asyncio/base_events.py", line 1050, in create_connection
transport, protocol = await self._create_connection_transport(
File "/usr/lib/python3.8/asyncio/base_events.py", line 1080, in _create_connection_transport
await waiter
ConnectionResetError