【问题标题】:How to wait for the implementation of如何等待执行
【发布时间】:2015-12-22 14:17:01
【问题描述】:

我有一个大的日志文件,你想并行分析。

我有这个代码:

package main

import (
    "bufio"
    "fmt"
    "os"
    "time"
)

func main() {
    filename := "log.txt"
    threads := 10

    // Read the  file
    file, err := os.Open(filename)
    if err != nil {
        fmt.Println("Could not open file with the database.")
        os.Exit(1)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)

    // Channel for strings
    tasks := make(chan string)

    // Run the threads that catch events from the channel and understand one line of the log file
    for i := 0; i < threads; i++ {
        go parseStrings(tasks)
    }

    // Start a thread load lines from a file into the channel
    go getStrings(scanner, tasks)

    // At this point I have to wait until all of the threads executed
    // For example, I set the sleep
    for {
        time.Sleep(1 * time.Second)
    }
}

func getStrings(scanner *bufio.Scanner, tasks chan<- string) {
    for scanner.Scan() {
        s := scanner.Text()
        tasks <- s
    }
}

func parseStrings(tasks <-chan string) {
    for {
        s := <-tasks
        event := parseLine(s)
        fmt.Println(event)
    }
}

func parseLine(line string) string {
    return line
}

实际上,当我等待所有线程结束时? 有人建议我创建一个单独的线程,我将在其中添加结果,但是如何添加?

【问题讨论】:

标签: multithreading go concurrency


【解决方案1】:

使用管道模式和“扇出/扇入”模式:

package main

import (
    "bufio"
    "fmt"
    "strings"
    "sync"
)

func main() {
    file := "here is first line\n" +
        "here is second line\n" +
        "here is line 3\n" +
        "here is line 4\n" +
        "here is line 5\n" +
        "here is line 6\n" +
        "here is line 7\n"
    scanner := bufio.NewScanner(strings.NewReader(file))

    // all lines onto one channel
    in := getStrings(scanner)

    // FAN OUT
    // Multiple functions reading from the same channel until that channel is closed
    // Distribute work across multiple functions (ten goroutines) that all read from in.
    xc := fanOut(in, 10)

    // FAN IN
    // multiplex multiple channels onto a single channel
    // merge the channels from c0 through c9 onto a single channel
    for n := range merge(xc) {
        fmt.Println(n)
    }
}

func getStrings(scanner *bufio.Scanner) <-chan string {
    out := make(chan string)
    go func() {
        for scanner.Scan() {
            out <- scanner.Text()
        }
        close(out)
    }()
    return out
}

func fanOut(in <-chan string, n int) []<-chan string {
    var xc []<-chan string
    for i := 0; i < n; i++ {
        xc = append(xc, parseStrings(in))
    }
    return xc
}

func parseStrings(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        for n := range in {
            out <- parseLine(n)
        }
        close(out)
    }()
    return out
}

func parseLine(line string) string {
    return line
}

func merge(cs []<-chan string) <-chan string {
    var wg sync.WaitGroup
    wg.Add(len(cs))

    out := make(chan string)
    for _, c := range cs {
        go func(c <-chan string) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Check it out on the playground.

【讨论】:

    【解决方案2】:

    var wg sync.WaitGroup

    当启动每个 goroutine 时:

    wg.Add(1)
    

    当 goroutine 工作完成时递减计数器

    wg.Done()
    

    因此,而不是

    for {
        time.Sleep(1 * time.Second)
    }
    

     wg.Wait()
    

    【讨论】:

      【解决方案3】:

      只需使用sync.WaitGroup

      package main
      
      import(
          "sync"
      )
      
      func stuff(wg *sync.WaitGroup) {
          defer wg.Done() // tell the WaitGroup it's done
          /* stuff */
      }
      
      func main() {
          count := 50
          wg := new(sync.WaitGroup)
          wg.Add(count) // add number of gorutines to the WaitGroup
          for i := 0; i < count; i++ {
              go stuff(wg)
          }
          wg.Wait() // wait for all
      }
      

      Play

      【讨论】:

        猜你喜欢
        • 2016-06-02
        • 2012-05-03
        • 1970-01-01
        • 2014-03-19
        • 1970-01-01
        • 2019-12-30
        • 1970-01-01
        • 2023-03-21
        • 2022-07-01
        相关资源
        最近更新 更多