Fatal error: Uncaught Stomp\Exception\ConnectionException: Was not possible to write frame! Write operation timed out.
Fatal error: Uncaught Stomp\Exception\ConnectionException: Was not possible to write frame! Write operation timed out. (Host: 127.0.0.1) in /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/HeartbeatEmitter.php on line 178
Stomp\Network\Observer\Exception\HeartbeatException: Could not send heartbeat to server. in /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/HeartbeatEmitter.php on line 178
Call Stack:
0.0029 410088 1. {main}() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/bin/process-location-stock-message.php:0
242.5327 98653392 2. App\Broker->read() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/bin/process-location-stock-message.php:32
242.5327 98653392 3. Stomp\StatefulStomp->read() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/src/Broker.php:122
242.5327 98653392 4. Stomp\States\ConsumerState->read() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/StatefulStomp.php:163
242.5327 98653392 5. Stomp\Client->readFrame() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/States/ConsumerState.php:170
242.5327 98653392 6. Stomp\Network\Connection->readFrame() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Client.php:361
242.5328 98653424 7. Stomp\Network\Observer\ConnectionObserverCollection->emptyRead() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Connection.php:537
242.5328 98653424 8. Stomp\Network\Observer\HeartbeatEmitter->emptyRead() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/ConnectionObserverCollection.php:123
242.5328 98653424 9. Stomp\Network\Observer\HeartbeatEmitter->onPotentialConnectionStateActivity() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/AbstractBeats.php:245
242.5328 98653424 10. Stomp\Network\Observer\HeartbeatEmitter->checkDelayed() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/HeartbeatEmitter.php:164
242.5328 98653424 11. Stomp\Network\Observer\HeartbeatEmitter->onDelay() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/AbstractBeats.php:155
But I already have Heartbeat setup on my connection:
<?php
namespace App;
use Stomp\Client;
use Stomp\Exception\ConnectionException;
use Stomp\Network\Connection;
use Stomp\Network\Observer\HeartbeatEmitter;
use Stomp\StatefulStomp;
use Stomp\Transport\Frame;
use Stomp\Transport\Message;
class Broker
{
private $client;
private $subscriptions = [];
private $host;
private $port;
private $brokerUri;
private $username;
private $password;
private $selector;
private $topic;
private $path = '';
private $log_path = '';
private $log_filepath = '';
public function __construct()
{
$this->load_amq_config();
$connection = new Connection($this->host . ':' . $this->port);
$client = new Client($connection);
$client->setLogin($this->username, $this->password);
$client->setHeartbeat(500);
$connection->setReadTimeout(0, 250000);
$emitter = new HeartbeatEmitter($client->getConnection());
$client->getConnection()->getObservers()->addObserver($emitter);
$this->client = new StatefulStomp($client);
$client->connect();
}
private function load_amq_config(): void {
$this->host = AMQ_HOST;
$this->port = AMQ_PORT;
$this->brokerUri = AMQ_HOST . ':' . AMQ_PORT;
$this->username = AMQ_USERNAME;
$this->password = AMQ_PASSWORD;
$this->selector = AMQ_LOCATION_SELECTOR;
$this->topic = AMQ_TOPIC;
}
public function getSelector(){
return $this->selector;
}
public function getTopic(){
return $this->topic;
}
public function sendQueue(string $queueName, string $message, array $headers = []): bool
{
$destination = '/queue/' . $queueName;
return $this->client->send($destination, new Message($message, $headers + ['persistent' => 'true']));
}
public function sendTopic(string $topicName, string $message, array $headers = []): bool
{
$destination = '/topic/' . $topicName;
return $this->client->send($destination, new Message($message, $headers + ['persistent' => 'true']));
}
public function subscribeQueue(string $queueName, ?string $selector = null): void
{
$destination = '/queue/' . $queueName;
$this->subscriptions[$destination] = $this->client->subscribe($destination, $selector, 'client-individual');
}
public function subscribeTopic(string $topicName, ?string $selector = null): void
{
$destination = '/topic/' . $topicName;
$this->subscriptions[$destination] = $this->client->subscribe($destination, $selector, 'client-individual');
}
public function unsubscribeQueue(?string $queueName = null): void
{
if ($queueName) {
$destination = '/queue/' . $queueName;
if (isset($this->subscriptions[$destination])) {
$this->client->unsubscribe($this->subscriptions[$destination]);
}
} else {
$this->client->unsubscribe();
}
}
public function unsubscribeTopic(?string $topicName = null): void
{
if ($topicName) {
$destination = '/topic/' . $topicName;
if (isset($this->subscriptions[$destination])) {
$this->client->unsubscribe($this->subscriptions[$destination]);
}
} else {
$this->client->unsubscribe();
}
}
public function read(): ?Frame
{
return ($frame = $this->client->read()) ? $frame : null;
}
public function ack(Frame $message): void
{
$this->client->ack($message);
}
public function nack(Frame $message): void
{
$this->client->nack($message);
}
}
Main Message Processor Cron File:
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use App\Broker;
use App\class_amqMessageProcessor;
use Exception;
use Stomp\Transport\Frame;
$amqProcessor = new class_amqMessageProcessor();
try {
$broker = new Broker();
} catch (Exception $e) {
$amqProcessor->save_log("Error Message: ".$e->getMessage());
exit(1);
}
$selector = $broker->getSelector();
$topic = $broker->getTopic();
try{
$broker->subscribeTopic( $topic , $selector);
} catch (Exception $e) {
$amqProcessor->save_log("Error Message: ".$e->getMessage());
exit(1);
}
while (true) {
$message = $broker->read();
if($message AND $message != ''){
if ($message instanceof Frame) {
if ($message['type'] === 'terminate') {
$amqProcessor->save_log("Received shutdown command on message.");
}
$messageHeader = $message->getHeaders();
$messageBody = $message->getBody();
echo json_encode($messageBody);
$broker->ack($message);
}
usleep(100000);
}
else{
echo ("No Messages Received.\n");
$amqProcessor->save_log("No Messages Received.");
}
}
// $broker->unsubscribeTopic();
$amqProcessor->save_log("Cron End");
Composer dependency details:
ActiveMQ 5.17.1
"php": ">=7.2.0"
"name": "stomp-php/stomp-php",
"version": "5.0.0",
Can you please tell me what am I missing here?
Fatal error: Uncaught Stomp\Exception\ConnectionException: Was not possible to write frame! Write operation timed out.
But I already have Heartbeat setup on my connection:
Main Message Processor Cron File:
Composer dependency details:
Can you please tell me what am I missing here?