【问题标题】:GoLang sequential goroutinesGoLang 顺序 goroutine
【发布时间】:2022-01-16 11:49:20
【问题描述】:

我是 golang 的新手,并且有一个用例,其中对一个类型的值的操作必须以顺序方式运行,而对其他类型的值的操作可以同时运行。

  1. 假设数据来自流连接(按顺序)
    key_name_1, value_1 
    key_name_2, value_2
    key_name_1, value_1
    
  2. 现在key_name_1key_name_2可以同时被goroutine操作了。
  3. 但是由于下一个流式值(第 3 行)又是 key_name_1,所以这个操作应该只在前面的操作(第 1 行)已经完成的情况下由 goroutine 处理,否则它应该等待第一个操作完成才可以应用操作。 为了便于讨论,我们可以假设操作很简单 将新值添加到之前的值。

在 golang 中以尽可能高的性能实现这一目标的正确方法是什么?


确切的用例是数据库更改流式传输到队列中,现在如果值发生更改,重要的是在另一个数据库上对相同序列应用操作,否则一致性将受到影响。冲突很少见,但可能会发生。

【问题讨论】:

  • 如果工作只是将新值添加到以前的值,那么同步成本将淹没实际工作。在这种情况下,按顺序执行所有操作是最简单且性能最高的设计。
  • 这只是一个例子,工作很复杂,而且事件量很大,所以没有理由顺序执行所有事件,只有相同类型的事件应该顺序执行。否则处理的滞后将变得巨大。
  • 在没有完整上下文的情况下,我能想到的最简单的方法是为每个操作创建一个通道,但是,如果键/操作的数量不受限制,这不是一个可扩展的解决方案,也是最好的方法将是上面的评论。
  • 为了“尽可能高的性能”,我们需要更多的上下文。冲突发生的频率、每秒请求的数量、处理请求需要多长时间等。即使这样,最高性能通常也是凭经验实现的。

标签: go concurrency streaming goroutine


【解决方案1】:

作为给定键的互斥性的简单解决方案,您可以只使用引用计数锁的锁定映射。这不是高负载的最佳选择,但在您的情况下可能就足够了。

type processLock struct {
    mtx      sync.Mutex
    refcount int
}

type locker struct {
    mtx   sync.Mutex
    locks map[string]*processLock
}

func (l *locker) acquire(key string) {
    l.mtx.Lock()
    lk, found := l.locks[key]
    if !found {
        lk = &processLock{}
        l.locks[key] = lk
    }
    lk.refcount++
    l.mtx.Unlock()
    lk.mtx.Lock()
}

func (l *locker) release(key string) {
    l.mtx.Lock()
    lk := l.locks[key]
    lk.refcount--
    if lk.refcount == 0 {
        delete(l.locks, key)
    }
    l.mtx.Unlock()
    lk.mtx.Unlock()
}

在处理密钥之前调用acquire(key),完成后调用release(key)

Live demo.

警告!上面的代码保证排他性,但不保证顺序。要按顺序解锁,您需要FIFO mutex

【讨论】:

  • 这个好干净
  • github.com/golang/go/issues/20135你知道这个shceme是否会重用内存吗?
  • @mh-cbon 实际上并没有那么糟糕,在 10^9 次地图添加/删除(使用this test)后,我得到了相当稳定的结果。但在最坏的情况下,可以定期重建地图,例如在delete之后。
  • 听到这个消息真是太好了。我会记住这一点进行测试(有时......)。是的,我同意解决方案。它完美地完成了答案。非常感谢您花费的时间。
【解决方案2】:

只是关于固定大小的工人数量的消息路由的建议。

IDK 如果它是最快的,我猜不是,但它很简单且可重新配置(通过一些努力可以使其 JiT 可调整大小)。

我确实相信哈希部分可以优化,并且存在一些巧妙的技术。否则你也可以通过增加通道缓冲区在内存中堆积一些项目,再次简单。

无论如何,https://go.dev/play/p/fnxNJ9VZK8q

// You can edit this code!
// Click here and start typing.
package main

import (
    "fmt"
    "hash/fnv"
    "sync"
)

func main() {
    events := []map[string]interface{}{
        map[string]interface{}{
            "key1": "v",
            "key2": "v",
            "key3": "v",
        },
        map[string]interface{}{
            "key1": "v",
            "key2": "v",
            "key3": "v",
        },
    }

    type workerWork struct {
        work     interface{}
        workerID int
    }

    workers := 4
    inputs := []chan interface{}{}
    output := make(chan workerWork)

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        i := i
        input := make(chan interface{})
        inputs = append(inputs, input)
        wg.Add(1)
        go func() {
            defer wg.Done()
            for v := range input {
                output <- workerWork{work: v, workerID: i}
            }
        }()
    }

    go func() {
        wg.Wait()
        close(output)
    }()

    go func() {
        h := fnv.New32a()
        for _, batch := range events {
            for k, v := range batch {
                h.Write([]byte(k))
                ui := int(h.Sum32())
                d := ui % workers
                inputs[d] <- map[string]interface{}{k: v}
                h.Reset()
            }
        }
        for _, i := range inputs {
            close(i)
        }
    }()

    for work := range output {
        fmt.Printf("%#v\n", work)
    }

}

【讨论】:

    猜你喜欢
    • 2018-08-12
    • 2015-12-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多