Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
205 views
in Technique[技术] by (71.8m points)

multithreading - How to run PHP object methods in parallel and sync the results into array

Hi, I'm trying to find a way to run PHP object methods in parallel.

I have looked through a few multi-threading solutions for PHP, but I can't find a way to run object methods in parallel. Can someone explain what I am doing wrong and suggest a fix for any of the solutions below? Alternatively, could someone give an example using the Countries class in which the get_data method runs in multiple parallel processes?

  1. pcntl_fork() - Forking with PHP
  2. Pthreads - PHP extension
  3. misterion/ko-process - composer package
  4. duncan3dc/fork-helper - composer package
  5. illuminate/queue - composer package

Testing pcntl_fork()

    <?php

    /**
     * Toy workload used to demonstrate parallel execution.
     */
    class Countries {
        /**
         * Simulate a slow task: sleep for 1 ms, then return a list in which every
         * element is $country.
         *
         * @param string $country Country code to repeat.
         * @return array 1000 copies of $country, indexed 0..999.
         */
        public function get_data($country){
            usleep(1000);

            // A C-style three-part loop header is only valid with `for`, never `foreach`;
            // array_fill builds the same 0..999 list without a hand-written loop.
            return array_fill(0, 1000, $country);
        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    // error_log(..., 3, $log) appends to the file named by $log. It is never defined in
    // this snippet, so default it to a file next to the script when unset.
    $log = isset($log) ? $log : __DIR__ . '/parallel.log';

    // Results are meant to end up here. NOTE: after pcntl_fork() each child works on its
    // own copy of memory, so anything a child writes into $d is lost when it exits;
    // results have to travel back to the parent over IPC (socket pair, pipe, file, ...).
    $d = [];

    // how to add and control a limit of max processes running at the time?

    $start_time = microtime(true);

    foreach($countries as $country) {

        $pid = pcntl_fork();

        if ($pid === -1) {
            // Fork failed: no child runs for this country. Report it instead of carrying
            // on as if a child had been started.
            error_log( date('Y-m-d H:i:s').' - pcntl_fork() failed for '.$country."\n", 3, $log);
            continue;
        }

        if ($pid === 0) {
            // Child process.
            error_log( date('Y-m-d H:i:s').' - In child  '.$country."\n", 3, $log);

            // How to execute $os->get_table_data($country) method in a child process and collect results into $d[$country]?
            $d[$country] = $os->get_data($country);

            error_log( date('Y-m-d H:i:s').' - !pid -> d['.$country.']  ='.var_export($d[$country],true)."\n", 3, $log);

            // exit() with a string prints that string and exits with status 0; use an
            // explicit integer status instead.
            exit(0);
        }
    }

    // Reap every child: pcntl_waitpid(0, ...) waits for any child in this process group
    // and returns -1 once none are left.
    while (pcntl_waitpid(0, $status) != -1);
    // do something with $d[$country] here after all child processes completed

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 1. pcntl_fork() example duration='.$duration."\n", 3, $log);





?>

Testing Pthreads

<?php


// error_log(..., 3, $log) appends to the file named by $log. It is never defined in this
// snippet, so default it when unset (both branches below log).
$log = isset($log) ? $log : __DIR__ . '/parallel.log';

// NOTE(review): pthreads is unmaintained, needs a thread-safe (ZTS) CLI build and does not
// support recent PHP releases; ext-parallel is the usual replacement — confirm for your
// PHP version.
if (extension_loaded('pthreads')) {

    $pool = new Pool(4);

    /**
     * Toy workload used to demonstrate parallel execution.
     */
    class Countries {
        /**
         * Sleep for 1 ms, then return a list in which every element is $country.
         *
         * @param string $country Country code to repeat.
         * @return array 1000 copies of $country, indexed 0..999.
         */
        public function get_data($country){
            usleep(1000);

            // A C-style loop header is only valid with `for`; array_fill builds the same list.
            return array_fill(0, 1000, $country);
        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    // One Threaded container per task, plus the collected results.
    $threads = [];
    $d = [];

    $start_time = microtime(true);
    foreach($countries as $country) {
        // The task's run() writes its result into this container; the main thread
        // reads it back once the pool has been drained.
        $dataN = new Threaded();
        $dataN->country = $country;
        $dataN->os = $os;
        $dataN->result = "";

        $threads[] = $dataN;

        $pool->submit(
            new class($dataN) extends Threaded {
                public $data;

                public function __construct($data)
                {
                    $this->data = $data;
                }

                public function run()
                {
                    $this->data->result = $this->data->os->get_data($this->data->country);
                }
            }
        );

    }

    // Loop until collect() reports nothing left to collect.
    while ($pool->collect());

    $pool->shutdown();

    foreach ($threads as $thread) {
        error_log( date('Y-m-d H:i:s').' - d['.$thread->country.'] = '.var_export($thread->result,true)."\n", 3, $log);
        $d[$thread->country] = $thread->result;
    }

    // do something with $d[$country] here after all child processes completed

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 2. PHP PThreads example duration='.$duration."\n", 3, $log);
}else{
    error_log( date('Y-m-d H:i:s').' - pthreads extension is not loaded!'."\n", 3, $log);

}

?>

Testing misterion/ko-process

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';

    /**
     * Toy workload used to demonstrate parallel execution.
     */
    class Countries {
        /**
         * Simulate a slow task: sleep for 1 ms, then return a list in which every
         * element is $country.
         *
         * @param string $country Country code to repeat.
         * @return array 1000 copies of $country, indexed 0..999.
         */
        public function get_data($country){
            usleep(1000);

            // A C-style three-part loop header is only valid with `for`, never `foreach`;
            // array_fill builds the same 0..999 list without a hand-written loop.
            return array_fill(0, 1000, $country);
        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    // error_log(..., 3, $log) appends to the file named by $log. It is never defined in
    // this snippet, so default it to a file next to the script when unset.
    $log = isset($log) ? $log : __DIR__ . '/parallel.log';

    // how to add and control a limit of max processes running at the time?

    $start_time = microtime(true);

    // The package's classes live in the Ko namespace; the separators are required.
    $manager = new \Ko\ProcessManager();

    foreach($countries as $country) {

        // A PHP closure only sees the outer variables listed in use(); without it
        // $country, $os and $log are all undefined inside the callback.
        $manager->fork(function (\Ko\Process $p) use ($os, $country, $log) {
            error_log( date('Y-m-d H:i:s').' - In child  '.$country."\n", 3, $log);
            // How to execute $os->get_table_data($country) method in a child process and collect results into $d[$country]?
            // NOTE(review): fork() runs this callback in a child process, so $d here is
            // local to the closure and never reaches the parent; results need IPC.
            $d[$country] = $os->get_data($country);
        });

    }

    error_log( date('Y-m-d H:i:s')." - Waiting for the threads to finish...\n", 3, $log);
    $manager->wait();

    error_log( date('Y-m-d H:i:s')." - threads finished.\n", 3, $log);

    // do something with $d[$country] here after all child processes completed

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 3. misterion/ko-process example duration='.$duration."\n", 3, $log);



?>

Testing duncan3dc/fork-helper

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';


    /**
     * Toy workload used to demonstrate parallel execution.
     */
    class Countries {
        /**
         * Simulate a slow task: sleep for 1 ms, then return a list in which every
         * element is $country.
         *
         * @param string $country Country code to repeat.
         * @return array 1000 copies of $country, indexed 0..999.
         */
        public function get_data($country){
            usleep(1000);

            // A C-style three-part loop header is only valid with `for`, never `foreach`;
            // array_fill builds the same 0..999 list without a hand-written loop.
            return array_fill(0, 1000, $country);
        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    // error_log(..., 3, $log) appends to the file named by $log. It is never defined in
    // this snippet, so default it to a file next to the script when unset.
    $log = isset($log) ? $log : __DIR__ . '/parallel.log';

    // how to add and control a limit of max processes running at the time?

    $start_time = microtime(true);

    // Fully-qualified class name (the package's Fork class lives in duncan3dc\Forker).
    $fork = new \duncan3dc\Forker\Fork;

    foreach($countries as $country) {

        // A PHP closure only sees the outer variables listed in use(); without it
        // $country, $os and $log are all undefined inside the callback.
        $fork->call(function () use ($os, $country, $log) {
            error_log( date('Y-m-d H:i:s').' - In child  '.$country."\n", 3, $log);
            // How to execute $os->get_table_data($country) method in a child process and collect results into $d[$country]?
            // NOTE(review): call() runs this callback in a forked child, so $d here is
            // local to the closure and never reaches the parent; results need IPC.
            $d[$country] = $os->get_data($country);

        });


    }

    error_log( date('Y-m-d H:i:s')." - Waiting for the threads to finish...\n", 3, $log);

    $fork->wait();
    error_log( date('Y-m-d H:i:s')." - threads finished.\n", 3, $log);

    // do something with $d[$country] here after all child processes completed

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    // This is approach 4 in the list of tested solutions.
    error_log( date('Y-m-d H:i:s').' - 4. duncan3dc/fork-helper example duration='.$duration."\n", 3, $log);





?>

Testing illuminate/queue

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';


    /**
     * Job-style wrapper: computes the data for one country as soon as it is constructed.
     */
    class Countries {

        /** @var array Result of get_data(), keyed by the country code passed to the constructor. */
        public $data;

        /**
         * @param string $country Country code whose data is computed immediately.
         */
        public function __construct($country){
                $this->data[$country] = $this->get_data($country);
        }

        /**
         * Simulate a slow task: sleep for 1 ms, then return a list in which every
         * element is $country.
         *
         * @param string $country Country code to repeat.
         * @return array 1000 copies of $country, indexed 0..999.
         */
        public function get_data($country){
            usleep(1000);

            // A C-style three-part loop header is only valid with `for`, never `foreach`;
            // array_fill builds the same 0..999 list without a hand-written loop.
            return array_fill(0, 1000, $country);
        }
    }

    // Import declarations must sit at the file's top-level scope (not inside a block).
    use Illuminate\Queue\Capsule\Manager as Queue;

    // No Countries instance is created here: its constructor requires a $country argument,
    // so `new Countries` with none throws an ArgumentCountError (PHP 7.1+).

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    // error_log(..., 3, $log) appends to the file named by $log. It is never defined in
    // this snippet, so default it to a file next to the script when unset.
    $log = isset($log) ? $log : __DIR__ . '/parallel.log';

    $queue = new Queue;

    $queue->addConnection([
        'driver' => 'beanstalkd',
        'host' => 'localhost',
        'queue' => 'default',
    ]);

    // Make this Capsule instance available globally via static methods... (optional)
    //$queue->setAsGlobal();

    // Start timing here; the duration below previously used an undefined $start_time.
    $start_time = microtime(true);

    $d = [];

    // how to add and control a limit of max processes running at the same time?
    foreach($countries as $country) {
        // NOTE(review): push() only enqueues a job for a separate worker process; its
        // return value is not the job's result. Results would have to be stored somewhere
        // this script can read back (DB, cache, ...) — confirm for your setup.
        $d[$country] = $queue->push('Countries', array("country"=>$country));
    }
    // how to get results after all processes completed into $d[$country]?
    // do something with results

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    // This is approach 5 in the list of tested solutions (the old label was copied from approach 1).
    error_log( date('Y-m-d H:i:s').' - 5. illuminate/queue example duration='.$duration."\n", 3, $log);


?>              
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I can't help with pthreads, ko-process, fork-helper, or queue (I simply don't have experience using them). Here is one way to get your code working with pcntl_fork, using sockets to pass messages between the child and parent processes:

<?php
    /**
     * Stand-in workload for the parallel-processing demo.
     */
    class Countries {
        /**
         * Pause for one millisecond, then hand back 1000 copies of the given code.
         *
         * @param string $country Country code to repeat.
         * @return array Indexed 0..999, every value equal to $country.
         */
        public function get_data($country) {
            usleep(1000);

            return array_fill(0, 1000, $country);
        }
    }

    $os = new Countries;

    $countries = ["GB", "US", "FR", "DE", "IT", "ES", "LT", "BR", "BE", "JP", "CN"];

    // error_log(..., 3, $log) appends to the file named by $log; default it when the
    // surrounding code has not chosen a destination.
    $log = isset($log) ? $log : __DIR__ . '/parallel.log';

    // To answer your question about limiting the number of concurrent processes, you
    // need to limit the number of times you call pcntl_fork(). You might do something
    // like:
    //    1. Chunk the $countries array: [["GB", "US"], ["FR", "DE"], ["IT", "ES"], ...
    //    2. Call pcntl_fork() once for each inner array (half as many)
    //    3. Child process calls $os->get_data() once for each country in the sub-array
    //
    // Another solution is to use what's known as a "Pool": you give a collection of
    // tasks to a class which starts a fixed number of worker processes and hands tasks
    // to them as they become free. This hides the complexity of multiprocessing, but
    // requires a third-party library you like or your own Pool implementation.

    /**
     * Write all of $payload to $socket. socket_write() may send only part of a large
     * buffer, so keep writing from where the previous call stopped.
     *
     * @param resource|Socket $socket Connected socket.
     * @param string $payload Bytes to send.
     * @return bool true once every byte is written, false on a socket error.
     */
    function write_all($socket, $payload) {
        $length = strlen($payload);
        $offset = 0;
        while ($offset < $length) {
            $written = socket_write($socket, substr($payload, $offset));
            if ($written === false) {
                return false;
            }
            $offset += $written;
        }
        return true;
    }

    /**
     * Read from $socket until the other end has been closed.
     *
     * @param resource|Socket $socket Connected socket.
     * @return string|false Everything received, or false on a socket error.
     */
    function read_all($socket) {
        $received = '';
        while (true) {
            $chunk = socket_read($socket, 8192);
            if ($chunk === false) {
                return false;
            }
            if ($chunk === '') {
                // socket_read() returns '' once no more data will arrive (EOF).
                return $received;
            }
            $received .= $chunk;
        }
    }

    $start_time = microtime(true);

    // Initialize $d in the parent process (before pcntl_fork())
    $d = [];

    // Keep a list of child processes, so that we can wait for ALL of them to terminate
    $pids = [];

    // Parent-side (read) end of each child's socket, keyed by country code
    $channels = [];

    foreach ($countries as $country) {
        // To answer your question about how to collect the result in the $d array: the
        // child has to send its result back to the parent over some message channel.
        // Here every child gets its own socket pair, created before the fork so both
        // processes inherit it. Because each channel carries exactly one child's output,
        // the parent already knows which country it belongs to and concurrent children
        // can never interleave their bytes.
        $pair = [];
        if (!socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pair)) {
            error_log( date('Y-m-d H:i:s').' - socket_create_pair() failed for '.$country."\n", 3, $log);
            continue;
        }

        $pid = pcntl_fork();

        if ($pid === -1) {
            error_log( date('Y-m-d H:i:s').' - pcntl_fork() failed for '.$country."\n", 3, $log);
            socket_close($pair[0]);
            socket_close($pair[1]);
            continue;
        }

        if ($pid === 0) {
            // Child: writes on $pair[0], so drop the read end.
            socket_close($pair[1]);

            error_log( date('Y-m-d H:i:s').' - In child  '.$country."\n", 3, $log);

            // Sockets carry raw bytes; serialize() turns the array into a string the
            // parent can turn back into the same array.
            $sent = write_all($pair[0], serialize($os->get_data($country)));
            socket_close($pair[0]);
            exit($sent ? 0 : 1);
        }

        // Parent: reads on $pair[1]. Closing our copy of the write end matters, because
        // the reader only sees EOF once EVERY copy of the write end has been closed.
        socket_close($pair[0]);
        $channels[$country] = $pair[1];
        $pids[] = $pid;
    }

    // Collect each child's result BEFORE waiting for the children. A child whose output
    // does not fit in the socket buffer blocks in socket_write() until the parent reads,
    // so waiting first could deadlock.
    foreach ($channels as $country => $channel) {
        $payload = read_all($channel);
        socket_close($channel);

        if ($payload === false || $payload === '') {
            error_log( date('Y-m-d H:i:s').' - no result received for '.$country."\n", 3, $log);
            continue;
        }

        // The bytes come from our own child, but never let unserialize() create objects.
        $d[$country] = unserialize($payload, ['allowed_classes' => false]);
    }

    // Wait for all child processes to finish (so none are left behind as zombies)
    foreach ($pids as $pid) {
        pcntl_waitpid($pid, $status);
    }

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration, 3);

    error_log( date('Y-m-d H:i:s').' - 1. pcntl_fork() example duration='.$duration."\n", 3, $log);

?>

One thing I do want to note: multiprocessing often doesn't magically make programs run faster the way people expect it to.

- **CPU-bound tasks** (you spend all of your time performing complex CPU operations): extra processes only help up to the number of CPU cores you have. Beyond that, they either have no effect or make the program slower, because forking processes and passing results between them has a cost.
- **IO-bound tasks** (you spend all of your time waiting on network or disk operations to complete): you can speed these up dramatically by letting the processor do meaningful work instead of sitting on its hands and waiting.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...