【问题标题】:Can I prevent amqp.Channel closing on errors?我可以防止 amqp.Channel 因错误而关闭吗?
【发布时间】:2019-11-08 17:44:07
【问题描述】:

我尝试在 Go 中的单个通道上创建多个 AMQP 队列消费者。

我面临的问题是,当创建多个消费者时,如果第一个失败,通道会立即关闭,从而阻止进一步的操作。

有没有办法避免这种情况,还是我必须重新创建频道?

示例

假设队列“client-a”不存在,这将导致为“client-b”创建队列消费者时出错,因为此时通道已关闭。错误是Exception (504) Reason: "channel/connection is not open"

package main

import (
    "github.com/streadway/amqp"
    "log"
)

func check(err error) {
    if err != nil {
        panic(err)
    }
}

func TestChannelProblems() {
    // Setup AMQP stuff
    connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    check(err)
    log.Println("Queue connection ok")

    channel, err := connection.Channel()
    check(err)
    log.Println("Queue channel ok")

    queuesToConnectTo := []string{"client-a", "client-b"}

    for i, _ := range queuesToConnectTo {
        queueName := queuesToConnectTo[i]

        _, err := channel.Consume(queueName, "", false, false, false, false, nil)
        if err != nil {
            log.Printf("Connecting to queue %v failed: %v", queueName, err.Error())
        }

        // ... Here would be the logic to use the return value of channel.Consume
    }
}

【问题讨论】:

  • 为什么关闭了频道?请出示minimal reproducible example
  • 会的,可能需要一点时间。很抱歉错过了
  • 完成。我确信有一种方法可以通过编写一些弹性解决方案来自己处理这种情况。我问自己是否有办法让 RabbitMQ 不会在像这样的异常情况下关闭通道
  • 不确定我是否理解您的问题。 Channel.Consume 本身返回一个只读通道Delivery,反过来可以读取新消息。所以你想要同一个连接来处理多个队列?是什么阻止您按照文档中的建议使用Channel.QueueDeclare
  • 使用QueueDeclare 当然是正确的选择。遗憾的是,我无法控制这一点,因为我的软件只是单体系统的一小部分,队列可能不是由我创建的...... sigh

标签: go rabbitmq


【解决方案1】:

长话短说:在协议异常时关闭通道是 AMQP(或至少是 RabbitMQ)中的正常行为,必须由应用程序处理。

来源:https://www.rabbitmq.com/channels.html#error-handling

【讨论】:

    【解决方案2】:

    我尝试在 Go 中的单个通道上创建多个 amqp 队列消费者。

    一般不建议这样做。如果您的一个消费者遇到错误,它也会关闭另外两个。您应该为每个消费者打开一个渠道。尽管开通渠道的成本相对较高,因为它是网络往返,但在实践中这不太可能成为问题。

    有没有办法防止这种情况发生,还是我必须重新创建频道?

    您必须重新创建频道。如果连接发生任何错误,库 streadway/amqp 将关闭传递通道(通道 = Go 类型 <-chan amqp.Delivery)。

    您可以查看this thread 了解有关如何处理 AMQP 错误并可能恢复它们的详细信息。本质上,您使用NotifyClose 来监听关闭事件:

    // c is an *amqp.Channel
    errC := c.NotifyClose(make(chan *amqp.Error, 10))
    go func() {
        for err := range errC {
            if err != nil {
                // error-handling
            }
        }
    }()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-11-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-07-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多