【问题标题】:Rabbitmq - php amqp broken broken pipe errorRabbitmq - php amqp 破碎的管道错误
【发布时间】:2013-04-05 19:57:44
【问题描述】:

我正在处理一个巨大的 xml 文档(其中包含大约一百万个条目),然后使用 rabbitmq 将格式化版本导入到数据库中。每次发布大约 200,000 个条目后,我都会收到一个损坏的管道错误,并且 rabbitmq 无法从中恢复。

注意错误:fwrite(): send of 2651 bytes failed with errno=11 资源暂时不可用 [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,行 439]

注意错误:fwrite(): send of 33 bytes failed with errno=104 对等方重置连接 [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc,行 439]

注意错误:fwrite(): send of 19 bytes failed with errno=32 Broken [/var/www/ribbon/app/Console/Command/lib/php_amqplib/amqp.inc 中的管道, 第439行]

这随后会导致节点关闭错误,需要手动终止进程才能从中恢复。

这些是我的类方法:-

public function publishMessage($message) {
    if (!isset($this->conn)) {
        $this->_createNewConnectionAndChannel();
    }
    try {
        $this->ch->basic_publish(
            new AMQPMessage($message, array('content_type' => 'text/plain')), 
            $this->defaults['exchange']['name'], 
            $this->defaults['binding']['routing_key']
        );
    } catch (Exception $e) {
        echo "Caught exception : " . $e->getMessage();
        echo "Creating new connection.";
        $this->_createNewConnectionAndChannel();
        $this->publishMessage($message); // try again
    }
}

protected function _createNewConnectionAndChannel() {
    if (isset($this->conn)) {
        $this->conn->close();
    }

    if(isset($this->ch)) {
        $this->ch->close();
    }

    $this->conn = new AMQPConnection(
        $this->defaults['connection']['host'], 
        $this->defaults['connection']['port'], 
        $this->defaults['connection']['user'], 
        $this->defaults['connection']['pass']
    );
    $this->ch = $this->conn->channel();
    $this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true);
    $this->ch->basic_qos(0 , 20 , 0); // fair dispatching

    $this->ch->queue_declare(
        $this->defaults['queue']['name'],
        $this->defaults['queue']['passive'],
        $this->defaults['queue']['durable'],
        $this->defaults['queue']['exclusive'],
        $this->defaults['queue']['auto_delete']
    );

    $this->ch->exchange_declare(
        $this->defaults['exchange']['name'],
        $this->defaults['exchange']['type'],
        $this->defaults['exchange']['passive'],
        $this->defaults['exchange']['durable'],
        $this->defaults['exchange']['auto_delete']
    );

    $this->ch->queue_bind(
        $this->defaults['queue']['name'],
        $this->defaults['exchange']['name'],
        $this->defaults['binding']['routing_key']
    );
}

任何帮助将不胜感激。

【问题讨论】:

标签: php rabbitmq amqp


【解决方案1】:

确保您在 Rabbit MQ 上为您的用户添加了虚拟主机访问权限。我创建了新用户并忘记了默认使用的“/”主机的设置访问权限。

您可以通过管理面板 yourhost:15672 > Admin > 点击用户 > 查找“设置权限”。

附:我假设您的 RabbitMQ 服务正在运行,用户存在且密码正确。

【讨论】:

    【解决方案2】:

    实际上,当您的消息中包含大量内容并且您的消费者花费太多时间仅处理一条消息时,就会发生此问题,即向兔子响应“ACK”并尝试使用另一条消息时出现问题。

    例如,当我遇到这个问题时,我会尝试“适应”我的消息,因为它是一个产品工作者,并且每条消息都有大约 1k 个产品 ID,所以我改为 100 个产品并且效果很好。

    您可以阅读更多关于使用心跳检测死 TCP 连接here

    【讨论】:

      【解决方案3】:

      这个问题发生在我与 RabbitMQ 的连接中断时(原因并不重要,在我的情况下,我故意停止 RabbitMQ 服务以进行一些失败测试),我试图通过关闭旧的重新连接到 RabbitMQ连接并初始化一个新的,但我收到Broken pipe or closed connection 错误。 我解决这个问题的方法是在我的连接上使用reconnect() 方法:

      $channel->reconnect();
      

      【讨论】:

      • $channel->getConnection()->reconnect();
      • 我在 $channel->getConnection()->reconnect();也许你能告诉我为什么?
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-16
      • 1970-01-01
      • 2013-03-07
      相关资源
      最近更新 更多