Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

python - Twisted network client with multiprocessing workers?

So, I've got an application that uses Twisted + Stomper as a STOMP client, which farms out work to a multiprocessing.Pool of workers.

This appears to work OK when I just use a Python script to fire it up. Simplified, the script looks something like this:

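# stompclient.py
# (config_path, processes, username, password, destination, host, port and
# StompClientFactory are defined elsewhere in the application)
import logging.config
import multiprocessing

import twisted.python.log
from twisted.internet import reactor

logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)

# Add observer to make Twisted log via Python's logging module
twisted.python.log.PythonLoggingObserver().start()

# Initialize the process pool (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)

StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination
reactor.connectTCP(host, port, StompClientFactory())
reactor.run()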

As this gets packaged for deployment, I thought I would take advantage of the twistd script and run this from a tac file.

Here's my very-similar-looking tac file:

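# stompclient.tac
# (config_path, processes, username, password, destination, host, port and
# StompClientFactory are defined elsewhere in the application)
import logging.config
import multiprocessing

import twisted.python.log
from twisted.application import internet, service

logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)

# Add observer to make Twisted log via Python's logging module
twisted.python.log.PythonLoggingObserver().start()

# Initialize the process pool (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)

StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination

application = service.Application('myapp')

# Named `client` so that it does not shadow the imported `service` module
client = internet.TCPClient(host, port, StompClientFactory())
client.setServiceParent(application)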

For the sake of illustration, I have collapsed or changed a few details; hopefully they were not the essence of the problem. For example, my app has a plugin system, the pool is initialized by a separate method, and then work is delegated to the pool using pool.apply_async() passing one of my plugin's process() methods.

So, if I run the script (stompclient.py), everything works as expected.

It also appears to work OK if I run twistd in non-daemon mode (-n):

twistd -noy stompclient.tac

However, it does not work when I run in daemon mode:

twistd -oy stompclient.tac

The application appears to start up OK, but when it attempts to fork off work, it just hangs. By "hangs", I mean that it appears that the child process is never asked to do anything and the parent (that called pool.apply_async()) just sits there waiting for the response to return.

I'm sure that I'm doing something stupid with Twisted + multiprocessing, but I'm really hoping that someone can explain to me the flaw in my approach.

Thanks in advance!


1 Reply


Since the difference between your working invocation and your non-working invocation is only the "-n" option, it seems most likely that the problem is caused by the daemonization process (which "-n" prevents from happening).

On POSIX, one of the steps involved in daemonization is forking and having the parent exit. Among other things, this means your code ends up running in a different process from the one in which the .tac file was evaluated. It also rearranges the parent/child relationship of any processes that were started in the .tac file, as your pool of multiprocessing processes was.

The multiprocessing pool's processes start off as children of the twistd process you started. However, when that process exits as part of daemonization, their parent becomes the system init process. This may cause some problems, although probably not the hang you described. There are probably other, similarly low-level implementation details that normally allow the multiprocessing module to work but that are disrupted by daemonization.
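
One plausible candidate for such an implementation detail (an assumption, not something traced in this particular application): multiprocessing.Pool dispatches tasks and collects results with helper threads that live in the process that created the pool, and on POSIX only the thread that calls fork() exists in the forked child. The sketch below is POSIX-only, and the function name is made up for illustration. It shows that a thread started before a fork is simply absent afterwards:

```python
import os
import threading


def thread_counts_across_fork():
    """Fork while a helper thread is alive; return (parent_count, child_count)."""
    stop = threading.Event()
    helper = threading.Thread(target=stop.wait, daemon=True)
    helper.start()
    parent_count = threading.active_count()

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: only the thread that called fork() was copied.
        os.close(read_fd)
        os.write(write_fd, str(threading.active_count()).encode())
        os._exit(0)

    # Parent: read the child's report, then reap the child and stop the helper.
    os.close(write_fd)
    with os.fdopen(read_fd) as reader:
        child_count = int(reader.read())
    os.waitpid(pid, 0)
    stop.set()
    helper.join()
    return parent_count, child_count
```

If the pool's helper threads stay behind in the process that exits during daemonization, nothing in the daemon would be left to feed tasks to the workers, which would match the symptom described. Recent Python versions may also emit a DeprecationWarning when a multi-threaded process forks, which points at the same hazard.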

Fortunately, avoiding this strange interaction should be straightforward. Twisted's service APIs allow you to run code after daemonization has completed. If you use these APIs, then you can delay the initialization of the multiprocessing module's process pool until after daemonization and hopefully avoid the problem. Here's an example of what that might look like:

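import multiprocessing

from twisted.application.service import Service

class MultiprocessingService(Service):
    def startService(self):
        # Called by twistd after daemonization has completed
        Service.startService(self)
        self.pool = multiprocessing.Pool(processes=processes)

# `application` and `processes` are the names defined in the .tac file
MultiprocessingService().setServiceParent(application)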
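
The same idea can be sketched without Twisted, which makes the lifecycle easier to try in isolation. Everything named here (PoolLifecycle, run_one_task) is hypothetical, and the explicit "fork" start method is an assumption that restricts the sketch to POSIX. A service's startService() and stopService() would call start() and stop() respectively:

```python
import multiprocessing


class PoolLifecycle:
    """Hypothetical helper: the pool is created on start(), not at import time."""

    def __init__(self, processes):
        self.processes = processes
        self.pool = None  # nothing is forked when the object is constructed

    def start(self):
        # Meant to be called from startService(), i.e. after daemonization.
        # The "fork" start method is POSIX-only and is an assumption here.
        context = multiprocessing.get_context("fork")
        self.pool = context.Pool(processes=self.processes)

    def stop(self):
        # Meant to be called from stopService(): let queued work finish,
        # then wait for the worker processes to exit.
        if self.pool is not None:
            self.pool.close()
            self.pool.join()
            self.pool = None


def run_one_task(lifecycle, func, args, timeout=30):
    """Submit one task; the timeout turns a silent hang into multiprocessing.TimeoutError."""
    return lifecycle.pool.apply_async(func, args).get(timeout=timeout)
```

The blocking get() is only for illustration, since it would stall the reactor if called from the reactor thread. In a Twisted application it is probably better to pass callback= to apply_async() and hand the result back with reactor.callFromThread(), because the pool invokes callbacks from one of its own threads.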

Now, separately, you may also run into problems relating to cleanup of the multiprocessing module's child processes, or possibly issues with processes created with Twisted's process creation API, reactor.spawnProcess. This is because part of dealing with child processes correctly generally involves handling the SIGCHLD signal. Twisted and multiprocessing aren't going to be cooperating in this regard, though, so one of them is going to get notified of all children exiting and the other will never be notified. If you don't use Twisted's API for creating child processes at all, then this may be okay for you, but you might want to check that any signal handler the multiprocessing module tries to install actually "wins" and doesn't get replaced by Twisted's own handler.
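
One way to do that check is to ask the signal module which SIGCHLD handler is currently installed. This is a minimal sketch with made-up helper names. What it reports depends on the Twisted version and on when it is called, so treat the output as a diagnostic hint rather than proof:

```python
import signal


def describe_handler(handler):
    """Return a readable label for a value returned by signal.getsignal()."""
    if handler == signal.SIG_DFL:
        return "default action (SIG_DFL)"
    if handler == signal.SIG_IGN:
        return "ignored (SIG_IGN)"
    if handler is None:
        # getsignal() returns None when the handler was not installed from Python.
        return "installed outside Python"
    return "Python handler: %r" % (handler,)


def current_sigchld_handler():
    # SIGCHLD exists on POSIX platforms only.
    return describe_handler(signal.getsignal(signal.SIGCHLD))
```

Logging current_sigchld_handler() once before the pool is created and once from a function scheduled with reactor.callWhenRunning() would show whether the handler changed in between.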

