vendor/stomp-php/stomp-php/src/Client.php line 213

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Stomp package.
  4.  *
  5.  * For the full copyright and license information, please view the LICENSE
  6.  * file that was distributed with this source code.
  7.  */
  8. namespace Stomp;
  9. use Stomp\Exception\ConnectionException;
  10. use Stomp\Exception\MissingReceiptException;
  11. use Stomp\Exception\StompException;
  12. use Stomp\Exception\UnexpectedResponseException;
  13. use Stomp\Network\Connection;
  14. use Stomp\Protocol\Protocol;
  15. use Stomp\Protocol\Version;
  16. use Stomp\Transport\Frame;
  17. /**
  18.  * Stomp Client
  19.  *
  20.  * This is the minimal implementation of a Stomp Client, it allows to send and receive Frames using the Stomp Protocol.
  21.  *
  22.  * @package Stomp
  23.  * @author Hiram Chirino <hiram@hiramchirino.com>
  24.  * @author Dejan Bosanac <dejan@nighttale.net>
  25.  * @author Michael Caplan <mcaplan@labnet.net>
  26.  * @author Jens Radtke <swefl.oss@fin-sn.de>
  27.  */
  28. class Client
  29. {
  30.     /**
  31.      * Perform request synchronously
  32.      *
  33.      * @var boolean
  34.      */
  35.     private $sync true;
  36.     /**
  37.      * Client id used for durable subscriptions
  38.      *
  39.      * @var string
  40.      */
  41.     private $clientId;
  42.     /**
  43.      * Connection session id
  44.      *
  45.      * @var string|null
  46.      */
  47.     private $sessionId;
  48.     /**
  49.      * Frames that have been read but not processed yet.
  50.      *
  51.      * @var Frame[]
  52.      */
  53.     private $unprocessedFrames = [];
  54.     /**
  55.      * @var Connection|null
  56.      */
  57.     private $connection;
  58.     /**
  59.      *
  60.      * @var Protocol|null
  61.      */
  62.     private $protocol;
  63.     /**
  64.      * Seconds to wait for a receipt.
  65.      *
  66.      * @var float
  67.      */
  68.     private $receiptWait 2;
  69.     /**
  70.      *
  71.      * @var string
  72.      */
  73.     private $login;
  74.     /**
  75.      *
  76.      * @var string
  77.      */
  78.     private $passcode;
  79.     /**
  80.      *
  81.      * @var array
  82.      */
  83.     private $versions = [Version::VERSION_1_0Version::VERSION_1_1Version::VERSION_1_2];
  84.     /**
  85.      *
  86.      * @var string
  87.      */
  88.     private $host;
  89.     /**
  90.      *
  91.      * @var int[]
  92.      */
  93.     private $heartbeat = [00];
  94.     /**
  95.      * @var bool
  96.      */
  97.     private $isConnecting false;
  98.     /**
  99.      * Constructor
  100.      *
  101.      * @param string|Connection $broker Broker URL or a connection
  102.      * @see Connection::__construct()
  103.      */
  104.     public function __construct($broker)
  105.     {
  106.         $this->connection $broker instanceof Connection $broker : new Connection($broker);
  107.     }
  108.     /**
  109.      * Configure versions to support.
  110.      *
  111.      * @param array $versions defaults to all client supported versions
  112.      */
  113.     public function setVersions(array $versions)
  114.     {
  115.         $this->versions $versions;
  116.     }
  117.     /**
  118.      * Configure the login to use.
  119.      *
  120.      * @param string $login
  121.      * @param string $passcode
  122.      */
  123.     public function setLogin($login$passcode)
  124.     {
  125.         $this->login $login;
  126.         $this->passcode $passcode;
  127.     }
  128.     /**
  129.      * Sets an fixed vhostname, which will be passed on connect as header['host'].
  130.      *
  131.      * (null = Default value is the hostname determined by connection.)
  132.      *
  133.      * @param string $host
  134.      */
  135.     public function setVhostname($host null)
  136.     {
  137.         $this->host $host;
  138.     }
  139.     /**
  140.      * Set the desired heartbeat for the connection.
  141.      *
  142.      * A heartbeat is a specific message that will be send / received when no other data is send / received
  143.      * within an interval - to indicate that the connection is still stable. If client and server agree on a beat and
  144.      * the interval passes without any data activity / beats the connection will be considered as broken and closed.
  145.      *
  146.      * If you want to make sure that the server is still available, you should use the ServerAliveObserver
  147.      * in combination with an requested server heartbeat interval.
  148.      *
  149.      * If you define a heartbeat for client side, you must assure that
  150.      * your application will send data within the interval.
  151.      * You can add \Stomp\Network\Observer\HeartbeatEmitter to your connection in order to send beats automatically.
  152.      *
  153.      * If you don't use HeartbeatEmitter you must either send messages within the interval
  154.      * or make calls to Connection::sendAlive()
  155.      *
  156.      * @param int $send
  157.      *   Number of milliseconds between expected sending of heartbeats. 0 means
  158.      *   no heartbeats sent.
  159.      * @param int $receive
  160.      *   Number of milliseconds between expected receipt of heartbeats. 0 means
  161.      *   no heartbeats expected. (not yet supported by this client)
  162.      * @see \Stomp\Network\Observer\ServerAliveObserver
  163.      * @see \Stomp\Network\Observer\HeartbeatEmitter
  164.      * @see \Stomp\Network\Connection::sendAlive()
  165.      */
  166.     public function setHeartbeat($send 0$receive 0)
  167.     {
  168.         $this->heartbeat = [$send$receive];
  169.     }
  170.     /**
  171.      * Connect to server
  172.      *
  173.      * @return boolean
  174.      * @throws StompException
  175.      * @see setVhostname
  176.      */
  177.     public function connect()
  178.     {
  179.         if ($this->isConnected()) {
  180.             return true;
  181.         }
  182.         $this->isConnecting true;
  183.         $this->connection->connect();
  184.         $this->connection->getParser()->legacyMode(true);
  185.         $this->protocol = new Protocol($this->clientId);
  186.         $this->host $this->host ?: $this->connection->getHost();
  187.         $connectFrame $this->protocol->getConnectFrame(
  188.             $this->login,
  189.             $this->passcode,
  190.             $this->versions,
  191.             $this->host,
  192.             $this->heartbeat
  193.         );
  194.         $this->sendFrame($connectFramefalse);
  195.         if ($frame $this->getConnectedFrame()) {
  196.             $version = new Version($frame);
  197.             if ($version->hasVersion(Version::VERSION_1_1)) {
  198.                 $this->connection->getParser()->legacyMode(false);
  199.             }
  200.             $this->sessionId $frame['session'];
  201.             $this->protocol $version->getProtocol($this->clientId);
  202.             $this->isConnecting false;
  203.             return true;
  204.         }
  205.         throw new ConnectionException('Connection not acknowledged');
  206.     }
  207.     /**
  208.      * Returns the next available frame from the connection, respecting the connect timeout.
  209.      *
  210.      * @return null|Frame
  211.      * @throws ConnectionException
  212.      * @throws Exception\ErrorFrameException
  213.      */
  214.     private function getConnectedFrame()
  215.     {
  216.         $deadline microtime(true) + $this->getConnection()->getConnectTimeout();
  217.         do {
  218.             if ($frame $this->connection->readFrame()) {
  219.                 return $frame;
  220.             }
  221.         } while (microtime(true) <= $deadline);
  222.         return null;
  223.     }
  224.     /**
  225.      * Send a message to a destination in the messaging system
  226.      *
  227.      * @param string $destination Destination queue
  228.      * @param string|Frame $msg Message
  229.      * @param array $header
  230.      * @param boolean $sync Perform request synchronously
  231.      * @return boolean
  232.      */
  233.     public function send($destination$msg, array $header = [], $sync null)
  234.     {
  235.         if (!$msg instanceof Frame) {
  236.             return $this->send($destination, new Frame('SEND'$header$msg), [], $sync);
  237.         }
  238.         $msg->addHeaders($header);
  239.         $msg['destination'] = $destination;
  240.         return $this->sendFrame($msg$sync);
  241.     }
  242.     /**
  243.      * Send a frame.
  244.      *
  245.      * @param Frame $frame
  246.      * @param boolean $sync
  247.      * @return boolean
  248.      */
  249.     public function sendFrame(Frame $frame$sync null)
  250.     {
  251.         if (!$this->isConnecting && !$this->isConnected()) {
  252.             $this->connect();
  253.         }
  254.         // determine if client was configured to write sync or not
  255.         $writeSync $sync !== null $sync $this->sync;
  256.         if ($writeSync) {
  257.             return $this->sendFrameExpectingReceipt($frame);
  258.         } else {
  259.             return $this->connection->writeFrame($frame);
  260.         }
  261.     }
  262.     /**
  263.      * Write frame to server and expect an matching receipt frame
  264.      *
  265.      * @param Frame $stompFrame
  266.      * @return bool
  267.      */
  268.     protected function sendFrameExpectingReceipt(Frame $stompFrame)
  269.     {
  270.         $receipt md5(microtime());
  271.         $stompFrame['receipt'] = $receipt;
  272.         $this->connection->writeFrame($stompFrame);
  273.         return $this->waitForReceipt($receipt);
  274.     }
  275.     /**
  276.      * Wait for an receipt
  277.      *
  278.      * @param string $receipt
  279.      * @return boolean
  280.      * @throws UnexpectedResponseException If response has an invalid receipt.
  281.      * @throws MissingReceiptException     If no receipt is received.
  282.      */
  283.     protected function waitForReceipt($receipt)
  284.     {
  285.         $stopAfter $this->calculateReceiptWaitEnd();
  286.         while (true) {
  287.             if ($frame $this->connection->readFrame()) {
  288.                 if ($frame->getCommand() == 'RECEIPT') {
  289.                     if ($frame['receipt-id'] == $receipt) {
  290.                         return true;
  291.                     } else {
  292.                         throw new UnexpectedResponseException($framesprintf('Expected receipt id %s'$receipt));
  293.                     }
  294.                 } else {
  295.                     $this->unprocessedFrames[] = $frame;
  296.                 }
  297.             }
  298.             if (microtime(true) >= $stopAfter) {
  299.                 break;
  300.             }
  301.         }
  302.         throw new MissingReceiptException($receipt);
  303.     }
  304.     /**
  305.      * Returns the timestamp with micro time to stop wait for a receipt.
  306.      *
  307.      * @return float
  308.      */
  309.     protected function calculateReceiptWaitEnd()
  310.     {
  311.         return microtime(true) + $this->receiptWait;
  312.     }
  313.     /**
  314.      * Read response frame from server
  315.      *
  316.      * @return Frame|false when no frame to read
  317.      */
  318.     public function readFrame()
  319.     {
  320.         return array_shift($this->unprocessedFrames) ?: $this->connection->readFrame();
  321.     }
  322.     /**
  323.      * Graceful disconnect from the server
  324.      * @param bool $sync
  325.      * @return void
  326.      */
  327.     public function disconnect($sync false)
  328.     {
  329.         try {
  330.             if ($this->connection && $this->connection->isConnected()) {
  331.                 if ($this->protocol) {
  332.                     $this->sendFrame($this->protocol->getDisconnectFrame(), $sync);
  333.                 }
  334.             }
  335.         } catch (StompException $ex) {
  336.             // nothing!
  337.         }
  338.         if ($this->connection) {
  339.             $this->connection->disconnect();
  340.         }
  341.         $this->sessionId null;
  342.         $this->unprocessedFrames = [];
  343.         $this->protocol null;
  344.         $this->isConnecting false;
  345.     }
  346.     /**
  347.      * Current stomp session ID
  348.      *
  349.      * @return string|null
  350.      */
  351.     public function getSessionId()
  352.     {
  353.         return $this->sessionId;
  354.     }
  355.     /**
  356.      * Graceful object destruction
  357.      *
  358.      */
  359.     public function __destruct()
  360.     {
  361.         $this->disconnect();
  362.     }
  363.     /**
  364.      * Check if client session has ben established
  365.      *
  366.      * @return boolean
  367.      */
  368.     public function isConnected()
  369.     {
  370.         return !empty($this->sessionId) && $this->connection->isConnected();
  371.     }
  372.     /**
  373.      * Get the used connection.
  374.      *
  375.      * @return Connection
  376.      */
  377.     public function getConnection()
  378.     {
  379.         return $this->connection;
  380.     }
  381.     /**
  382.      * Get the currently used protocol.
  383.      *
  384.      * @return null|\Stomp\Protocol\Protocol
  385.      */
  386.     public function getProtocol()
  387.     {
  388.         if (!$this->isConnecting && !$this->isConnected()) {
  389.             $this->connect();
  390.         }
  391.         return $this->protocol;
  392.     }
  393.     /**
  394.      * @return string
  395.      */
  396.     public function getClientId()
  397.     {
  398.         return $this->clientId;
  399.     }
  400.     /**
  401.      * @param string $clientId
  402.      * @return Client
  403.      */
  404.     public function setClientId($clientId)
  405.     {
  406.         $this->clientId $clientId;
  407.         return $this;
  408.     }
  409.     /**
  410.      * Set seconds to wait for a receipt.
  411.      *
  412.      * @param float $seconds
  413.      */
  414.     public function setReceiptWait($seconds)
  415.     {
  416.         $this->receiptWait $seconds;
  417.     }
  418.     /**
  419.      * Check if client runs in synchronized mode, which is the default operation mode.
  420.      *
  421.      * @return boolean
  422.      */
  423.     public function isSync()
  424.     {
  425.         return $this->sync;
  426.     }
  427.     /**
  428.      * Toggle synchronized mode.
  429.      *
  430.      * @param boolean $sync
  431.      */
  432.     public function setSync($sync)
  433.     {
  434.         $this->sync $sync;
  435.     }
  436. }