【问题标题】:Go-micro rabbit mq plugin - publish message with priorityGo-micro rabbit mq plugin - 优先发布消息
【发布时间】:2020-06-24 10:23:03
【问题描述】:

由于支持 RabbitMQ 版本 3.5.0 优先级队列 - https://www.rabbitmq.com/priority.html

如果在队列创建期间传递了 x-max-priority 参数,则可以声明队列。

我可以成功声明一个优先级支持的队列

brkrSub := broker.NewSubscribeOptions(
        broker.DisableAutoAck(),
        rabbitmq.QueueArguments(map[string]interface{}{"x-max-priority": 10}),
    )

    service.Server().Subscribe(
        service.Server().NewSubscriber(
            "mytopic",
            h.Handle,
            server.SubscriberQueue("mytopic.hello"),
            server.SubscriberContext(brkrSub.Context),
        ),
    )

但是如何发布指定优先级的消息?

    body := &message.MyTestMessage{
        Message: fmt.Sprintf("Message number %d", counter),
    }

    msg := client.NewMessage(
        topic,
        body,
        // TODO: Priority
    )
    if err := client.Publish(ctx, msg); err != nil {
        fmt.Printf("Cannot publish message: ", err.Error())
        return
    }

我找不到将优先级作为 MessageOption 或 PublishOption 传递的直接方法,但是,似乎有一种方法可以在 client.Publish 上下文中指定其他选项。我在寻找正确的方向吗?如果是,你能帮我一点吗?

编辑:我能够执行以下操作而不会导致任何编译时错误。尽管优先级仍然被忽略,并且消息以通常的方式出现


func setPriority(ctx context.Context, priority int) client.PublishOption {
    return func(o *client.PublishOptions) {
        o.Context = context.WithValue(ctx, "priority", priority)
    }
}

func publish(ctx context.Context, priority int, counter int) {
    //body := fmt.Sprintf("hello, I am a message %d", counter)
    body := &message.MyTestMessage{
        Message: fmt.Sprintf("Message number %d", counter),
    }

    msg := client.NewMessage(
        topic,
        body,
    )
    if err := client.Publish(ctx, msg, setPriority(ctx, priority)); err != nil {
        fmt.Printf("Cannot publish message: ", err.Error())
        return
    }

    fmt.Printf("Published message %d to %s \n", counter, topic)
}

【问题讨论】:

    标签: go rabbitmq go-micro


    【解决方案1】:

    试试这样的:

    func publishMessageToChan(queue *amqp.Queue, channel *amqp.Channel, messageToQueue string) error {
        return channel.Publish(
            "<exchange>", // exchange
            "<queue>",    // routing key
            false,        // mandatory
            false,        // immediate
            amqp.Publishing{
                Timestamp:   time.Now(),
                ContentType: "text/plain",
                Body:        []byte(messageToQueue),
                Priority:    0, // <-- Priority here < 0 to 9>
            })
    }
    

    带有库“github.com/streadway/amqp”

    【讨论】:

    • 感谢您的回答,但不幸的是,我必须为此使用带有rabbitmq插件的go-micro。单独使用 rabbitmq 库确实很容易,但是这里的东西在很大程度上被过度设计了
    【解决方案2】:
    var brokerOpts broker.PublishOptions
    rabbitmq.Priority(uint8(10))(&brokerOpts)
    
    event.Publish(ctx, payload, client.PublishContext(brokerOpts.Context))
    

    【讨论】:

      猜你喜欢
      • 2018-07-30
      • 2023-01-31
      • 2018-03-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多