Hi trying to find a way how to run PHP object method in parallel.
Have looked through few solutions on multi-threading with PHP however can't seem to find a way to run object methods in parallel, can someone explain what am i doing wrong and suggest a fix on any of the solutions or alternative example with Country class where get_data method would be running in multiple parallel processes?
- pcntl_fork() - Forking with PHP
- Pthreads - PHP extension
- misterion/ko-process - composer package
- duncan3dc/fork-helper - composer package
- illuminate/queue - composer package
Testing pcntl_fork()
<?php
class Countries {
function get_data($country){
usleep(1000);
foreach($i=0; $i<1000;$i++ ){
$data[$i] = $country;
}
return $data;
}
}
$os = new Countries;
$countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");
// how to add and control a limit of max processes running at the time?
$start_time = microtime(true);
foreach($countries as $country) {
$pid = pcntl_fork();
if (!$pid) {
error_log( date('Y-m-d H:i:s').' - In child '.$country."
", 3, $log);
// How to execute $os->get_table_data($country) method in a child process and collect results into $d[$country]?
$d[$country] = $os->get_data($country);
error_log( date('Y-m-d H:i:s').' - !pid -> d['.$country.'] ='.var_export($d[$country],true)."
", 3, $log);
exit($country);
}
}
while (pcntl_waitpid(0, $status) != -1);
// do something with $d[$country] here after all child processes completed
$end_time = microtime(true);
$duration = $end_time - $start_time;
$duration = number_format($duration,3);
error_log( date('Y-m-d H:i:s').' - 1. pcntl_fork() example duration='.$duration."
", 3, $log);
?>
Testing Pthreads
<?php
if (extension_loaded('pthreads')) {
$pool = new Pool(4);
class Countries {
function get_data($country){
usleep(1000);
foreach($i=0; $i<1000;$i++ ){
$data[$i] = $country;
}
return $data;
}
}
$os = new Countries;
$countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");
$start_time = microtime(true);
foreach($countries as $country) {
$dataN = new Threaded();
$dataN->country = $country;
$dataN->os = $os;
$dataN->result = "";
$threads[] = $dataN;
$pool->submit(
new class($dataN) extends Threaded {
public $data;
public function __construct($data)
{
$this->data = $data;
}
public function run()
{
$this->data->result = $this->data->os->get_data($this->data->country);
}
}
);
}
while ($pool->collect());
$pool->shutdown();
foreach ($threads as $thread) {
error_log( date('Y-m-d H:i:s').' - d['.$thread->country.'] = '.var_export($thread->result,true)."
", 3, $log);
$d[$thread->country] = $thread->result;
}
// do something with $d[$country] here after all child processes completed
$end_time = microtime(true);
$duration = $end_time - $start_time;
$duration = number_format($duration,3);
error_log( date('Y-m-d H:i:s').' - 2. PHP PThreads example duration='.$duration."
", 3, $log);
}else{
error_log( date('Y-m-d H:i:s').' - pthreads extension is not loaded!'."
", 3, $log);
}
?>
Testing misterion/ko-process
<?php
require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';
class Countries {
function get_data($country){
usleep(1000);
foreach($i=0; $i<1000;$i++ ){
$data[$i] = $country;
}
return $data;
}
}
$os = new Countries;
$countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");
// how to add and control a limit of max processes running at the time?
$start_time = microtime(true);
$manager = new KoProcessManager();
foreach($countries as $country) {
$manager->fork(function(KoProcess $p) {
error_log( date('Y-m-d H:i:s').' - In child '.$country."
", 3, $log);
// How to execute $os->get_table_data($country) method in a child process and collect results into $d[$country]?
$d[$country] = $os->get_data($country);
});
}
error_log( date('Y-m-d H:i:s')." - Waiting for the threads to finish...
", 3, $log);
$manager->wait();
error_log( date('Y-m-d H:i:s')." - threads finished.
", 3, $log);
// do something with $d[$country] here after all child processes completed
$end_time = microtime(true);
$duration = $end_time - $start_time;
$duration = number_format($duration,3);
error_log( date('Y-m-d H:i:s').' - 3. misterion/ko-process example duration='.$duration."
", 3, $log);
?>
Testing duncan3dc/fork-helper
<?php
require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';
class Countries {
function get_data($country){
usleep(1000);
foreach($i=0; $i<1000;$i++ ){
$data[$i] = $country;
}
return $data;
}
}
$os = new Countries;
$countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");
// how to add and control a limit of max processes running at the time?
$start_time = microtime(true);
$fork = new duncan3dcForkerFork;
foreach($countries as $country) {
$fork->call(function () {
error_log( date('Y-m-d H:i:s').' - In child '.$country."
", 3, $log);
// How to execute $os->get_table_data($country) method in a child process and collect results into $d[$country]?
$d[$country] = $os->get_data($country);
});
}
error_log( date('Y-m-d H:i:s')." - Waiting for the threads to finish...
", 3, $log);
$fork->wait();
error_log( date('Y-m-d H:i:s')." - threads finished.
", 3, $log);
// do something with $d[$country] here after all child processes completed
$end_time = microtime(true);
$duration = $end_time - $start_time;
$duration = number_format($duration,3);
error_log( date('Y-m-d H:i:s').' - 3. duncan3dc/fork-helper example duration='.$duration."
", 3, $log);
?>
Testing illuminate/queue
<?php
require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';
class Countries {
public $data;
function __construct($country){
$this->data[$country] = $this->get_data($country);
}
function get_data($country){
usleep(1000);
foreach($i=0; $i<1000;$i++ ){
$data[$i] = $country;
}
return $data;
}
}
$os = new Countries;
$countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");
use IlluminateQueueCapsuleManager as Queue;
$queue = new Queue;
$queue->addConnection([
'driver' => 'beanstalkd',
'host' => 'localhost',
'queue' => 'default',
]);
// Make this Capsule instance available globally via static methods... (optional)
//$queue->setAsGlobal();
// how to add and control a limit of max processes running at the same time?
foreach($countries as $country) {
$d[$country] = $queue->push('Countries', array("country"=>$country));
}
// how to get results after all processes completed into $d[$country]?
// do something with results
$end_time = microtime(true);
$duration = $end_time - $start_time;
$duration = number_format($duration,3);
error_log( date('Y-m-d H:i:s').' - 1. pcntl_fork() example duration='.$duration."
", 3, $log);
?>
See Question&Answers more detail:
os