Skip to content

Was not possible to write frame! Write operation timed out while connecting ActiveMQ #169

@chandra10207

Description

@chandra10207

Fatal error: Uncaught Stomp\Exception\ConnectionException: Was not possible to write frame! Write operation timed out.

Fatal error: Uncaught Stomp\Exception\ConnectionException: Was not possible to write frame! Write operation timed out. (Host: 127.0.0.1) in /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/HeartbeatEmitter.php on line 178

Stomp\Network\Observer\Exception\HeartbeatException: Could not send heartbeat to server. in /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/HeartbeatEmitter.php on line 178

Call Stack:
    0.0029     410088   1. {main}() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/bin/process-location-stock-message.php:0
  242.5327   98653392   2. App\Broker->read() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/bin/process-location-stock-message.php:32
  242.5327   98653392   3. Stomp\StatefulStomp->read() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/src/Broker.php:122
  242.5327   98653392   4. Stomp\States\ConsumerState->read() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/StatefulStomp.php:163
  242.5327   98653392   5. Stomp\Client->readFrame() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/States/ConsumerState.php:170
  242.5327   98653392   6. Stomp\Network\Connection->readFrame() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Client.php:361
  242.5328   98653424   7. Stomp\Network\Observer\ConnectionObserverCollection->emptyRead() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Connection.php:537
  242.5328   98653424   8. Stomp\Network\Observer\HeartbeatEmitter->emptyRead() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/ConnectionObserverCollection.php:123
  242.5328   98653424   9. Stomp\Network\Observer\HeartbeatEmitter->onPotentialConnectionStateActivity() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/AbstractBeats.php:245
  242.5328   98653424  10. Stomp\Network\Observer\HeartbeatEmitter->checkDelayed() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/HeartbeatEmitter.php:164
  242.5328   98653424  11. Stomp\Network\Observer\HeartbeatEmitter->onDelay() /Applications/MAMP/htdocs/homeclearance/wp-content/plugins/hc-customization/message-processor/vendor/stomp-php/stomp-php/src/Network/Observer/AbstractBeats.php:155

But I already have a heartbeat set up on my connection:

       <?php

namespace App;

use Stomp\Client;
use Stomp\Exception\ConnectionException;
use Stomp\Network\Connection;
use Stomp\Network\Observer\HeartbeatEmitter;
use Stomp\StatefulStomp;
use Stomp\Transport\Frame;
use Stomp\Transport\Message;

/**
 * Small facade over a stomp-php StatefulStomp client.
 *
 * Reads the connection settings from the AMQ_* constants, connects with
 * client-side heartbeats enabled, and exposes send / subscribe / read / ack
 * helpers for queues and topics.
 */
class Broker
{
    /** @var StatefulStomp Stateful client used for every broker operation. */
    private $client;

    /**
     * Active subscriptions, keyed by destination ('/queue/x', '/topic/y'),
     * holding the id returned by StatefulStomp::subscribe(). Kept in
     * subscription order (most recent last).
     *
     * @var array
     */
    private $subscriptions = [];

    private $host;
    private $port;
    private $brokerUri;
    private $username;
    private $password;
    private $selector;
    private $topic;
    private $path = '';
    private $log_path = '';
    private $log_filepath = '';

    /**
     * Builds the connection, registers the heartbeat emitter and connects.
     *
     * Anything thrown by the stomp-php calls below propagates to the caller.
     */
    public function __construct()
    {
        $this->load_amq_config();
        $connection = new Connection($this->host . ':' . $this->port);
        $client = new Client($connection);
        $client->setLogin($this->username, $this->password);

        // Heartbeat value of 500 with a read timeout of 0 s + 250000 µs.
        // NOTE(review): stomp-php's heartbeat docs ask for a read timeout
        // below the heartbeat interval, because beats are emitted from the
        // read path — confirm against the installed library version.
        $client->setHeartbeat(500);
        $connection->setReadTimeout(0, 250000);

        $emitter = new HeartbeatEmitter($client->getConnection());
        $client->getConnection()->getObservers()->addObserver($emitter);

        $this->client = new StatefulStomp($client);
        $client->connect();
    }

    /**
     * Copies the AMQ_* constants into the instance properties.
     */
    private function load_amq_config(): void
    {
        $this->host = AMQ_HOST;
        $this->port = AMQ_PORT;
        $this->brokerUri = AMQ_HOST . ':' . AMQ_PORT;
        $this->username = AMQ_USERNAME;
        $this->password = AMQ_PASSWORD;
        $this->selector = AMQ_LOCATION_SELECTOR;
        $this->topic = AMQ_TOPIC;
    }

    /**
     * @return mixed The value of AMQ_LOCATION_SELECTOR.
     */
    public function getSelector()
    {
        return $this->selector;
    }

    /**
     * @return mixed The value of AMQ_TOPIC.
     */
    public function getTopic()
    {
        return $this->topic;
    }

    /**
     * Sends a persistent message to '/queue/<queueName>'.
     *
     * @param string $queueName Queue name without the '/queue/' prefix.
     * @param string $message   Message body.
     * @param array  $headers   Extra headers; a caller-supplied 'persistent' header wins.
     *
     * @return bool Result reported by StatefulStomp::send().
     */
    public function sendQueue(string $queueName, string $message, array $headers = []): bool
    {
        return $this->sendTo('/queue/' . $queueName, $message, $headers);
    }

    /**
     * Sends a persistent message to '/topic/<topicName>'.
     *
     * @param string $topicName Topic name without the '/topic/' prefix.
     * @param string $message   Message body.
     * @param array  $headers   Extra headers; a caller-supplied 'persistent' header wins.
     *
     * @return bool Result reported by StatefulStomp::send().
     */
    public function sendTopic(string $topicName, string $message, array $headers = []): bool
    {
        return $this->sendTo('/topic/' . $topicName, $message, $headers);
    }

    /**
     * Subscribes to '/queue/<queueName>' in 'client-individual' ack mode.
     *
     * @param string      $queueName Queue name without the '/queue/' prefix.
     * @param string|null $selector  Optional message selector.
     */
    public function subscribeQueue(string $queueName, ?string $selector = null): void
    {
        $this->subscribeTo('/queue/' . $queueName, $selector);
    }

    /**
     * Subscribes to '/topic/<topicName>' in 'client-individual' ack mode.
     *
     * @param string      $topicName Topic name without the '/topic/' prefix.
     * @param string|null $selector  Optional message selector.
     */
    public function subscribeTopic(string $topicName, ?string $selector = null): void
    {
        $this->subscribeTo('/topic/' . $topicName, $selector);
    }

    /**
     * Ends the subscription for the given queue, or — when no name is given —
     * lets the client end a subscription of its own choosing.
     *
     * @param string|null $queueName Queue name without prefix; null or '' for "no name".
     */
    public function unsubscribeQueue(?string $queueName = null): void
    {
        $this->unsubscribeFrom($this->destinationOrNull('/queue/', $queueName));
    }

    /**
     * Ends the subscription for the given topic, or — when no name is given —
     * lets the client end a subscription of its own choosing.
     *
     * @param string|null $topicName Topic name without prefix; null or '' for "no name".
     */
    public function unsubscribeTopic(?string $topicName = null): void
    {
        $this->unsubscribeFrom($this->destinationOrNull('/topic/', $topicName));
    }

    /**
     * Reads the next frame from the client.
     *
     * @return Frame|null The frame, or null when the client returned nothing.
     */
    public function read(): ?Frame
    {
        $frame = $this->client->read();

        return $frame instanceof Frame ? $frame : null;
    }

    /**
     * Acknowledges the given frame.
     */
    public function ack(Frame $message): void
    {
        $this->client->ack($message);
    }

    /**
     * Negatively acknowledges the given frame.
     */
    public function nack(Frame $message): void
    {
        $this->client->nack($message);
    }

    /**
     * Sends a message marked persistent to a fully-qualified destination.
     */
    private function sendTo(string $destination, string $message, array $headers): bool
    {
        // Array union keeps keys already present in $headers.
        return $this->client->send($destination, new Message($message, $headers + ['persistent' => 'true']));
    }

    /**
     * Subscribes to a fully-qualified destination and records the id.
     */
    private function subscribeTo(string $destination, ?string $selector): void
    {
        $subscriptionId = $this->client->subscribe($destination, $selector, 'client-individual');

        // Drop any previous entry first so a re-subscribed destination moves
        // to the end of the map and the map stays in subscription order.
        unset($this->subscriptions[$destination]);
        $this->subscriptions[$destination] = $subscriptionId;
    }

    /**
     * Ends one subscription and forgets its id, so the same id is never sent
     * to the client twice.
     *
     * @param string|null $destination Fully-qualified destination, or null to
     *                                 call the client's unsubscribe() without an id.
     */
    private function unsubscribeFrom(?string $destination): void
    {
        if ($destination === null) {
            $this->client->unsubscribe();
            // NOTE(review): assumes the client ends the most recently opened
            // subscription when called without an id — confirm in stomp-php.
            array_pop($this->subscriptions);

            return;
        }

        if (!isset($this->subscriptions[$destination])) {
            return;
        }

        $this->client->unsubscribe($this->subscriptions[$destination]);
        unset($this->subscriptions[$destination]);
    }

    /**
     * Builds the destination for a name, or returns null when no name was given.
     *
     * Uses explicit null / '' checks so that a destination literally named
     * "0" is not mistaken for "no name".
     */
    private function destinationOrNull(string $prefix, ?string $name): ?string
    {
        if ($name === null || $name === '') {
            return null;
        }

        return $prefix . $name;
    }
}

Main Message Processor Cron File:

<?php

require_once __DIR__ . '/../vendor/autoload.php';
use App\Broker;
use App\class_amqMessageProcessor;
use Exception;
use Stomp\Transport\Frame;

// Consumer entry point: connect, subscribe to the configured topic, then
// read and acknowledge frames until a 'terminate' frame arrives or the
// broker connection fails.
$amqProcessor = new class_amqMessageProcessor();
try {
    $broker = new Broker();
} catch (Exception $e) {
    $amqProcessor->save_log("Error Message: ".$e->getMessage());
    exit(1);
}

$selector = $broker->getSelector();
$topic = $broker->getTopic();
try {
    $broker->subscribeTopic($topic, $selector);
} catch (Exception $e) {
    $amqProcessor->save_log("Error Message: ".$e->getMessage());
    exit(1);
}

// Failures raised while reading or acknowledging are logged and end the run
// with exit code 1, the same way the connect and subscribe steps above do,
// instead of ending the process with an uncaught exception.
try {
    while (true) {
        $message = $broker->read();

        // Broker::read() returns either a Frame or null.
        if (!$message instanceof Frame) {
            echo ("No Messages Received.\n");
            $amqProcessor->save_log("No Messages Received.");
            continue;
        }

        $shutdownRequested = $message['type'] === 'terminate';
        if ($shutdownRequested) {
            $amqProcessor->save_log("Received shutdown command on message.");
        }

        echo json_encode($message->getBody());
        $broker->ack($message);

        // Leave the loop only after the terminate frame has been acknowledged,
        // so the "Cron End" log below is actually reached.
        if ($shutdownRequested) {
            break;
        }

        usleep(100000);
    }
} catch (Exception $e) {
    $amqProcessor->save_log("Error Message: ".$e->getMessage());
    exit(1);
}

// $broker->unsubscribeTopic();

$amqProcessor->save_log("Cron End");

Composer dependency details:


ActiveMQ 5.17.1
 "php": ">=7.2.0"
"name": "stomp-php/stomp-php",
            "version": "5.0.0",

Can you please tell me what I am missing here?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions