【问题标题】:How Can I Effectively 'Max Out' Concurrent HTTP Requests?如何有效地“最大化”并发 HTTP 请求?
【发布时间】:2019-06-01 01:04:47
【问题描述】:

我目前正在尝试使用 Go 进行一些实验。这是我正在尝试做的事情:

我有一个 REST API 服务正在运行,我想在尽可能多的 Goroutine 中一遍又一遍地查询一个特定的 URL,以查看这些响应的性能如何(通过查看我的 REST API 服务器日志) .我想在退出程序之前发送总共 100 万个 HTTP 请求——在我的计算机允许的范围内同时执行。

我知道有一些工具可以做到这一点,但我主要对如何使用 goroutine 在 Go 中最大化我的 HTTP 并发感兴趣。

这是我的代码:

package main

import (
    "fmt"
    "net/http"
    "runtime"
    "time"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    transport := &http.Transport{}

    for i := 0; i < 1000000; i++ {
        go func() {
            req, _ := http.NewRequest("GET", "http://myapi.com", nil)
            req.Header.Set("User-Agent", "custom-agent")
            req.SetBasicAuth("xxx", "xxx")
            resp, err := transport.RoundTrip(req)
            if err != nil {
                panic("HTTP request failed.")
            }
            defer resp.Body.Close()

            if resp.StatusCode != 302 {
                panic("Unexpected response returned.")
            }

            location := resp.Header.Get("Location")
            if location == "" {
                panic("No location header returned.")
            }
            fmt.Println("Location Header Value:", location)
        }()
    }

    time.Sleep(60 * time.Second)
}

我期望这段代码做的是:

  • 启动 1,000,000 个 goroutine,每个 goroutine 都向我的 API 服务发出 HTTP 请求。
  • 在我的所有 CPU 上同时运行 goroutines(因为我使用运行时包来增加 GOMAXPROCS 设置)。

然而,发生的情况是我收到以下错误(太多无法粘贴,所以我只包含了一些输出):

goroutine 16680 [IO wait]:
net.runtime_pollWait(0xcb1d878, 0x77, 0x0)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/runtime/netpoll.goc:116 +0x6a
net.(*pollDesc).Wait(0xc212a86ca0, 0x77, 0x55d0c0, 0x24)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_poll_runtime.go:81 +0x34
net.(*pollDesc).WaitWrite(0xc212a86ca0, 0x24, 0x55d0c0)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_poll_runtime.go:90 +0x30
net.(*netFD).connect(0xc212a86c40, 0x0, 0x0, 0xb4c97e8, 0xc212a84500, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_unix.go:86 +0x166
net.(*netFD).dial(0xc212a86c40, 0xb4c87d8, 0x0, 0xb4c87d8, 0xc212a878d0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/sock_posix.go:121 +0x2fd
net.socket(0x2402c0, 0x3, 0x2, 0x1, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/sock_posix.go:91 +0x40b
net.internetSocket(0x2402c0, 0x3, 0xb4c87d8, 0x0, 0xb4c87d8, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/ipsock_posix.go:136 +0x161
net.dialTCP(0x2402c0, 0x3, 0x0, 0xc212a878d0, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/tcpsock_posix.go:155 +0xef
net.dialSingle(0x2402c0, 0x3, 0xc210d161e0, 0x15, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:225 +0x3d8
net.func·015(0x0, 0x0, 0x0, 0x2402c0, 0x3, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:158 +0xde
net.dial(0x2402c0, 0x3, 0xb4c8748, 0xc212a878d0, 0xafbbcd8, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_unix.go:40 +0x45
net.(*Dialer).Dial(0xafbbd78, 0x2402c0, 0x3, 0xc210d161e0, 0x15, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:165 +0x3e0
net.Dial(0x2402c0, 0x3, 0xc210d161e0, 0x15, 0x0, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:138 +0x75
net/http.(*Transport).dial(0xc210057280, 0x2402c0, 0x3, 0xc210d161e0, 0x15, ...)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:401 +0xd4
net/http.(*Transport).dialConn(0xc210057280, 0xc2112efa80, 0x0, 0x0, 0x0)
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:444 +0x6e
net/http.func·014()
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:419 +0x3e
created by net/http.(*Transport).getConn
        /usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:421 +0x11a

我在 Mac OSX 10.9.2 笔记本电脑上运行此脚本,该笔记本电脑配备 16GB RAM 和 2.6GHz Intel Core i5 处理器。

我可以做些什么来让我的笔记本电脑“充斥”尽可能多的并发 HTTP 请求?

【问题讨论】:

    标签: http concurrency go


    【解决方案1】:

    正如Rob Napier 建议的那样,您几乎肯定会达到文件描述符限制。

    编辑:改进的并发版本:

    这个程序创建了一个max goroutines 的工作池,它将请求从通道中拉出,处理它们,然后在响应通道上发送它们。请求由dispatcher 排队,goroutines 由workerPool 启动,workers 每个处理一个作业,直到请求通道为空,consumer 处理响应通道直到成功响应的数量等于请求的数量。

    package main
    
    import (
        "flag"
        "fmt"
        "log"
        "net/http"
        "runtime"
        "time"
    )
    
    var (
        reqs int
        max  int
    )
    
    func init() {
        flag.IntVar(&reqs, "reqs", 1000000, "Total requests")
        flag.IntVar(&max, "concurrent", 200, "Maximum concurrent requests")
    }
    
    type Response struct {
        *http.Response
        err error
    }
    
    // Dispatcher
    func dispatcher(reqChan chan *http.Request) {
        defer close(reqChan)
        for i := 0; i < reqs; i++ {
            req, err := http.NewRequest("GET", "http://localhost/", nil)
            if err != nil {
                log.Println(err)
            }
            reqChan <- req
        }
    }
    
    // Worker Pool
    func workerPool(reqChan chan *http.Request, respChan chan Response) {
        t := &http.Transport{}
        for i := 0; i < max; i++ {
            go worker(t, reqChan, respChan)
        }
    }
    
    // Worker
    func worker(t *http.Transport, reqChan chan *http.Request, respChan chan Response) {
        for req := range reqChan {
            resp, err := t.RoundTrip(req)
            r := Response{resp, err}
            respChan <- r
        }
    }
    
    // Consumer
    func consumer(respChan chan Response) (int64, int64) {
        var (
            conns int64
            size  int64
        )
        for conns < int64(reqs) {
            select {
            case r, ok := <-respChan:
                if ok {
                    if r.err != nil {
                        log.Println(r.err)
                    } else {
                        size += r.ContentLength
                        if err := r.Body.Close(); err != nil {
                            log.Println(r.err)
                        }
                    }
                    conns++
                }
            }
        }
        return conns, size
    }
    
    func main() {
        flag.Parse()
        runtime.GOMAXPROCS(runtime.NumCPU())
        reqChan := make(chan *http.Request)
        respChan := make(chan Response)
        start := time.Now()
        go dispatcher(reqChan)
        go workerPool(reqChan, respChan)
        conns, size := consumer(respChan)
        took := time.Since(start)
        ns := took.Nanoseconds()
        av := ns / conns
        average, err := time.ParseDuration(fmt.Sprintf("%d", av) + "ns")
        if err != nil {
            log.Println(err)
        }
        fmt.Printf("Connections:\t%d\nConcurrent:\t%d\nTotal size:\t%d bytes\nTotal time:\t%s\nAverage time:\t%s\n", conns, max, size, took, average)
    }
    

    生产:

    连接数:1000000
    并发数:200
    总大小:15000000 字节
    总时间:36m39.6778103s
    平均时间:2.199677ms

    警告:这非常会迅速达到系统资源限制。在我的笔记本电脑上,超过 206 个并发工作人员会导致我的本地测试网络服务器崩溃!

    Playground

    原始答案: 下面的程序使用缓冲的chan bool 作为信号量通道,这限制了并发请求的数量。您可以调整此数字和请求总数,以便对系统进行压力测试并确定最大值。

    package main
    
    import (
        "fmt"
        "net/http"
        "runtime"
        "time"
    )
    
    type Resp struct {
        *http.Response
        err error
    }
    
    func makeResponses(reqs int, rc chan Resp, sem chan bool) {
        defer close(rc)
        defer close(sem)
        for reqs > 0 {
            select {
            case sem <- true:
                req, _ := http.NewRequest("GET", "http://localhost/", nil)
                transport := &http.Transport{}
                resp, err := transport.RoundTrip(req)
                r := Resp{resp, err}
                rc <- r
                reqs--
            default:
                <-sem
            }
        }
    }
    
    func getResponses(rc chan Resp) int {
        conns := 0
        for {
            select {
            case r, ok := <-rc:
                if ok {
                    conns++
                    if r.err != nil {
                        fmt.Println(r.err)
                    } else {
                        // Do something with response
                        if err := r.Body.Close(); err != nil {
                            fmt.Println(r.err)
                        }
                    }
                } else {
                    return conns
                }
            }
        }
    }
    
    func main() {
        reqs := 100000
        maxConcurrent := 1000
        runtime.GOMAXPROCS(runtime.NumCPU())
        rc := make(chan Resp)
        sem := make(chan bool, maxConcurrent)
        start := time.Now()
        go makeResponses(reqs, rc, sem)
        conns := getResponses(rc)
        end := time.Since(start)
        fmt.Printf("Connections: %d\nTotal time: %s\n", conns, end)
    }
    

    这将打印如下内容:

    连接数:100000
    总时间:6m8.2554629s

    此测试是在本地 Web 服务器上完成的,每个请求返回的总响应大小为 85B,因此这不是一个现实的结果。另外,除了关闭它的主体外,我没有对响应进行任何处理。

    在最多 1000 个并发请求时,我的笔记本电脑需要 6 分钟多一点的时间来处理 100,000 个请求,所以我猜一百万个请求需要一个多小时。调整 maxConcurrent 变量应该可以帮助您获得系统的最大性能。

    【讨论】:

    • 我喜欢你的信号量通道——效果很好。但是,您正在为每个连接创建一个新的传输,这意味着套接字不会被重用。此外,您正在makeResponse 中按顺序执行 http 事务的连接部分,所以我认为您没有获得最大的并发性。更多所有 HTTP 内容到 getResponse 函数来修复。
    • @NickCraig-Wood,你是对的,我目前正在重写这个示例以正确并发。希望我很快就会有更新。我将留给 OP 解决的连接重用问题 :-)
    • @NickCraig-Wood 请查看改进版。不需要信号量通道,只需要一个工作池:-)
    • 不错!请注意,如果您在dispatcher 的末尾加上close(reqChan),那么您可以在worker 中使用更简单的for req :=rangereqChan。您可以为consumer 做同样的事情,但需要付出更多努力(您需要sync.Waitgroup),这将避免您必须计算返回的项目数。 [这里[(github.com/ncw/rclone/blob/master/fs/operations.go#L220) 是我的一个项目中这种模式的一个例子。
    • @NickCraig-Wood 谢谢!我会看看我能做些什么来改善答案。我很欣赏这个链接,我一直希望在这个并发的东西上做得更好:-)
    【解决方案2】:

    您几乎肯定会遇到文件描述符限制。默认限制为 2560(旧限制为 256,但我认为他们在某个时候将其 x10)。我相当确定你可以设置的最高值是 10,000。

    我不知道您是否能够通过这种方式从一台机器上同时获得一百万个连接。您可能想尝试混合使用进程和 goroutine:10k 个进程,每个进程 1000 个 goroutine,但如果您遇到系统范围的限制,我不会感到惊讶。

    为了得到你想要的,我相信你需要限制速率(使用缓冲的通道信号量),这样如果目标只是达到目标,你就不会同时建立超过数千个连接尽可能简单地从一台主机(和一张网卡)获取 API。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-02-27
      • 1970-01-01
      • 2018-11-28
      • 1970-01-01
      相关资源
      最近更新 更多