【发布时间】:2020-09-06 13:02:28
【问题描述】:
我正在尝试从队列 (RabbitMQ) 中读取 URL 并发出有限数量的并发 HTTP 请求,即有 10 个工作人员池对从队列接收的 URL 发出并发请求(永远)。
到目前为止,我已经按照 RabbitMQ 教程实现了一个消费者: https://www.rabbitmq.com/tutorials/tutorial-one-go.html
并尝试了从网上发现的示例中的多种方法,以此处的示例结尾: http://jmoiron.net/blog/limiting-concurrency-in-go/
不幸的是,我当前的代码运行了大约一分钟,然后无限期地冻结。我已经尝试添加/移动 go 例程,但我似乎无法让它按预期工作(我对 Go 很陌生)。
当前代码:
package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/Xide/bloom"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
var netClient = &http.Client{
Timeout: time.Second * 10,
}
func getRequest(url string) {
//resp, err := http.Get(string(url))
resp, err := netClient.Get(string(url))
if err != nil {
log.Printf("HTTP request error: %s", err)
return
}
fmt.Println("StatusCode:", resp.StatusCode)
fmt.Println(resp.Request.URL)
}
func main() {
bf := bloom.NewDefaultScalable(0.1)
conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"urls", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, //global
)
failOnError(err, "Failed to set Qos")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
concurrency := 10
sem := make(chan bool, concurrency)
go func() {
for d := range msgs {
sem <- true
url := string(d.Body)
if bf.Match(url) == false {
bf.Feed(url)
log.Printf("Not seen: %s", d.Body)
go func(url string) {
defer func() { <-sem }()
getRequest(url)
}(url)
} else {
log.Printf("Already seen: %s", d.Body)
}
d.Ack(false)
}
for i := 0; i < cap(sem); i++ {
sem <- true
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
【问题讨论】:
-
您能否将日志输出添加到问题中,这将帮助人们了解发生了什么
-
尝试运行带有
-race标志的程序,它可以帮助你调试:blog.golang.org/race-detector -
并发设置为 10 时,它发出大约 60 个 HTTP 请求(逐渐变慢)然后冻结。使用 -race 构建不提供任何信息。
-
net/http documentation 说“客户端完成后必须关闭响应正文:”我无法在您的代码中找到关闭响应正文的位置。所以我猜想,所有这些连接都会无限期地保持打开状态。 (但只有 60 个调用,这应该不是问题。)
-
如果我没记错的话,也有问题,如果你没有完整阅读响应的正文,但我找不到指向那个的文档。但我记得我做过
io.Copy(resp.Body, ioutil.Discard)之类的事情。也许那是迷信。
标签: go