Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
306 views
in Technique[技术] by (71.8m points)

python - Giving access to shared memory after child processes have already started

How do I give child processes access to data in shared memory if the data is only available after the child processes have been spawned (using multiprocessing.Process)?

I am aware of multiprocessing.sharedctypes.RawArray, but I can't figure out how to give my child processes access to a RawArray that is created after the processes have already started.

The data is generated by the parent process, and the amount of data is not known in advance.

If not for the GIL I'd be using threading instead which will make this task a little simpler. Using a non-CPython implementation is not an option.


Looking under the hood of muliprocessing.sharedctypes, it looks like shared ctype objects are allocated using mmaped memory.

So this question really boils down to: Can a child process access an anonymously mapped memory if mmap() was called by the parent after the child process was spawned?

That's somewhat in the vein of what's being asked in this question, except that in my case the caller of mmap() is the parent process and not the child process.


(Solved)

I created my own version of RawArray that uses shm_open() under the hood. The resulting shared ctypes array can be shared with any process as long as the identifier (tag) matches.

See this answer for details and an example.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Disclaimer: I am the author of the question.

I eventually used the posix_ipc module to create my own version of RawArray. I used mainly posix_ipc.SharedMemory which calls shm_open() under the hood.

My implementation (ShmemRawArray) exposes the same functionality as RawArray but required two additional parameters - a tag to uniquely identify the shared memory region, and a create flag to determine if we should be created a new shared memory segment or attach to an existing one.

Here's a copy if anyone's interested: https://gist.github.com/1222327

ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True)

Usage notes:

  • The first two args (typecode_or_type and size_or_initializer) should work the same as with RawArray.
  • The shared array is accessible by any process, as long as tag matches.
  • The shared memory segment is unlinked when the origin object (returned by ShmemRawArray(..., create=True)) is deleted
  • Creating an shared array using a tag that currently exists will raise an ExistentialError
  • Accessing a shared array using a tag that doesn't exist (or one that has been unlinked) will also raise an ExistentialError

A SSCCE (Short, Self Contained, Compilable Example) showing it in action.

#!/usr/bin/env python2.7
import ctypes
import multiprocessing
from random import random, randint
from shmemctypes import ShmemRawArray

class Point(ctypes.Structure):
    _fields_ = [ ("x", ctypes.c_double), ("y", ctypes.c_double) ]

def worker(q):
    # get access to ctypes array shared by parent
    count, tag = q.get()
    shared_data = ShmemRawArray(Point, count, tag, False)

    proc_name = multiprocessing.current_process().name
    print proc_name, ["%.3f %.3f" % (d.x, d.y) for d in shared_data]

if __name__ == '__main__':
    procs = []
    np = multiprocessing.cpu_count()
    queue = multiprocessing.Queue()

    # spawn child processes
    for i in xrange(np):
        p = multiprocessing.Process(target=worker, args=(queue,))
        procs.append(p)
        p.start()

    # create a unique tag for shmem segment
    tag = "stack-overflow-%d" % multiprocessing.current_process().pid

    # random number of points with random data
    count = randint(3,10) 
    combined_data = [Point(x=random(), y=random()) for i in xrange(count)]

    # create ctypes array in shared memory using ShmemRawArray
    # - we won't be able to use multiprocssing.sharectypes.RawArray here 
    #   because children already spawned
    shared_data = ShmemRawArray(Point, combined_data, tag)

    # give children info needed to access ctypes array
    for p in procs:
        queue.put((count, tag))

    print "Parent", ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
    for p in procs:
        p.join()

Running this results in the following output:

[me@home]$ ./shmem_test.py
Parent ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-1 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-2 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-3 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-4 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...