【问题标题】:Limiting concurrency when processing messages from RabbitMQ处理来自 RabbitMQ 的消息时限制并发
【发布时间】:2020-09-06 13:02:28
【问题描述】:

我正在尝试从队列 (RabbitMQ) 中读取 URL 并发出有限数量的并发 HTTP 请求,即有 10 个工作人员池对从队列接收的 URL 发出并发请求(永远)。

到目前为止,我已经按照 RabbitMQ 教程实现了一个消费者: https://www.rabbitmq.com/tutorials/tutorial-one-go.html

并尝试了从网上发现的示例中的多种方法,以此处的示例结尾: http://jmoiron.net/blog/limiting-concurrency-in-go/

不幸的是,我当前的代码运行了大约一分钟,然后无限期地冻结。我已经尝试添加/移动 go 例程,但我似乎无法让它按预期工作(我对 Go 很陌生)。

当前代码:

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/Xide/bloom"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

var netClient = &http.Client{
    Timeout: time.Second * 10,
}

func getRequest(url string) {
    //resp, err := http.Get(string(url))
    resp, err := netClient.Get(string(url))
    if err != nil {
        log.Printf("HTTP request error: %s", err)
        return
    }
    fmt.Println("StatusCode:", resp.StatusCode)
    fmt.Println(resp.Request.URL)
}

func main() {
    bf := bloom.NewDefaultScalable(0.1)

    conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "urls",            // name
        true,              // durable
        false,             // delete when unused
        false,             // exclusive
        false,             // no-wait
        nil,               // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, //global
    )
    failOnError(err, "Failed to set Qos")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    concurrency := 10
    sem := make(chan bool, concurrency)
    go func() {
        for d := range msgs {
            sem <- true
            url := string(d.Body)
            if bf.Match(url) == false {
                bf.Feed(url)
                log.Printf("Not seen: %s", d.Body)
                go func(url string) {
                    defer func() { <-sem }()
                    getRequest(url)
                }(url)
            } else {
                log.Printf("Already seen: %s", d.Body)
            }
            d.Ack(false)
        }
        for i := 0; i < cap(sem); i++ {
            sem <- true
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

【问题讨论】:

  • 您能否将日志输出添加到问题中,这将帮助人们了解发生了什么
  • 尝试运行带有-race标志的程序,它可以帮助你调试:blog.golang.org/race-detector
  • 并发设置为 10 时,它发出大约 60 个 HTTP 请求(逐渐变慢)然后冻结。使用 -race 构建不提供任何信息。
  • net/http documentation 说“客户端完成后必须关闭响应正文:”我无法在您的代码中找到关闭响应正文的位置。所以我猜想,所有这些连接都会无限期地保持打开状态。 (但只有 60 个调用,这应该不是问题。)
  • 如果我没记错的话,也有问题,如果你没有完整阅读响应的正文,但我找不到指向那个的文档。但我记得我做过io.Copy(resp.Body, ioutil.Discard) 之类的事情。也许那是迷信。

标签: go


【解决方案1】:

您没有正确处理您的 HTTP 响应,导致打开的连接越来越多。试试这个:

func getRequest(url string) {
    resp, err := netClient.Get(string(url))
    if err != nil {
        log.Printf("HTTP request error: %s", err)
        return
    }
    // Add this bit:
    defer func() {
        io.Copy(ioutil.Discard, resp.Body)
        resp.Body.Close()
    }()
    fmt.Println("StatusCode:", resp.StatusCode)
    fmt.Println(resp.Request.URL)
}

在您从频道中阅读完消息后,这似乎是不必要的并且可能存在问题:

    for i := 0; i < cap(sem); i++ {
        sem <- true
    }

为什么在您读取队列中的所有消息后填充sem 频道?您已向频道添加了与您希望从中读取的消息一样多的消息,因此这充其量毫无意义,如果您对其余代码进行错误更改,可能会导致问题。

与您的问题无关,但这是多余的:

if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
}

根据the documentationFatalf 已经退出,因此永远不会调用panic。如果你想记录和panic,试试log.Panicf,它就是为此目的而设计的。

【讨论】:

    【解决方案2】:

    您在收到消息时添加到sem,但只有在您没有看到网址时才从sem 中删除。

    所以,一旦您“已经看到”了 10 个网址,您的应用就会锁定。 因此,您需要将 &lt;-sem 添加到记录“已见”的 else 语句中。

    不管怎样,这是一种相当奇怪的方式来实现这种并发性。 这是一个以更惯用的方式执行此操作的版本on Play

    注意,在这个版本中,我们只生成了 10 个监听 rabbit 频道的 goroutine。

    package main
    
    import (
        "fmt"
        "log"
        "net/http"
        "time"
    
        "github.com/Xide/bloom"
        "github.com/streadway/amqp"
    )
    
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
        }
    }
    
    var netClient = &http.Client{
        Timeout: time.Second * 10,
    }
    
    func getRequest(url string) {
        //resp, err := http.Get(string(url))
        resp, err := netClient.Get(string(url))
        if err != nil {
            log.Printf("HTTP request error: %s", err)
            return
        }
        resp.Body.Close()
        fmt.Println("StatusCode:", resp.StatusCode)
        fmt.Println(resp.Request.URL)
    }
    
    func main() {
        bf := bloom.NewDefaultScalable(0.1)
    
        conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
    
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
    
        q, err := ch.QueueDeclare(
            "urls", // name
            true,   // durable
            false,  // delete when unused
            false,  // exclusive
            false,  // no-wait
            nil,    // arguments
        )
        failOnError(err, "Failed to declare a queue")
    
        err = ch.Qos(
            1,     // prefetch count
            0,     // prefetch size
            false, //global
        )
        failOnError(err, "Failed to set Qos")
    
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
    
        concurrency := 10
        var wg sync.Waitgroup              // used to coordinate when they are done, ie: if rabbit conn was closed
        for x := 0; x < concurrency; x++ { // spawn 10 goroutines, all reading from the rabbit channel
            wg.Add(1)
            go func() {
                defer wg.Done() // signal that this goroutine is done
                for d := range msgs {
                    url := string(d.Body)
                    if bf.Match(url) == false {
                        bf.Feed(url)
                        log.Printf("Not seen: %s", d.Body)
                        getRequest(url)
                    } else {
                        log.Printf("Already seen: %s", d.Body)
                    }
                    d.Ack(false)
                }
                log.Println("msgs channel closed")
            }()
        }
    
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        wg.Wait() // when all goroutine's exit, the app exits
    }
    

    【讨论】:

    • 上面的例子退出:panic: sync: negative WaitGroup counter@david-budworth
    • 更新了示例,我忽略了用工人数量(并发)初始化 Waitgroup。我实际上无法运行该应用程序,因为我没有任何提交项目,因此您可能需要进行一些调整。重点更多的是展示另一种方式并解释您的解决方案为何挂起。
    • var wg sync.Waitgroup => var wg sync.WaitGroup 并添加wg.Add(concurrency)
    • @Cui 是的,这更直接,只是wg.Add(concurrency),但我通常更喜欢wg.Add(1)wg.Done() 之间有明显的联系。另外,如果我稍后(不正确地)将 for 循环更改为 &lt;= concurrency,程序仍然会正常运行,即使它在循环中有一个错误
    猜你喜欢
    • 1970-01-01
    • 2015-10-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-26
    相关资源
    最近更新 更多