
Asyncio with multiprocessing: Producers-Consumers model

I am trying to retrieve stock prices and process them as they come in. I am a beginner with concurrency, but I thought this setup seemed suited to an asyncio producers-consumers model, in which each producer retrieves a stock price and passes it to the consumers via a queue. The consumers then do the stock price processing in parallel (multiprocessing), since the work is CPU-intensive. Therefore I would have multiple consumers already working while some producers are still retrieving data. In addition, I would like to implement a step in which, if the consumer finds that the stock price it's working on is invalid, we spawn a new consumer job for that stock.

So far, I have the following toy code that sort of gets me there, but it has issues with my process_data function (the consumer).

from concurrent.futures import ProcessPoolExecutor
import asyncio
import random
import time
random.seed(444)

#producers
async def retrieve_data(ticker, q):
    '''
    Pretend we're using aiohttp to retrieve stock prices from a URL
    Place a tuple of stock ticker and price into the asyncio queue as it becomes available
    '''
    start = time.perf_counter() # start timer
    await asyncio.sleep(random.randint(4, 8)) # pretend we're calling some URL
    price = random.randint(1, 100) # pretend this is the price we retrieved
    print(f'{ticker} : {price} retrieved in {time.perf_counter() - start:0.1f} seconds') 
    await q.put((ticker, price)) # place the price into the asyncio queue
    

#consumers
async def process_data(q):
    while True:
        data = await q.get()
        print(f"processing: {data}")
        with ProcessPoolExecutor() as executor:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(executor, data_processor, data)
            #if output of data_processing failed, send ticker back to queue to retrieve data again
            if not result[2]: 
                print(f'{result[0]} data invalid. Retrieving again...')
                await retrieve_data(result[0], q) # add a new task
                q.task_done() # end this task
            else:
                q.task_done() # so that q.join() knows when the task is done
            
async def main(tickers):       
    q = asyncio.Queue()
    producers = [asyncio.create_task(retrieve_data(ticker, q)) for ticker in tickers]
    consumers = [asyncio.create_task(process_data(q))]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too. blocks until all items in the queue have been received and processed
    for c in consumers:
        c.cancel() #cancel the consumer tasks, which would otherwise hang up and wait endlessly for additional queue items to appear
    

    
'''
RUN IN JUPYTER NOTEBOOK
'''
start = time.perf_counter()
tickers = ['AAPL', 'AMZN', 'TSLA', 'C', 'F']
await main(tickers)
print(f'total elapsed time: {time.perf_counter() - start:0.2f}')

'''
RUN IN TERMINAL
'''
# if __name__ == "__main__":
#     start = time.perf_counter()
#     tickers = ['AAPL', 'AMZN', 'TSLA', 'C', 'F']
#     asyncio.run(main(tickers))
#     print(f'total elapsed time: {time.perf_counter() - start:0.2f}')

The data_processor() function below, called by process_data() above, needs to be in a different cell in a Jupyter notebook, or in a separate module (from what I understand, to avoid a PicklingError):

from multiprocessing import current_process
import random
import time  # needed here if this cell/module is separate from the one above

def data_processor(data):
    ticker = data[0]
    price = data[1]
    
    print(f'Started {ticker} - {current_process().name}')
    start = time.perf_counter() # start time counter
    time.sleep(random.randint(4, 5)) # mimic some random processing time
    
    # pretend we're processing the price. Let the processing outcome be invalid if the price is an odd number
    is_valid = price % 2 == 0
    
    print(f"{ticker}'s price {price} validity: --{is_valid}--"
          f' Elapsed time: {time.perf_counter() - start:0.2f} seconds')
    return (ticker, price, is_valid)
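
If it lives in a separate module, say worker.py (a hypothetical file name), the notebook or script would simply import it, which keeps the function importable by the pool's worker processes and avoids the PicklingError:

from worker import data_processor  # hypothetical module; top-level functions pickle by reference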

THE ISSUES

  1. Instead of using Python's multiprocessing module, I used concurrent.futures' ProcessPoolExecutor, which I read is compatible with asyncio (What kind of problems (if any) would there be combining asyncio with multiprocessing?). But it seems that I have to choose between retrieving the output (result) of the function called by the executor and being able to run several subprocesses in parallel. With the construct below, the subprocesses run sequentially, not in parallel.

    with ProcessPoolExecutor() as executor:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(executor, data_processor, data)
    

Removing result = await in front of loop.run_in_executor(executor, data_processor, data) allows several consumers to run in parallel, but then I can't collect their results from the parent process. I need the await for that. And then, of course, the rest of the code block will fail.

How can I have these subprocesses run in parallel and still provide their output? Perhaps it needs a different construct, or something other than the producers-consumers model?
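
For illustration, here is a hypothetical batch-style variant (a sketch only; batch_size is made up, and it departs from the one-item-at-a-time queue model) that awaits several executor futures together with asyncio.gather():

async def process_batch(q, executor, batch_size=4):
    # Sketch: pull a few items off the queue, run them through the process
    # pool concurrently, and collect all the results at once.
    loop = asyncio.get_running_loop()
    batch = [await q.get() for _ in range(batch_size)]
    futures = [loop.run_in_executor(executor, data_processor, d) for d in batch]
    results = await asyncio.gather(*futures)  # the subprocesses run in parallel
    for _ in batch:
        q.task_done()
    return results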

  2. The part of the code that requests invalid stock prices to be retrieved again works (provided I can get the result from above), but it runs inside the consumer coroutine that awaits it and blocks that consumer from picking up new queue items until the request is fulfilled. Is there a way to address this? (A possible workaround is sketched after the code block below.)

    #if output of data_processing failed, send ticker back to queue to retrieve data again
    if not result[2]:
        print(f'{result[0]} data invalid. Retrieving again...')
        await retrieve_data(result[0], q) # add a new task
        q.task_done() # end this task
    else:
        q.task_done() # so that q.join() knows when the task is done
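
A possible workaround (a sketch only, untested): spawn the re-fetch as an independent task instead of awaiting it inline, so the consumer goes straight back to the queue:

#if data_processor flagged the price invalid, re-dispatch the retrieval as its own task
if not result[2]:
    print(f'{result[0]} data invalid. Retrieving again...')
    asyncio.create_task(retrieve_data(result[0], q))  # not awaited: the consumer stays free
q.task_done()

# Caveat: q.join() in main() could then return while a re-fetch is still in
# flight, so those outstanding re-fetch tasks would need to be tracked as well.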
    
Question from: https://stackoverflow.com/questions/65905372/asyncio-with-multiprocessing-producers-consumers-model

1 Reply

But it seems that I have to choose between retrieving the output (result) of the function called by the executor and being able to run several subprocesses in parallel.

Luckily this is not the case: you could also use asyncio.gather() to wait for multiple items at once. But you obtain data items one by one from the queue, so you don't have a batch of items to process. The simplest solution is to just start multiple consumers. Replace:

# the single-element list looks suspicious anyway
consumers = [asyncio.create_task(process_data(q))]

with:

# now we have an actual list
consumers = [asyncio.create_task(process_data(q)) for _ in range(16)]

Each consumer will wait for an individual task to finish, but that's ok because you'll have a whole pool of them working in parallel, which is exactly what you wanted.

Also, you might want to make executor a global variable and not use with, so that the process pool is shared by all consumers and lasts as long as the program. That way consumers will reuse the worker processes already spawned instead of having to spawn a new process for each job received from the queue. (That's the whole point of having a process "pool".) In that case you probably want to add executor.shutdown() at the point in the program where you don't need the executor anymore.
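
A minimal sketch of that arrangement (reusing data_processor and retrieve_data from the question; the invalid-price retry logic is omitted here):

from concurrent.futures import ProcessPoolExecutor
import asyncio

executor = ProcessPoolExecutor()  # one shared pool, created once at module level

async def process_data(q):
    loop = asyncio.get_running_loop()
    while True:
        data = await q.get()
        # each job reuses an already-spawned worker instead of creating a fresh pool
        result = await loop.run_in_executor(executor, data_processor, data)
        print(f"processed: {result}")
        q.task_done()

async def main(tickers):
    q = asyncio.Queue()
    producers = [asyncio.create_task(retrieve_data(t, q)) for t in tickers]
    consumers = [asyncio.create_task(process_data(q)) for _ in range(16)]
    await asyncio.gather(*producers)
    await q.join()
    for c in consumers:
        c.cancel()
    executor.shutdown()  # release the worker processes once everything is done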

