【问题标题】:Queuing ajax requests using go routines使用 go 例程对 ajax 请求进行排队
【发布时间】:2016-05-25 02:34:30
【问题描述】:

我有以下代码:

var (
    WorkersNum int = 12
    HTTPAddr string = "127.0.0.1:8080"
    Delay = 3e9
)

var (
    RequestQueue = make(chan Request, 1024)
    WorkerQueue chan chan Request
)

type Request struct {
    Buf []byte
    Delay time.Duration
}

type Worker struct {
    ID          int
    Request     chan Request
    WorkerQueue chan chan Request
    QuitChan    chan bool
}

func main() {
    fmt.Println("Starting the dispatcher")
    StartDispatcher()

    fmt.Println("Registering the handler")
    http.HandleFunc("/", handleRequest)

    fmt.Println("HTTP server listening on", HTTPAddr)
    if err := http.ListenAndServe(HTTPAddr, nil); err != nil {
        fmt.Println(err.Error())
    }
}

func StartDispatcher() {
    WorkerQueue = make(chan chan Request, WorkersNum)

    for i := 0; i < WorkersNum; i++ {
        fmt.Println("Starting worker", i + 1)
        worker := NewWorker(i + 1, WorkerQueue)
        worker.Start()
    }

    go func() {
        for {
            select {
            case request := <-RequestQueue:
                    fmt.Println("Received requeust")
                    go func() {
                        worker := <-WorkerQueue
                        fmt.Println("Dispatching request")
                        worker <- request
                    }()
            }
        }
    }()
}

func NewWorker(id int, workerQueue chan chan Request) Worker {
    worker := Worker{
        ID:          id,
        Request:     make(chan Request),
        WorkerQueue: workerQueue,
        QuitChan:    make(chan bool),
    }
    return worker
}

func (w *Worker) Start() {
    go func() {
        for {
            w.WorkerQueue <- w.Request
            select {
            case request := <-w.Request:
                    fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, request.Delay.Seconds())
                    time.Sleep(request.Delay)
                    writeToFile(request.Buf)
                    fmt.Printf("worker%d: Saved to file!\n", w.ID)
                case <-w.QuitChan:
                    fmt.Printf("worker%d stopping\n", w.ID)
                    return
            }
        }
    }()
}

func handleRequest(w http.ResponseWriter, r *http.Request) {
    // make sure it's POST
    if r.Method != "POST" {
        w.Header().Set("Allow", "POST")
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    // add cors
    w.Header().Set("Access-Control-Allow-Origin", "*")
    // retrieve
    buf, err := ioutil.ReadAll(r.Body)
    if err != nil {
        //http.Error(w, err, http.StatusBadRequest)
        return
    }
    request := Request{Buf: buf, Delay: Delay}
    RequestQueue <- request
    fmt.Println("Request queued")
}

我对 go 语言和 goroutine 还是很陌生 - 你能帮我理解这段代码是如何工作的吗?

首先我在将 Worker.Request 分配给 Worker.WorkerQueue 的每个工作人员上调用 start() 函数 - 如何将空通道分配给空通道数组?

然后在 StartDispatcher() 中创建等待请求的 goroutine。

当请求到来时,我将它添加到 RequestQueue 变量中。下一步是什么? Start() 函数应该触发,但 case 正在等待 w.Request。未填充,因为它是 RequestQueue 变量发生变化。

你能给我一些简单的解释吗?

【问题讨论】:

    标签: ajax go goroutine


    【解决方案1】:

    而且我不喜欢go func() {...} inside Worker.Start(),IMO Worker.Start() 必须是同步的,那么你必须在 StartDispatcher() 中调用它为go worker.Start()

    它是如何工作的。

    在 StartDispatcher() 中,它在循环中创建工作人员,然后将其输入通道放在 WorkerQueue 缓冲通道上(缓冲通道的工作方式类似于数组,但通道)并阻塞等待请求。然后我们启动一个新的 goroutine 来处理传入的请求:从缓冲通道 WorkerQueue 中选择第一个 worker 的输入通道(worker 变量)并向其发送请求。

    Worker 会捡起它,完成工作,然后进入下一个循环:将他的输入通道放入 WorkerQueue(是的,它是 StartDispatcher() 启动时第一次完成的地方)。

    您可以随时关闭工作人员 QuitChan,工作人员将在 case &lt;-w.QuitChan 情况下终止(从关闭的频道读取立即返回)。

    顺便说一句,您的RequestQueue = make(chan Request, 1024) 也是缓冲通道,因此写入它不会阻塞(除非它已满)。

    希望对你有帮助。

    【讨论】:

    • 谢谢!它对我有很大帮助,但我还有一个小问题 - 我应该在哪里发送 false 到 w.QuitChan?
    • 最好发送至close(w.QuitChan) 而不是发送false。而且,我不知道 =) 你不跟踪/保留指向工人的指针。唯一指向工人的地方是 StartDispatcher() 中的第一个“for”循环。将这些指针保存在一个切片中,稍后您将能够停止它们。
    猜你喜欢
    • 2011-06-14
    • 1970-01-01
    • 2011-03-03
    • 2021-08-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-08-28
    相关资源
    最近更新 更多