Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
226 views
in Technique[技术] by (71.8m points)

python - How to sleep between awaiting pythonasyncio tasks to avoid rate limits

I have a script to subscribe to websockets of over 1000 instruments on Deribit's API and feed the results to a queue and runs in a separate thread, however after a certain amount of websockets are opened within 1 second, I get rate limited. My solution is to open a websocket and then sleep for 100ms before awaiting the next websocket task, however I cannot figure out how to do this with asyncio. Here is the websocket code:

class Deribit:
    def __init__(self, contract, currency, q):
        self.baseUrl = "wss://www.deribit.com/ws/api/v2"
        self.contract = contract
        self.currency = currency
        self.q = q

    async def tickerSubscribe(self):
        msg = {
            "jsonrpc": "2.0",
            "method": "public/subscribe",
            "params": {"channels": [f"ticker.{self.contract}.raw"]},
        }
        async with websockets.connect(self.baseUrl) as websocket:
            await websocket.send(json.dumps(msg))
            while websocket.open:
                if flag == 'stop':
                    await websocket.close()
                    return
                else:        
                    response = await websocket.recv()
                    response = json.loads(response)
                    response['type'] = 'tickerSubscribe'
                    self.q.put(response)
            if not websocket.open:
                print('tickerSubscribe Closed')
                await self.tickerSubscribe()
    async def flowSubscribe(self):
        msg = {
            "jsonrpc": "2.0",
            "method": "public/subscribe",
            "params": {"channels": [f"trades.option.{self.currency}.raw"]},
        }
        async with websockets.connect(self.baseUrl) as websocket:
            await websocket.send(json.dumps(msg))
            while websocket.open:
                if flag == 'stop':
                    await websocket.close()
                    return
                else:        
                    response = await websocket.recv()
                    response = json.loads(response)
                    response['type'] = 'flowSubscribe'
                    self.q.put(response)
            if not websocket.open:
                print('flowSubscribe closed')
                await self.flowSubscribe()

Then here is the code to await the tasks:

async def main():
    tasks = []
    q = queue.Queue()
    coins = []
    print('Getting instruments..')
    while True:
        btc_instruments = Deribit('BTC', 'BTC', '').getInstruments('false')
        if btc_instruments != None:
            eth_instruments = Deribit('ETH', 'ETH', '').getInstruments('false')
            break
    print('Adding instruments to list..')
    for instrument in range(len(btc_instruments['result'])):
        orderbooks[btc_instruments['result'][instrument]['instrument_name']] = []
        coins.append(btc_instruments['result'][instrument]['instrument_name'])
    for instrument in range(len(eth_instruments['result'])):
        orderbooks[eth_instruments['result'][instrument]['instrument_name']] = []
        coins.append(eth_instruments['result'][instrument]['instrument_name'])
    for coin in range(len(coins)):
        tasks.append(asyncio.create_task(Deribit(coins[coin], '', q).tickerSubscribe()))
    tasks.append(asyncio.create_task(Deribit('', 'BTC', q).flowSubscribe()))
    tasks.append(asyncio.create_task(Deribit('', 'ETH', q).flowSubscribe()))
    print(f'Length of Tasks list : {len(tasks)}')
    print('Subscribing to instruments..')
    for t in range(len(tasks)):
        await tasks[t]
        await asyncio.sleep(0.1)

Here is the traceback:

self.run()
File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in  run_until_complete
     return future.result()
File "ws_correct_trades.py", line 199, in main
    await tasks[t]
File "ws_correct_trades.py", line 133, in tickerSubscribe
    async with websockets.connect(self.baseUrl) as websocket:
File "/usr/local/lib/python3.8/dist-packages/websockets/client.py", line 517, in __aenter__
    return await self
File "/usr/local/lib/python3.8/dist-packages/websockets/client.py", line  535, in __await_impl__
    transport, protocol = await self._create_connection()
File "/usr/lib/python3.8/asyncio/base_events.py", line 1050, in    create_connection
    transport, protocol = await self._create_connection_transport(
File "/usr/lib/python3.8/asyncio/base_events.py", line 1080, in    _create_connection_transport
    await waiter
ConnectionResetError

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

...open a websocket and then sleep for 100ms before awaiting the next websocket task...

From the documentation, there are three ways to start a coroutine. One is with asyncio.create_task. In your process, the loops that create the tasks are starting them immediately with no delay. Here is a, hopefully analogous, mre - adapted directly from the example in the docs.

import asyncio, time

async def say_after(delay, what):
    # await asyncio.sleep(delay)
    print(f"{time.perf_counter():1.5f} {what:>7}")

async def main():
    tasks = []
    print(f"start creating tasks {time.perf_counter():1.5f}")
    for i in range(5):
        tasks.append(asyncio.create_task(say_after(.1, f'task-{i}'),name=f'task-{i}'))

    print(f"sleeping 3 before awaiting tasks {time.perf_counter():1.5f}")
    await asyncio.sleep(3)
    for task in tasks:
        print(f"awaiting {task.get_name()} | {time.perf_counter():1.5f}")
        await task
 
 
asyncio.run(main())

start creating tasks 0.79750
sleeping 3 before awaiting tasks 0.79803
0.79858  task-0
0.79891  task-1
0.79926  task-2
0.80382  task-3
0.80421  task-4
awaiting task-0 | 3.80519
awaiting task-1 | 3.80611
awaiting task-2 | 3.80744
awaiting task-3 | 3.80816
awaiting task-4 | 3.81093

You can see that the tasks are starting immediately when they are created - not when they are awaited.


Even if you add a delay in the couroutine, it doesn't accomplish what you want.

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{time.perf_counter():1.5f} {what:>7}")

>>>
start creating tasks 0.89468
sleeping 3 before awaiting tasks 0.89484
1.00323  task-0
1.00376  task-1
1.00431  task-4
1.00467  task-3
1.00508  task-2
awaiting task-0 | 3.90330
awaiting task-1 | 3.90421
awaiting task-2 | 3.90542
awaiting task-3 | 3.90617
awaiting task-4 | 3.91162

Each one processes 100 mS after it is started but no delay between them.


Adding the delay to the loop should do what you want - create each task 100 mS apart.

async def say_after(delay, what):
    # await asyncio.sleep(delay)
    print(f"{time.perf_counter():1.5f} {what:>7}")

async def main():
    tasks = []
    print(f"start creating tasks {time.perf_counter():1.5f}")
    for i in range(5):
        await asyncio.sleep(.1)
        tasks.append(asyncio.create_task(say_after(.1, f'task-{i}'),name=f'task-{i}'))

    print(f"sleeping 3 before awaiting tasks {time.perf_counter():1.5f}")
    await asyncio.sleep(3)
    for task in tasks:
        print(f"awaiting {task.get_name()} | {time.perf_counter():1.5f}")
        await task

>>>
start creating tasks 0.81102
0.91610  task-0
1.02837  task-1
1.13217  task-2
1.24762  task-3
sleeping 3 before awaiting tasks 1.35085
1.35127  task-4
awaiting task-0 | 4.36476
awaiting task-1 | 4.36564
awaiting task-2 | 4.36670
awaiting task-3 | 4.36752
awaiting task-4 | 4.37601

Or maybe you could submit in batches waiting between each batch.

async def main():
    tasks = []
    print(f"start creating tasks {time.perf_counter():1.5f}")
    flag = 0
    n_per_second = 3
    while flag < 3:    # limit the number of batches for the example.
        print(f'batch:{flag}')
        for i in range(n_per_second):
            name = f'task-{(flag*10)+i}'
            tasks.append(asyncio.create_task(say_after(.1, name),name=name))

        await asyncio.sleep(1 / n_per_second)
        flag += 1
    print(f"sleeping 3 before awaiting tasks {time.perf_counter():1.5f}")
    await asyncio.sleep(3)
    for task in tasks:
        print(f"awaiting {task.get_name()} | {time.perf_counter():1.5f}")
        await task

asyncio.run(main()

>>>
start creating tasks 0.84680
batch:0        
0.84742  task-0
0.84749  task-1
0.84755  task-2
batch:1
1.19304 task-10
1.19334 task-11
1.19372 task-12
batch:2
1.54109 task-20
1.54149 task-21
1.54171 task-22
sleeping 3 before awaiting tasks 1.88965
awaiting task-0 | 4.90472
awaiting task-1 | 4.90519
awaiting task-2 | 4.90568
awaiting task-10 | 4.90766
awaiting task-11 | 4.90802
awaiting task-12 | 4.90836
awaiting task-20 | 4.91029
awaiting task-21 | 4.91057
awaiting task-22 | 4.91096

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...