Worker::collect is not intended to enable you to reap results; it is non-deterministic.
Worker::collect is only intended to run garbage collection on objects referenced in the stack of Worker objects.
If the intention is to process each result as it becomes available, the code might look something like this:
<?php
$pool = new Pool(4);
$results = new Volatile();

$expected = 10;
$found = 0;

// Submit one task per id (1..$expected).
for ($i = 1; $i <= $expected; $i++) {
    $pool->submit(new class($i, $results) extends Threaded {

        public function __construct($id, Volatile $results) {
            $this->id = $id;
            $this->results = $results;
        }

        public function run() {
            $result = file_get_contents('http://google.fr?q=' . $this->id);

            // Store the result and wake the waiting main context.
            $this->results->synchronized(function($results, $result){
                $results[$this->id] = $result;
                $results->notify();
            }, $this->results, $result);
        }

        private $id;
        private $results;
    });
}

do {
    // Block until at least one result is available, then take it.
    $next = $results->synchronized(function() use(&$found, $results) {
        while (!count($results)) {
            $results->wait();
        }

        $found++;

        return $results->shift();
    });

    var_dump($next);
} while ($found < $expected);

// Let the pool garbage collect completed tasks before shutting down.
while ($pool->collect()) continue;

$pool->shutdown();
?>
This is obviously not very tolerant of errors, but the main difference is that I use a shared Volatile collection of results, and I synchronize properly to fetch results in the main context as they become available.
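On the error tolerance point, one small step is to make the fetch itself report failure explicitly, rather than returning false and raising a warning. The following is only a sketch under assumptions: fetchResult() and its ok / body / error keys are hypothetical names chosen for illustration and are not part of pthreads; file_get_contents(), stream_context_create() and error_get_last() are the standard PHP functions.

```php
<?php
// Hypothetical helper (not part of pthreads): wraps file_get_contents() so a
// failed request yields a tagged array instead of false plus a warning.
function fetchResult(string $url, float $timeoutSeconds = 5.0): array
{
    // The 'http' timeout only applies to http(s) URLs; other wrappers ignore it.
    $context = stream_context_create([
        'http' => ['timeout' => $timeoutSeconds],
    ]);

    // Suppress the warning; failure is reported through the return value.
    $body = @file_get_contents($url, false, $context);

    if ($body === false) {
        $error = error_get_last();

        return [
            'ok'    => false,
            'error' => $error['message'] ?? 'unknown error',
        ];
    }

    return ['ok' => true, 'body' => $body];
}
```

run() could call fetchResult() in place of the bare file_get_contents(), so every task still stores exactly one entry and the waiting loop is not left short of a result. Keep in mind that, as described in the pthreads v3 documentation, an array stored in a Threaded object is coerced to Volatile, so the consumer may want to cast it back with (array) or store the fields as separate values.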
If you wanted to wait for all results to become available, and possibly avoid some contention for locks - which you should always try to avoid if you can - then the code would look simpler, something like:
<?php
$pool = new Pool(4);
$results = new Volatile();

$expected = 10;

// Submit one task per id (1..$expected).
for ($i = 1; $i <= $expected; $i++) {
    $pool->submit(new class($i, $results) extends Threaded {

        public function __construct($id, Volatile $results) {
            $this->id = $id;
            $this->results = $results;
        }

        public function run() {
            $result = file_get_contents('http://google.fr?q=' . $this->id);

            // Store the result and wake the waiting main context.
            $this->results->synchronized(function($results, $result){
                $results[$this->id] = $result;
                $results->notify();
            }, $this->results, $result);
        }

        private $id;
        private $results;
    });
}

// Block until every task has stored its result.
$results->synchronized(function() use($expected, $results) {
    while (count($results) != $expected) {
        $results->wait();
    }
});

var_dump(count($results));

// Let the pool garbage collect completed tasks before shutting down.
while ($pool->collect()) continue;

$pool->shutdown();
?>
Note that the Collectable interface is already implemented by Threaded in the most recent versions of pthreads, which is the version you should be using ... always ...
The docs are out of date, sorry about that ... one human ...