machine learning - generating a list of arrays using multiprocessing in python

I am having difficulty implementing parallelisation for generating a list of arrays. Each array is generated independently and then appended to a list. Somehow multiprocessing.Pool.apply_async() ends up producing an empty list when I feed it complicated arguments.

More specifically, to give some context, I am attempting to implement a machine learning algorithm using parallelisation. The idea is the following: I have a 'system' and an 'agent' which performs actions on the system. To teach the agent (in this case a neural net) how to behave optimally (with respect to a certain reward scheme that I have omitted here), the agent needs to generate trajectories of the system by applying actions to it. From the reward obtained upon performing the actions, the agent then learns what to do and what not to do. Note, importantly, that the possible actions in the code are represented as integers:

    possible_actions = [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
    

So here I am attempting to generate many such trajectories using multiprocessing (sorry, the code is not runnable as posted since it requires many other files, but I'm hoping somebody can spot the issue):

    from quantum_simulator_EC import system
    from reinforce_keras_EC import Agent
    import multiprocessing as mp


    s = system(1200, N=3)
    s.set_initial_state([0,0,1])  
    agent = Agent(alpha=0.0003,  gamma=0.95, n_actions=len( s.actions ))


    def get_result(result):
        global action_batch
        action_batch.append(result)

    def generate_trajectory(s, agent):

        sequence_of_actions = []

        for k in range( 5 ):

            net_input = s.generate_net_input_FULL(6)
            action = agent.choose_action( net_input )
    
            sequence_of_actions.append(action)
    

        return sequence_of_actions
    
    action_batch = []

    pool = mp.Pool(2)
    for i in range(0, batch_size):
        pool.apply_async(generate_trajectory, args=(s,agent), callback=get_result)
    pool.close()
    pool.join()

    print(action_batch)

The problem is that the code returns an empty list []. Can somebody explain to me what the issue is? Are there restrictions on the kind of arguments that I can pass to apply_async? In this example I am passing my system 's' and my 'agent', both complicated objects. I mention this because when I test the code with simple arguments like integers or matrices instead of the agent and the system, it works fine. If there is no obvious reason why it's not working, some tips on how to debug the code would also be helpful.

Note that there is no problem if I do not use multiprocessing and instead replace the last part with:

    action_batch = []

    for i in range(0, batch_size):
        get_result( generate_trajectory(s, agent) )

    print(action_batch)

In this case the output is, as expected, a list of sequences of 5 actions:

    [[4, 2, 1, 1, 7], [8, 2, 2, 12, 1], [8, 1, 9, 11, 9], [7, 10, 6, 1, 0]]
Question from: https://stackoverflow.com/questions/65848446/generating-a-list-of-arrays-using-multiprocessing-in-python
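
For debugging, note that Pool.apply_async only ever invokes the success callback: any exception raised while running the task, or while pickling its arguments, is stored silently in the AsyncResult it returns and only surfaces through an error_callback or a call to .get(). Given that simple arguments work while s and agent do not, a plausible culprit is that one of these objects cannot be pickled. The following self-contained sketch (which uses a threading.Lock as a stand-in for an unpicklable "complicated" object) shows how the hidden error can be made visible:

    import multiprocessing as mp
    import threading


    def work(obj):
        # trivial task; it never runs if the argument cannot be pickled
        return str(obj)


    def on_success(result):
        print("callback got:", result)


    def on_error(err):
        # runs in the main process whenever a task fails
        print("task failed:", repr(err))


    if __name__ == '__main__':
        unpicklable = threading.Lock()  # stand-in for a 'complicated' object

        with mp.Pool(2) as pool:
            # with only callback=..., this failure would be completely silent
            res = pool.apply_async(work, args=(unpicklable,),
                                   callback=on_success,
                                   error_callback=on_error)
            pool.close()
            pool.join()

        # get() re-raises the stored exception instead of hiding it
        try:
            res.get()
        except Exception as e:
            print("get() raised:", repr(e))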


1 Reply


The final results can be appended directly to a list in the main process; there is no need for a callback function. Then you can close and join the pool, and finally retrieve all the results using get().

See the following two examples, one using apply_async and the other using starmap_async (see this post for the difference).

Solution apply

    import multiprocessing as mp
    import time


    def func(s, agent):
        print(f"Working on task {agent}")
        time.sleep(0.1)  # some task
        return (s, s, s)


    if __name__ == '__main__':
        agent = "My awesome agent"
        with mp.Pool(2) as pool:
            results = []
            for s in range(5):
                results.append(pool.apply_async(func, args=(s, agent)))
            pool.close()
            pool.join()

        print([result.get() for result in results])

Solution starmap

    import multiprocessing as mp
    import time


    def func(s, agent):
        print(f"Working on task {agent}")
        time.sleep(0.1)  # some task
        return (s, s, s)


    if __name__ == '__main__':
        agent = "My awesome agent"
        with mp.Pool(2) as pool:
            result = pool.starmap_async(func, [(s, agent) for s in range(5)])
            pool.close()
            pool.join()

        print(result.get())

Output

    Working on task My awesome agent
    Working on task My awesome agent
    Working on task My awesome agent
    Working on task My awesome agent
    Working on task My awesome agent
    [(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
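
Mapped back onto the code in the question, the same pattern would look roughly like the sketch below. This is an illustration only: it reuses the system and Agent classes and the generate_trajectory logic from the question, uses a placeholder batch_size, and assumes that instances of system and Agent can be pickled. If they cannot, each task fails before reaching a worker, which would be a plausible reason why the callback never fires and the list stays empty; the .get() calls below would at least re-raise that error instead of hiding it.

    from quantum_simulator_EC import system
    from reinforce_keras_EC import Agent
    import multiprocessing as mp


    def generate_trajectory(s, agent):
        # one trajectory = 5 actions chosen by the agent on the system
        sequence_of_actions = []
        for k in range(5):
            net_input = s.generate_net_input_FULL(6)
            sequence_of_actions.append(agent.choose_action(net_input))
        return sequence_of_actions


    if __name__ == '__main__':
        batch_size = 4  # placeholder; use the real batch size here

        s = system(1200, N=3)
        s.set_initial_state([0, 0, 1])
        agent = Agent(alpha=0.0003, gamma=0.95, n_actions=len(s.actions))

        with mp.Pool(2) as pool:
            results = [pool.apply_async(generate_trajectory, args=(s, agent))
                       for _ in range(batch_size)]
            pool.close()
            pool.join()

        # get() returns each trajectory, or re-raises any error (e.g. a
        # pickling error on s or agent) that would otherwise stay invisible
        action_batch = [result.get() for result in results]
        print(action_batch)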
