Last active
December 21, 2015 06:29
-
-
Save frankmayer/6264596 to your computer and use it in GitHub Desktop.
FIles for testing strange performance difference between Linux and Windows
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace PhpAmqpLib\Connection; | |
use PhpAmqpLib\Channel\AbstractChannel; | |
use PhpAmqpLib\Channel\AMQPChannel; | |
use PhpAmqpLib\Exception\AMQPProtocolConnectionException; | |
use PhpAmqpLib\Exception\AMQPRuntimeException; | |
use PhpAmqpLib\Exception\AMQPTimeoutException; | |
use PhpAmqpLib\Helper\MiscHelper; | |
use PhpAmqpLib\Wire\AMQPWriter; | |
use PhpAmqpLib\Wire\AMQPReader; | |
use PhpAmqpLib\Wire\IO\AbstractIO; | |
class AbstractConnection extends AbstractChannel | |
{ | |
public static $LIBRARY_PROPERTIES = array( | |
"library" => array('S', "PHP AMQP Lib"), | |
"library_version" => array('S', "2.0") | |
); | |
/** | |
* contructor parameters for clone | |
* @var array | |
*/ | |
protected $construct_params; | |
/** | |
* close the connection in destructur | |
* @var bool | |
*/ | |
protected $close_on_destruct = true ; | |
/** | |
* @var null|\PhpAmqpLib\Wire\IO\AbstractIO | |
*/ | |
protected $io = null; | |
public function __construct($user, $password, | |
$vhost="/",$insist=false, | |
$login_method="AMQPLAIN", | |
$login_response=null, | |
$locale="en_US", | |
AbstractIO $io) | |
{ | |
// save the params for the use of __clone | |
$this->construct_params = func_get_args(); | |
if ($user && $password) { | |
$login_response = new AMQPWriter(); | |
$login_response->write_table(array("LOGIN" => array('S',$user), | |
"PASSWORD" => array('S',$password))); | |
$login_response = substr($login_response->getvalue(),4); //Skip the length | |
} else { | |
$login_response = null; | |
} | |
$d = self::$LIBRARY_PROPERTIES; | |
while (true) { | |
$this->channels = array(); | |
// The connection object itself is treated as channel 0 | |
parent::__construct($this, 0); | |
$this->channel_max = 65535; | |
$this->frame_max = 131072; | |
$this->io = $io; | |
$this->input = new AMQPReader(null, $this->io); | |
$this->write($this->amqp_protocol_header); | |
$this->wait(array($this->waitHelper->get_wait('connection.start'))); | |
$this->x_start_ok($d, $login_method, $login_response, $locale); | |
$this->wait_tune_ok = true; | |
while ($this->wait_tune_ok) { | |
$this->wait(array( | |
$this->waitHelper->get_wait('connection.secure'), | |
$this->waitHelper->get_wait('connection.tune') | |
)); | |
} | |
$host = $this->x_open($vhost,"", $insist); | |
if (!$host) { | |
return; // we weren't redirected | |
} | |
// we were redirected, close the socket, loop and try again | |
$this->close_socket(); | |
} | |
} | |
/** | |
* cloning will use the old properties to make a new connection to the same server | |
*/ | |
public function __clone() | |
{ | |
call_user_func_array(array($this, '__construct'), $this->construct_params); | |
} | |
public function __destruct() | |
{ | |
if ($this->close_on_destruct) { | |
if (isset($this->input) && $this->input) { | |
// close() always tries to connect to the server to shutdown | |
// the connection. If the server has gone away, it will | |
// throw an error in the connection class, so catch it | |
// and shutdown quietly | |
try { | |
$this->close(); | |
} catch (\Exception $e) { } | |
} | |
} | |
} | |
public function select($sec, $usec = 0) | |
{ | |
return $this->getIO()->select($sec, $usec); | |
} | |
/** | |
* allows to not close the connection | |
* it`s useful after the fork when you don`t want to close parent process connection | |
* @param bool $close | |
*/ | |
public function set_close_on_destruct($close = true) | |
{ | |
$this->close_on_destruct = (bool) $close; | |
} | |
protected function close_socket() | |
{ | |
if ($this->debug) { | |
MiscHelper::debug_msg("closing socket"); | |
} | |
$this->getIO()->close(); | |
} | |
protected function write($data) | |
{ | |
if ($this->debug) { | |
MiscHelper::debug_msg("< [hex]:\n" . MiscHelper::hexdump($data, $htmloutput = false, $uppercase = true, $return = true)); | |
} | |
$this->getIO()->write($data); | |
} | |
protected function do_close() | |
{ | |
if (isset($this->input) && $this->input) { | |
$this->input->close(); | |
$this->input = null; | |
} | |
$this->close_socket(); | |
} | |
public function get_free_channel_id() | |
{ | |
for ($i=1; $i <= $this->channel_max; $i++) { | |
if (!isset($this->channels[$i])) { | |
return $i; | |
} | |
} | |
throw new AMQPRuntimeException("No free channel ids"); | |
} | |
public function send_content($channel, $class_id, $weight, $body_size, | |
$packed_properties, $body) | |
{ | |
$pkt = new AMQPWriter(); | |
$pkt->write_octet(2); | |
$pkt->write_short($channel); | |
$pkt->write_long(strlen($packed_properties)+12); | |
$pkt->write_short($class_id); | |
$pkt->write_short($weight); | |
$pkt->write_longlong($body_size); | |
$pkt->write($packed_properties); | |
$pkt->write_octet(0xCE); | |
$pkt = $pkt->getvalue(); | |
$this->write($pkt); | |
while ($body) { | |
$payload = substr($body,0, $this->frame_max-8); | |
$body = substr($body,$this->frame_max-8); | |
$pkt = new AMQPWriter(); | |
$pkt->write_octet(3); | |
$pkt->write_short($channel); | |
$pkt->write_long(strlen($payload)); | |
$pkt->write($payload); | |
$pkt->write_octet(0xCE); | |
$pkt = $pkt->getvalue(); | |
$this->write($pkt); | |
} | |
} | |
protected function send_channel_method_frame($channel, $method_sig, $args="") | |
{ | |
if ($args instanceof AMQPWriter) { | |
$args = $args->getvalue(); | |
} | |
$pkt = new AMQPWriter(); | |
$pkt->write_octet(1); | |
$pkt->write_short($channel); | |
$pkt->write_long(strlen($args)+4); // 4 = length of class_id and method_id | |
// in payload | |
$pkt->write_short($method_sig[0]); // class_id | |
$pkt->write_short($method_sig[1]); // method_id | |
$pkt->write($args); | |
$pkt->write_octet(0xCE); | |
$pkt = $pkt->getvalue(); | |
$this->write($pkt); | |
if ($this->debug) { | |
$PROTOCOL_CONSTANTS_CLASS = self::$PROTOCOL_CONSTANTS_CLASS; | |
MiscHelper::debug_msg("< " . MiscHelper::methodSig($method_sig) . ": " . | |
$PROTOCOL_CONSTANTS_CLASS::$GLOBAL_METHOD_NAMES[MiscHelper::methodSig($method_sig)]); | |
} | |
} | |
/** | |
* Wait for a frame from the server | |
*/ | |
protected function wait_frame($timeout = 0) | |
{ | |
// frankmayer debugging addition: Start measuring time | |
if (AMQP_DEBUG_FM === true ) { | |
$microtimeStart = microtime(true); | |
} | |
// frankmayer debugging addition | |
$currentTimeout = $this->input->getTimeout(); | |
$this->input->setTimeout($timeout); | |
try { | |
$frame_type = $this->input->read_octet(); | |
$channel = $this->input->read_short(); | |
$size = $this->input->read_long(); | |
$payload = $this->input->read($size); | |
$ch = $this->input->read_octet(); | |
} catch(AMQPTimeoutException $e) { | |
$this->input->setTimeout($currentTimeout); | |
throw $e; | |
} | |
$this->input->setTimeout($currentTimeout); | |
if ($ch != 0xCE) { | |
throw new AMQPRuntimeException(sprintf("Framing error, unexpected byte: %x", $ch)); | |
} | |
// frankmayer debugging addition: End measuring time & output result | |
if (AMQP_DEBUG_FM === true ) { | |
$microtimeEnd = microtime(true); | |
$microtimeDiff = $microtimeEnd -$microtimeStart; | |
echo " [.] AbstractConnection::wait_frame() Got response in: ",sprintf("%.4f", ($microtimeDiff))." seconds","\n"; | |
} | |
// end frankmayer debugging addition | |
return array($frame_type, $channel, $payload); | |
} | |
/** | |
* Wait for a frame from the server destined for | |
* a particular channel. | |
*/ | |
protected function wait_channel($channel_id, $timeout = 0) | |
{ | |
while (true) { | |
list($frame_type, $frame_channel, $payload) = $this->wait_frame($timeout); | |
if ($frame_channel == $channel_id) { | |
return array($frame_type, $payload); | |
} | |
// Not the channel we were looking for. Queue this frame | |
//for later, when the other channel is looking for frames. | |
array_push($this->channels[$frame_channel]->frame_queue, | |
array($frame_type, $payload)); | |
// If we just queued up a method for channel 0 (the Connection | |
// itself) it's probably a close method in reaction to some | |
// error, so deal with it right away. | |
if (($frame_type == 1) && ($frame_channel == 0)) { | |
$this->wait(); | |
} | |
} | |
} | |
/** | |
* Fetch a Channel object identified by the numeric channel_id, or | |
* create that object if it doesn't already exist. | |
*/ | |
public function channel($channel_id = null) | |
{ | |
if (isset($this->channels[$channel_id])) { | |
return $this->channels[$channel_id]; | |
} else { | |
$channel_id = $channel_id ? $channel_id : $this->get_free_channel_id(); | |
$ch = new AMQPChannel($this->connection, $channel_id); | |
$this->channels[$channel_id] = $ch; | |
return $ch; | |
} | |
} | |
/** | |
* request a connection close | |
*/ | |
public function close($reply_code=0, $reply_text="", $method_sig=array(0, 0)) | |
{ | |
list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose( | |
$reply_code, | |
$reply_text, | |
$method_sig[0], | |
$method_sig[1] | |
); | |
$this->send_method_frame(array($class_id, $method_id), $args); | |
return $this->wait(array( | |
$this->waitHelper->get_wait('connection.close_ok') | |
)); | |
} | |
public static function dump_table($table) | |
{ | |
$tokens = array(); | |
foreach ($table as $name => $value) { | |
switch ($value[0]) { | |
case 'D': | |
$val = $value[1]->n . 'E' . $value[1]->e; | |
break; | |
case 'F': | |
$val = '(' . self::dump_table($value[1]) . ')'; | |
break; | |
case 'T': | |
$val = date('Y-m-d H:i:s', $value[1]); | |
break; | |
default: | |
$val = $value[1]; | |
} | |
$tokens[] = $name . '=' . $val; | |
} | |
return implode(', ', $tokens); | |
} | |
protected function connection_close($args) | |
{ | |
$reply_code = $args->read_short(); | |
$reply_text = $args->read_shortstr(); | |
$class_id = $args->read_short(); | |
$method_id = $args->read_short(); | |
$this->x_close_ok(); | |
throw new AMQPProtocolConnectionException($reply_code, $reply_text, array($class_id, $method_id)); | |
} | |
/** | |
* confirm a connection close | |
*/ | |
protected function x_close_ok() | |
{ | |
$this->send_method_frame(array(10, 61)); | |
$this->do_close(); | |
} | |
/** | |
* confirm a connection close | |
*/ | |
protected function connection_close_ok($args) | |
{ | |
$this->do_close(); | |
} | |
protected function x_open($virtual_host, $capabilities="", $insist=false) | |
{ | |
$args = new AMQPWriter(); | |
$args->write_shortstr($virtual_host); | |
$args->write_shortstr($capabilities); | |
$args->write_bit($insist); | |
$this->send_method_frame(array(10, 40), $args); | |
$wait = array( | |
$this->waitHelper->get_wait('connection.open_ok') | |
); | |
if ($this->protocolVersion == '0.8') { | |
$wait[] = $this->waitHelper->get_wait('connection.redirect'); | |
} | |
return $this->wait($wait); | |
} | |
/** | |
* signal that the connection is ready | |
*/ | |
protected function connection_open_ok($args) | |
{ | |
$this->known_hosts = $args->read_shortstr(); | |
if ($this->debug) { | |
MiscHelper::debug_msg("Open OK! known_hosts: " . $this->known_hosts); | |
} | |
return null; | |
} | |
/** | |
* asks the client to use a different server | |
*/ | |
protected function connection_redirect($args) | |
{ | |
$host = $args->read_shortstr(); | |
$this->known_hosts = $args->read_shortstr(); | |
if ($this->debug) { | |
MiscHelper::debug_msg("Redirected to [". $host . "], known_hosts [" . $this->known_hosts . "]" ); | |
} | |
return $host; | |
} | |
/** | |
* security mechanism challenge | |
*/ | |
protected function connection_secure($args) | |
{ | |
$challenge = $args->read_longstr(); | |
} | |
/** | |
* security mechanism response | |
*/ | |
protected function x_secure_ok($response) | |
{ | |
$args = new AMQPWriter(); | |
$args->write_longstr($response); | |
$this->send_method_frame(array(10, 21), $args); | |
} | |
/** | |
* start connection negotiation | |
*/ | |
protected function connection_start($args) | |
{ | |
$this->version_major = $args->read_octet(); | |
$this->version_minor = $args->read_octet(); | |
$this->server_properties = $args->read_table(); | |
$this->mechanisms = explode(" ", $args->read_longstr()); | |
$this->locales = explode(" ", $args->read_longstr()); | |
if ($this->debug) { | |
MiscHelper::debug_msg(sprintf("Start from server, version: %d.%d, properties: %s, mechanisms: %s, locales: %s", | |
$this->version_major, | |
$this->version_minor, | |
self::dump_table($this->server_properties), | |
implode(', ', $this->mechanisms), | |
implode(', ', $this->locales))); | |
} | |
} | |
protected function x_start_ok($client_properties, $mechanism, $response, $locale) | |
{ | |
$args = new AMQPWriter(); | |
$args->write_table($client_properties); | |
$args->write_shortstr($mechanism); | |
$args->write_longstr($response); | |
$args->write_shortstr($locale); | |
$this->send_method_frame(array(10, 11), $args); | |
} | |
/** | |
* propose connection tuning parameters | |
*/ | |
protected function connection_tune($args) | |
{ | |
$v = $args->read_short(); | |
if ($v) { | |
$this->channel_max = $v; | |
} | |
$v = $args->read_long(); | |
if ($v) { | |
$this->frame_max = $v; | |
} | |
$this->heartbeat = $args->read_short(); | |
$this->x_tune_ok($this->channel_max, $this->frame_max, 0); | |
} | |
/** | |
* negotiate connection tuning parameters | |
*/ | |
protected function x_tune_ok($channel_max, $frame_max, $heartbeat) | |
{ | |
$args = new AMQPWriter(); | |
$args->write_short($channel_max); | |
$args->write_long($frame_max); | |
$args->write_short($heartbeat); | |
$this->send_method_frame(array(10, 31), $args); | |
$this->wait_tune_ok = false; | |
} | |
/** | |
* get socket from current connection | |
*/ | |
public function getSocket() | |
{ | |
return $this->sock; | |
} | |
/** | |
* @return \PhpAmqpLib\Wire\IO\AbstractIO | |
*/ | |
protected function getIO() { | |
return $this->io; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
define('AMQP_DEBUG_FM', true); | |
require_once __DIR__ . '/../vendor/autoload.php'; | |
use PhpAmqpLib\Connection\AMQPConnection; | |
use PhpAmqpLib\Message\AMQPMessage; | |
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); | |
$channel = $connection->channel(); | |
$channel->queue_declare('rpc_queue', false, false, false, false); | |
function fib($n) { | |
// if ($n == 0) | |
// return 0; | |
// if ($n == 1) | |
// return 1; | |
// return fib($n-1) + fib($n-2); | |
return $n; | |
} | |
echo " [x] Awaiting RPC requests\n"; | |
$callback = function($req) { | |
$n = intval($req->body); | |
echo " [.] fib(", $n, ")\n"; | |
$msg = new AMQPMessage( | |
(string) fib($n), | |
array('correlation_id' => $req->get('correlation_id')) | |
); | |
$req->delivery_info['channel']->basic_publish( | |
$msg, '', $req->get('reply_to')); | |
$req->delivery_info['channel']->basic_ack( | |
$req->delivery_info['delivery_tag']); | |
}; | |
$channel->basic_qos(null, 1, null); | |
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback); | |
while(count($channel->callbacks)) { | |
$channel->wait(); | |
} | |
$channel->close(); | |
$connection->close(); | |
?> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
require_once __DIR__ . '/../vendor/autoload.php'; | |
use PhpAmqpLib\Connection\AMQPConnection; | |
use PhpAmqpLib\Message\AMQPMessage; | |
class FibonacciRpcClient | |
{ | |
private $connection; | |
private $channel; | |
private $callback_queue; | |
private $response; | |
private $corr_id; | |
public function __construct() | |
{ | |
$this->connection = new AMQPConnection( | |
'localhost', 5672, 'guest', 'guest'); | |
$this->channel = $this->connection->channel(); | |
list($this->callback_queue, ,) = $this->channel->queue_declare( | |
"", | |
false, | |
false, | |
true, | |
false | |
); | |
$this->channel->basic_consume( | |
$this->callback_queue, | |
'', | |
false, | |
false, | |
false, | |
false, | |
array($this, 'on_response') | |
); | |
} | |
public function on_response($rep) | |
{ | |
if ($rep->get('correlation_id') == $this->corr_id) { | |
$this->response = $rep->body; | |
} | |
} | |
public function call($n) | |
{ | |
$this->response = null; | |
$this->corr_id = uniqid(); | |
$msg = new AMQPMessage( | |
(string) $n, | |
array( | |
'correlation_id' => $this->corr_id, | |
'reply_to' => $this->callback_queue | |
) | |
); | |
$this->channel->basic_publish($msg, '', 'rpc_queue'); | |
while (!$this->response) { | |
$this->channel->wait(); | |
} | |
return intval($this->response); | |
} | |
} | |
define('AMQP_DEBUG_FM', true); | |
$fibonacci_rpc = new FibonacciRpcClient(); | |
for ($i = 0; $i <= 20000; $i++) { | |
$response = $fibonacci_rpc->call(30); | |
echo " [.] Got ", $response, "\n"; | |
} | |
?> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment