PHP - Multi threading and pools


I am using the Pool object from PHP pthreads, and wrote the following test script to see how pooling works. I thought that a pool takes a given number of tasks, opens up at most x workers, and assigns the tasks to them; as soon as a worker finishes a task, it is assigned a new one if more tasks are available.

Given the below example, and the above assumption:

class Work extends Threaded {
    public $id;

    public function __construct($id) {
        $this->id = $id;
    }

    public function run() {
        if ($this->id == 0) {
            sleep(3);
            echo $this->id . " is ready\n";
            return;
        } else {
            echo $this->id . " is ready\n";
            return;
        }
    }
}

$pool = new Pool(2, 'Worker', []);
for ($i=0; $i<4; $i++) $pool->submit(new Work($i));
while ($pool->collect());
$pool->shutdown();

I was expecting this script to output the following information:

1 is ready
2 is ready
3 is ready
0 is ready

because there are essentially 2 workers available, and because of the sleep operation the first worker runs into, tasks 1, 2 and 3 must be completed by the second worker.

Instead of this, the output I am getting is:

1 is ready
3 is ready
0 is ready
2 is ready

It is clear that worker 1 gets assigned job 0 and job 2 from the get-go, so worker 2, after finishing jobs 1 and 3, just waits instead of taking over job 2 from worker 1.

Is this a bug? Or is this intended to work this way?

My PHP version:

PHP 7.2.14 (cli) (built: Jan  9 2019 22:23:26) ( ZTS MSVC15 (Visual C++ 2017) x64 )
Copyright (c) 1997-2018 The PHP Group
Zend Engine v3.2.0, Copyright (c) 1998-2018 Zend Technologies

There are 3 best solutions below

2
rkeet On

For some reason my Docker has crapped itself now that I've updated Windows to 1809, so posting untested. (So sorry, no output to give atm)


Modified existing code I use in a project with your counter + sleep.

$pool = new Pool(2);
foreach ([0,1,2,3] as $count) {
    $pool->submit(
        new class ($count) extends Threaded
        {
            private $count;

            public function __construct(int $count)
            {
                $this->count= $count;
            }

            public function run()
            {
                if ($this->count== 0) {
                    sleep(3);
                    echo $this->count . " is ready\n";
                } else {
                    echo $this->count . " is ready\n";
                }
            }
        }
    );
}

while ($pool->collect());

$pool->shutdown();

I use an anonymous class (new class ($count) extends Threaded) as the submit() argument.

On the server this runs perfectly, using a Docker instance running PHP ZTS 7.2.13 on Alpine 3.8.

0
Kuilo Skio On

Let me answer. From what I know about pthreads in PHP, a pool is like a set of workers (think of several php.exe instances) that can run at the same time.

So in your case, new Pool(2, 'Worker', []) defines one pool with two workers.

To explain it abstractly, call those two workers PoolA and PoolB.

The loop runs from 0 to 3, and each iteration submits one task to the pool.

There are 4 tasks, numbered 0 to 3; let's call them task0, task1, task2, task3.

As the loop runs, from my perspective, the tasks should be queued like this:

PoolA -> submit task0
PoolB -> submit task1
PoolA -> submit task2
PoolB -> submit task3
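The queue above is what round-robin dispatch would produce. Here is a minimal sketch in plain PHP (no pthreads required), assuming submit() simply hands each task to the next worker in turn regardless of how busy that worker is. The helper name assignRoundRobin is hypothetical and only models the assignment; it is not part of the pthreads API.

```php
<?php
// Hypothetical helper: models round-robin dispatch only, it is not pthreads code.
// Task number N goes to worker N % $workerCount, no matter how busy that worker is.
function assignRoundRobin(array $taskIds, int $workerCount): array
{
    $queues = array_fill(0, $workerCount, []);
    foreach (array_values($taskIds) as $position => $taskId) {
        $queues[$position % $workerCount][] = $taskId;
    }
    return $queues;
}

$queues = assignRoundRobin([0, 1, 2, 3], 2);
foreach ($queues as $worker => $tasks) {
    echo "worker $worker: " . implode(', ', $tasks) . "\n";
}
// worker 0: 0, 2
// worker 1: 1, 3
```

Under this assumption the assignment is fixed at submit time, so a slow task delays everything queued behind it on the same worker.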

Each of task0 through task3 is an instance of class Work.

Situation/Condition

You define some logic in run(): when the parameter (in this case $id from the constructor) is 0, then sleep(3).

In this situation, PoolA is the one that received task0, whose $id is 0, so PoolA waits for 3 seconds. PoolA also received task2.

PoolB, on the other hand, received task1 and task3, neither of which needs to wait for 3 seconds.

So when while ($pool->collect()); runs, the tasks most likely finish in this order:

task1    (PoolB)
task3    (PoolB)
task0    (PoolA)  <- PoolA is delayed because task0 sleeps for 3 seconds
task2    (PoolA)

So I think the output you got is correct:

1 is ready
3 is ready
0 is ready
2 is ready
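That ordering can be reproduced without threads. The sketch below is a plain-PHP simulation, assuming round-robin assignment and that each worker runs its own queue strictly in order. The function name simulateCompletionOrder, the duration list, and the tie-break by task id are all illustrative assumptions, not pthreads behaviour.

```php
<?php
// Hypothetical simulation: each worker processes its own queue one task at a time.
// $durations[N] is how many seconds task N takes.
function simulateCompletionOrder(array $durations, int $workerCount): array
{
    $clock = array_fill(0, $workerCount, 0.0); // when each worker finishes its latest task
    $events = [];
    foreach (array_values($durations) as $taskId => $seconds) {
        $worker = $taskId % $workerCount;      // assumed round-robin assignment
        $clock[$worker] += $seconds;
        $events[] = [$clock[$worker], $taskId];
    }
    // Order by finish time; ties are broken by task id so the result is deterministic.
    usort($events, function ($a, $b) {
        return [$a[0], $a[1]] <=> [$b[0], $b[1]];
    });
    return array_column($events, 1);
}

// Task 0 sleeps for 3 seconds, the others return immediately.
echo implode(', ', simulateCompletionOrder([3, 0, 0, 0], 2)), "\n"; // 1, 3, 0, 2
```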

That leaves a question.

Why is only PoolA delayed? And even if PoolA is delayed, why wasn't task2 given to PoolB, or task1 or task3 given to PoolA?

Well, I don't understand it either. I have a task similar to yours, and after many experiments I'm still not sure whether pthreads with Pool and Threaded is multi-threading or multiprocessing.
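For contrast, here is a sketch of the dispatch model the question expected, where each task goes to whichever worker becomes free first. It is again a plain-PHP simulation with hypothetical names (simulatePullOrder and the duration list); it illustrates the idea and does not describe how Pool::submit() behaves.

```php
<?php
// Hypothetical simulation of "free worker takes the next task" scheduling.
// $durations[N] is how many seconds task N takes.
function simulatePullOrder(array $durations, int $workerCount): array
{
    $freeAt = array_fill(0, $workerCount, 0.0); // time at which each worker is idle again
    $events = [];
    foreach (array_values($durations) as $taskId => $seconds) {
        // Pick the worker that becomes idle first (lowest index wins ties).
        $worker = array_search(min($freeAt), $freeAt, true);
        $freeAt[$worker] += $seconds;
        $events[] = [$freeAt[$worker], $taskId];
    }
    // Order by finish time, then by task id for a deterministic result.
    usort($events, function ($a, $b) {
        return [$a[0], $a[1]] <=> [$b[0], $b[1]];
    });
    return array_column($events, 1);
}

echo implode(', ', simulatePullOrder([3, 0, 0, 0], 2)), "\n"; // 1, 2, 3, 0
```

With this model the simulated order is 1, 2, 3, 0, which is the output the question anticipated; the observed 1, 3, 0, 2 is what a fixed per-worker queue produces instead.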

0
Jared On

Echoing from the individual threads can be deceiving. I often find that the output makes threads look like they executed before they were even called. I'd recommend avoiding echoing from inside threads unless you don't care about the order, although it can still be useful for testing specific circumstances.

Below is some code which should resolve any questions of when the code is executing, as this code sorts the results by the actual time they executed. (It's also a nice example of how to get results back from a thread pool.)

<?php
class Work extends Threaded {
    public $id;
    public $data;
    private $complete = false;
    public function __construct($id) {
        $this->id = $id;
    }

    public function run() {
        $temp = array();
        if ($this->id == 0) {
            echo "<pre>".$this->id . " started (from inside threaded)";
            $temp[] = array(microtime(true), $this->id . " started");
            sleep(3);
        }
        echo "<pre>".$this->id . " is ready (from inside threaded)";
        $temp[] = array(microtime(true), $this->id . " is ready");
        $this->data = (array) $temp; // note: it's important to cast to array, otherwise you will get a Volatile
        $this->complete = true;
    }

    public function isDone() {
        return $this->complete;
    }
}

// we create a custom pool, to pass on our results
class ExamplePool extends Pool {
    public $dataAr = array(); // used to return data after we're done
    private $numTasks = 0; // counter used to know when we're done
    private $numCompleted = 0; // keep track of how many threads finished
    /**
     * override the submit function from the parent
     * to keep track of our jobs
     */
    public function submit(Threaded $task) {
        $this->numTasks++;
        return parent::submit($task); // keep the parent's return value available to callers
    }
    /**
     * used to wait until all workers are done
     */
    public function process() {
        // Run this loop as long as we have
        // jobs in the pool
        while ($this->numCompleted < $this->numTasks) {
            $this->collect(function (Work $task) {
                // If a task was marked as done, collect its results
                if ($task->isDone()) {
                    //this is how you get your completed data back out [accessed by $pool->process()]
                    $this->dataAr = array_merge($this->dataAr, $task->data);
                    $this->numCompleted++;
                }
                return $task->isDone();
            });
        }
        // All jobs are done
        // we can shutdown the pool
        $this->shutdown();
        return $this->dataAr;
    }
}

$pool = new ExamplePool(4);
for($i=0; $i<4; $i++) { 
    $pool->submit(new Work($i));
}
$retArr = $pool->process();
usort($retArr, 'sortResultsByTime'); // sort the results by time

// echo out the sorted results
echo "<br><br>";
for($i=0;$i<count($retArr);$i++){
    echo number_format($retArr[$i][0], 4, ".", "").' '.$retArr[$i][1]."\n";
}

function sortResultsByTime($a, $b) {
    return $a[0] <=> $b[0]; // usort expects an integer (<0, 0, >0), not a boolean
}
?>

Please note the code above yields this for me:

0 started (from inside threaded)
0 is ready (from inside threaded)
1 is ready (from inside threaded)
2 is ready (from inside threaded)
3 is ready (from inside threaded)

1609458117.8764 0 started
1609458117.8776 1 is ready
1609458117.8789 2 is ready
1609458117.8802 3 is ready
1609458120.8765 0 is ready

As expected, the order of the lines echoed from inside the threads looks odd. However, if you store the results and sort them by the time they were recorded, you can see the tasks ran as expected.
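The sorting step on its own can be sketched in plain PHP. The pairs below are copied from the timestamps in the output above and deliberately shuffled; the variable names are illustrative, and the comparator uses the spaceship operator so that usort receives an integer.

```php
<?php
// [timestamp, label] pairs taken from the output above, in arbitrary order.
$results = [
    [1609458120.8765, '0 is ready'],
    [1609458117.8789, '2 is ready'],
    [1609458117.8764, '0 started'],
    [1609458117.8802, '3 is ready'],
    [1609458117.8776, '1 is ready'],
];

// Sort ascending by the recorded timestamp.
usort($results, function ($a, $b) {
    return $a[0] <=> $b[0];
});

$labels = array_column($results, 1);
foreach ($results as $row) {
    echo number_format($row[0], 4, '.', '') . ' ' . $row[1] . "\n";
}
```

After sorting, the labels come out as 0 started, 1 is ready, 2 is ready, 3 is ready, 0 is ready, matching the timestamped listing above.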