Created
September 10, 2014 18:39
-
-
Save 2072/19a0a8ce38768e2f792a to your computer and use it in GitHub Desktop.
Pthreads (PHP) shared object management
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
// PHP PThreads issue 329 diagnostic script by John Wellesz | |
// Also a nice example of shared object management... | |
const ONEMUSEC = 1000000; | |
printf('file age: %d', time() - filemtime(__file__)); | |
echo PHP_EOL; | |
// a threaded object shared between different thread contexts | |
class sharedObject extends \Threaded { | |
public $name = ""; | |
public function __construct($name) { | |
$this->name = $name; | |
} | |
public function getName() { | |
return $this->name; | |
} | |
} | |
// a thread that uses the sharedObject | |
class objectUserThread extends \Thread { | |
public $broker = null; | |
public $id = 0; | |
public $objectOps = 0; | |
public function __construct (objectServer $broker, $id) { | |
$this->broker = $broker; | |
$this->id = $id; | |
} | |
public function printStatus() { | |
printf ("%s-%d status: made %d operations on shared objects.%s" | |
, __CLASS__ | |
, $this->id | |
, $this->objectOps | |
, PHP_EOL | |
); | |
} | |
public function run () { | |
$myID = Thread::getCurrentThreadId(); | |
$loop = 0; | |
$objNum = 0; | |
for (;;) { | |
//usleep(mt_rand(ONEMUSEC/100, ONEMUSEC/10)); | |
$ops = 0; | |
$objNum = $this->broker->getObjectNum(); | |
if (! $objNum || mt_rand(1, 50) == 1 && $objNum < 10) { | |
$this->broker->createObject(sprintf('%X-%d', substr(dechex($myID), -1 * mt_rand(1, strlen(dechex($myID)))), $loop)); | |
++$ops; | |
} | |
$objects = $this->broker->getObjects(mt_rand(1, $objNum ? $objNum : 1)); | |
$todelete = []; | |
// do something with those objects | |
foreach($objects as $name=>$obj) { | |
if (! $obj instanceof sharedObject) | |
throw new \Exception(sprintf( | |
"Something is wrong: obj '%s' is a %s, obtained objects: %s %s", $name, gettype($obj), print_r(array_keys($objects), true), PHP_EOL | |
)); | |
if ($name !== $obj->getName()) | |
throw new \Exception( | |
"Something is wrong: \$name !== \$obj->getName()" | |
); | |
++$ops; | |
if (mt_rand(1, 10000) == 1) | |
$todelete[] = $name; | |
} | |
foreach ($todelete as $name) { | |
$this->broker->runQuery('delete', [$name]); | |
++$ops; | |
} | |
$this->objectOps += $ops; | |
++$loop; | |
} | |
} | |
} | |
// the thread that creates and hold internal references to sharedObject | |
// instances | |
class objectServer extends \Thread { | |
public $queryType = ""; | |
public $queryOrigin = null; | |
public $queryParams = null; | |
public $queryResults = null; | |
public $queryServedCount = 0; | |
public $getCount = 0; | |
public $getnumCount = 0; | |
public $createCount = 0; | |
// init and start the thread | |
public function __construct() { | |
$this->queryOrigin = null; // the id of the querying thread | |
$this->queryType = 'init'; | |
$this->start(); | |
do { | |
$this->synchronized(function() { | |
if ($this->queryType !== null) | |
if (!$this->wait(10 * ONEMUSEC)) { | |
if ($this->hasStopped()) { | |
$this->queryType = false; | |
} | |
} | |
}); | |
} while ($this->queryType !== null); | |
printf("Object broker thread ready for query%s", PHP_EOL); | |
} | |
public function printStatus() { | |
vprintf (__CLASS__ . " status: served %d queries: g:%s, c:%d, gn:%d, holding %d objects. Mem usage: %s" . PHP_EOL | |
, $this->runQuery('getStats') | |
); | |
} | |
private function doGet(array &$objectRepo, array &$deletedRepo) { | |
$number = $this->queryParams->shift(); | |
$origin = $this->queryOrigin; | |
if ($number > count($objectRepo)) | |
$number = count($objectRepo); | |
$objectIds = array_keys($objectRepo); | |
shuffle($objectIds); | |
$objects = []; | |
for ($i = 0 ; $i < $number ; $i++) { | |
$objectID = $objectIds[$i]; | |
$object = $objectRepo[$objectID]; | |
$object->lock(); | |
if (! isset($deletedRepo[$objectID])) | |
$objects[$objectID] = $objectRepo[$objectID]; | |
$object->unlock(); | |
} | |
$this->queryResults->merge($objects); | |
$this->getCount += 1; | |
} | |
public function run () { | |
// reference holder for internal shared threaded instances | |
$holder = []; | |
$holder['properties'] = [ | |
'queryParams' => new \Threaded(), | |
'queryResults' => new \Threaded(), | |
]; | |
$this->merge($holder['properties']); | |
// external shared object reference holder | |
$objectRepo = []; | |
// object pending deletion | |
$deletedRepo = []; | |
// main loop that must never stop until everything else has | |
for (;;) { | |
$queryType = $this->notifyAndWaitForQuery(); | |
$now = time(); | |
// remove objects deleted more than five seconds ago | |
foreach ($deletedRepo as $objectID=>$time) | |
if ($now - $time > 5) { | |
unset($deletedRepo[$objectID]); | |
unset($objectRepo[$objectID]); | |
} | |
switch ($queryType) { | |
case 'getObjectNum': | |
$this->queryResults->merge([count($objectRepo)]); | |
$this->getnumCount += 1; | |
break; | |
case 'delete': | |
$objectID = $this->queryParams->shift(); | |
if (! isset($deletedRepo[$objectID])) | |
$deletedRepo[$objectID] = $now; | |
$this->queryResults->merge([true]); | |
break; | |
case 'get': | |
$this->doGet($objectRepo, $deletedRepo); | |
break; | |
case 'getStats': | |
$this->queryResults->merge([[ | |
$this->queryServedCount | |
, $this->getCount | |
, $this->createCount | |
, $this->getnumCount | |
, count($objectRepo) | |
, number_format(memory_get_usage(true)) | |
]]); | |
break; | |
case 'create': | |
$objectID = $this->queryParams->shift(); | |
if (! isset($objectRepo[$objectID])) { | |
$objectRepo[$objectID] = new sharedObject($objectID); | |
$ret = true; | |
} else | |
$ret = false; | |
$this->queryResults->merge([$ret]); | |
$this->createCount += 1; | |
printf('Created object: %s%s', $objectID, PHP_EOL); | |
break; | |
default: | |
printf ('Unsupported queryType: %s%s', var_Export($queryType, true), PHP_EOL); | |
break; | |
} | |
$this->queryServedCount += 1; | |
} | |
} | |
public function getObjectNum () { | |
return $this->runQuery('getObjectNum'); | |
} | |
public function getObjects ($number) { | |
return $this->runQuery('get', [$number]); | |
} | |
public function createObject ($name) { | |
return $this->runQuery('create', [$name]); | |
} | |
private function notifyAndWaitForQuery() { | |
$firstPass = true; | |
do { | |
// wait for instructions | |
$this->synchronized(function() use ($firstPass) { | |
if ($firstPass) { | |
$this->queryType = null; | |
$this->notify(); | |
} | |
if ($this->queryType === null) | |
$this->wait(10 * ONEMUSEC); | |
}); | |
$firstPass = false; | |
} while ($this->queryType === null); | |
return $this->queryType; | |
} | |
public function submitQueryAndWait($queryType) { | |
$this->queryOrigin = Thread::getCurrentThreadId(); | |
$pass = 1; | |
do { | |
if ($pass > 1 && $this->hasStopped()) | |
throw new \Exception($this, 'Crashed'); | |
$this->synchronized(function($queryType) use ($pass) { | |
if ($pass == 1) { | |
$this->queryType = $queryType; | |
$this->notify(); | |
} | |
if ($this->queryType !== null) | |
$this->wait(10 * ONEMUSEC); | |
}, $queryType); | |
++$pass; | |
} while ($this->queryType !== null); | |
} | |
protected function runQuery ($type, array $params = null) { | |
if ($this->queryParams->count()) | |
throw new \Exception('Unfinished query...'); | |
if ($params) | |
$this->queryParams->merge($params); | |
$this->submitQueryAndWait($type); | |
// extract the results and return them to the caller | |
$queryResults = $this->queryResults; | |
$queryResults->lock(); | |
$i = $queryResults->count(); | |
if ($i) | |
$result = $queryResults->chunk($i, true); | |
else | |
$result = []; | |
$queryResults->unlock(); | |
if ($i != count($result)) | |
throw new \Exception('!!!! $i != count($result) %d/%d %s', $i, count($result), PHP_EOL); | |
// if there is only one and there is no specific key | |
if ($i == 1 && isset($result[0])) | |
// return THE result directly instead of an array | |
return $result[0]; | |
else | |
return $result; | |
} | |
} | |
$broker = new objectServer(); | |
$user_1 = new objectUserThread($broker, 1); | |
$user_2 = new objectUserThread($broker, 2); | |
$user_3 = new objectUserThread($broker, 3); | |
$user_1->start(); | |
$user_2->start(); | |
$user_3->start(); | |
for (;;) { | |
sleep(1); | |
$broker->printStatus(); | |
$user_1->printStatus(); | |
$user_2->printStatus(); | |
$user_3->printStatus(); | |
echo PHP_EOL; | |
} | |
echo PHP_EOL; | |
echo PHP_EOL; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment