Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
193 views
in Technique[技术] by (71.8m points)

Multiprocessing starmap_async python

I am learning to use multiprocessing in python and I have a question. I want to count the number of times an object (i.e. tuple of words) is in a list. I propose two options. The first using pool.starmap_async and the second without multiprocessing.

ngrams=[('review', 'productivity'), ('productivity', 'satisfaction'), ('satisfaction', 'democratic'), ('democratic', 'autocratic'), ('autocratic', 'leadership'), ('leadership', 'empirical'), ('empirical', 'literature'), ('literature', 'explore'), ('explore', 'organizational_outcome'), ('organizational_outcome', 'democratic'), ('democratic', 'leadership'), ('leadership', 'task##oriented'), ('task##oriented', 'group'), ('group', 'individual'), ('individual', 'member'), ('member', 'productivity'), ('productivity', 'satisfaction'), ('satisfaction', 'receive'), ('receive', 'attention'), ('attention', 'emphasis')]
ngrams_uniq=[('satisfaction', 'democratic'), ('organizational_outcome', 'democratic'), ('review', 'productivity'), ('democratic', 'leadership'), ('member', 'productivity'), ('receive', 'attention'), ('empirical', 'literature'), ('group', 'individual'), ('literature', 'explore'), ('democratic', 'autocratic'), ('autocratic', 'leadership'), ('attention', 'emphasis'), ('task##oriented', 'group'), ('explore', 'organizational_outcome'), ('leadership', 'task##oriented'), ('satisfaction', 'receive'), ('productivity', 'satisfaction'), ('leadership', 'empirical'), ('individual', 'member')]

def count_ngrams(gram,ngrams):
  return (gram,ngrams.count(gram))

##With Pool

print(time.strftime("%H:%M:%S"))
pool = mp.Pool(mp.cpu_count())
dict_freq_ngrams=pool.starmap_async(count_ngrams,[(gram,ngrams) for gram in ngrams_uniq]).get()
pool.close()
print(time.strftime("%H:%M:%S"))

##Without Pool

print(time.strftime("%H:%M:%S"))
dict_freq_ngrams=[count_ngrams(gram,ngrams) for gram in ngrams_uniq]
print(time.strftime("%H:%M:%S"))

When I measure the execution time I always get that the second option is faster. I don't understand why that happens ... maybe I have an error but I don't know what it is.

Thanks in advance


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I dont think you have an error rather the overhead of copying the data to the new interpreter form multiprocessing outwais the speed gains made by paralel computation as just starting pool takes 0.2 to 0.3 seconds on my surface

hers the code i used to testenter image description here

import time
import multiprocessing as mp
import matplotlib.pyplot as plt
import numpy as np
import copy

ngrams=[('review', 'productivity'), ('productivity', 'satisfaction'), ('satisfaction', 'democratic'), ('democratic', 'autocratic'), ('autocratic', 'leadership'), ('leadership', 'empirical'), ('empirical', 'literature'), ('literature', 'explore'), ('explore', 'organizational_outcome'), ('organizational_outcome', 'democratic'), ('democratic', 'leadership'), ('leadership', 'task##oriented'), ('task##oriented', 'group'), ('group', 'individual'), ('individual', 'member'), ('member', 'productivity'), ('productivity', 'satisfaction'), ('satisfaction', 'receive'), ('receive', 'attention'), ('attention', 'emphasis')]*40
ngrams_uniq=[('satisfaction', 'democratic'), ('organizational_outcome', 'democratic'), ('review', 'productivity'), ('democratic', 'leadership'), ('member', 'productivity'), ('receive', 'attention'), ('empirical', 'literature'), ('group', 'individual'), ('literature', 'explore'), ('democratic', 'autocratic'), ('autocratic', 'leadership'), ('attention', 'emphasis'), ('task##oriented', 'group'), ('explore', 'organizational_outcome'), ('leadership', 'task##oriented'), ('satisfaction', 'receive'), ('productivity', 'satisfaction'), ('leadership', 'empirical'), ('individual', 'member')]
ngrams_copy=copy.copy(ngrams)

def count_ngrams(gram,ngrams):
    return (gram,ngrams.count(gram))



if __name__ == "__main__":
    std = np.array([])
    Pool= np.array([])
    for i in range(100):
        
        t = time.time()
        with mp.Pool(mp.cpu_count()) as pool:
            res=pool.starmap_async(count_ngrams,[(val, ngrams) for val in ngrams_uniq])
            dict_freq_ngrams = res.get()#(gram,ngrams) for gram in ngrams_uniq]

        Pool = np.append(Pool, np.array(time.time() - t))
        print(i)

        t = time.time()
        dict_freq_ngrams=[count_ngrams(gram,ngrams) for gram in ngrams_uniq]
        std = np.append(std, np.array(time.time() - t))
        ngrams = ngrams+ngrams_copy

    plt.plot(std)
    plt.plot(Pool)
    plt.show()

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...