【问题标题】:Bottleneck in Saving 10 M records in Redis DB在 Redis DB 中保存 1000 万条记录的瓶颈
【发布时间】:2021-10-13 03:46:36
【问题描述】:

我有一个用例,我想从 Google BigTable 读取数据并保存到 redis 数据库,团队将进一步将其用于内部处理。

下面提到的代码相同

func (sS *someStruct) GetAndSetDataForCurrentDay() error {

    tm := time.Now()
    currDay := getPrefix(tm) //Some algo to get the prefix for BigTable Keys

    client, err := sS.GetClient("ClientName") // Big Table Initilisation
    tbl := client.Open("TableName")

    var redisRd []class_ad.RedisData //This might be culprit :(

    err = tbl.ReadRows(
        context.Background(),
        bt.PrefixRange(currDay),
        func(row bt.Row) bool {
            data, _ := sS.readAllRowData(row)
            rd, _ := sS.parseAllRows(data)
            redisRd = append(redisRd, rd...) // redisRd is global slice which I feel might be causing bottleneck
            return true
        },
    )
    redisClient, err := redisv2.GetConnectionx("redisConnName") //Redis Coonection Established Outside
    saveToRedis(redisClient, redisRd, time.Now())
    defer redisClient.Close()

    return err
}

func (sS *someStruct) parseAllRows(data []ColumnData) ([]class_ad.RedisData, error) {

    for _, v := range data {
        if v.ColumnName == "SomeColumn1" {
            someValue = string(v.Value)
            continue
        }

        if v.ColumnName == "SomeColumn2" {
            someOtherValue, err = v
            continue
        }
        .
        .
        .
        .
        .
        .
        .
        .
    }
    return []class_ad.RedisData{"key1#Parallal", "Key2#Parallal"}, err
}

        
func (sS *someStruct) readAllRowData(row bt.Row) ([]ColumnData, error) {
    rowData := make([]ColumnData, 0)
    for columnFamilyName, columnFamilyData := range row {
        for _, column := range columnFamilyData {
            singleColumnData := ColumnData{
                Key:              column.Row,
                ColumnFamilyName: columnFamilyName,
                ColumnName:       column.Column,
                Value:            column.Value,
            }
            rowData = append(rowData, singleColumnData)
        }
    }
    return rowData, nil
}
    
func saveToRedis(redisClient *redisv2.Connectionx, rowData []class_ad.RedisData, now time.Time) {
    channel := make(chan bool) //Channel created
    for _, data := range rowData {
        go setKeyDataInRedis(redisClient, data, now, channel)
    }
    success, fail := 0, 0
    for range rowData {
        if <-channel {
            success++
            continue
        }
        fail++
    }
    close(channel)

    log.Printf("Success %v", success)
    log.Printf("Failure %v", fail)
}

func setKeyDataInRedis(redisClient *redisv2.Connectionx, data class_ad.RedisData, now time.Time, channel chan bool) {
    redisClient.ExpireAt(data.Key, endDate.Unix())
    if redisClient.HMSet(data.Key, map[string]string{
        "COLUMN_1": strconv.FormatFloat(data.Column1, 'f', -1, 64),
        "COLUMN_2": strconv.FormatFloat(data.Conversion, 'f', -1, 64),
        .
        .
        .
        .
        "CREATION_DATE":     now.String(),
        "MODIFICATION_DATE": now.String()}) != nil {
        channel <- false
        return
    }
    channel <- true
}

我们每天阅读的记录数约为 1000 万条。上面的代码遵循非常高的 CPU 使用率,导致服务被杀死。该行的任何解决方案。在 Redis DB 中批量保存数据。

【问题讨论】:

  • “非常高的 CPU 使用率”是高效运行繁重工作负载的自然结果。这不是一个问题。听起来像“导致服务被杀死”的任何东西都是这种情况下的问题。

标签: go redis slice


【解决方案1】:

当前的设计不是最理想的。我看到两个问题:

  1. 您在redisRd 下存储了超大的列表,因为您在开始写入redis 之前读取了所有行。这需要大量的内存。已经提供的代码不能证明需要同时在内存中存储所有行。
  2. saveToRedis 中的 For 循环在您迭代 rowData 的地方几乎同时生成数百万个 goroutine。他们都将开始尝试与redis进行通信。客户端配置可能会保护您的服务器免受大量连接错误的影响。但这并没有改变这样一个事实,即如此数量的 goroutine 需要协调和几 GB 的内存。

上述问题可能还会因为 GC 导致额外的 CPU 使用。

您可以将全局列表 redisRd 替换为 chan class_ad.RedisData。通道应由tbl.ReadRows 回调缓冲和写入。在开始从 BigQuery 表中读取数据之前,您应该设置一些 goroutines(可以从 50 个开始),这些 goroutines 将负责从该通道读取数据并将它们写入 redis。这样的 goroutine 的数量将定义并发执行的集合操作的数量。当然,这种操作的结果不应该返回给tbl.ReadRows,而是发送到另一个频道。你也可以pipelining 来减少发送到 redis 的数据包数量。所有这些更改应该会大大减少内存使用并加快整个过程。以下是我对其外观的变化:

package main

import (
    "context"
    "log"
    "sync"
)

type (
    // Row is a structure returned by big query client
    Row                struct{}
    bigQueryRepository interface {
        ReadRows(ctx context.Context, callback func(Row) bool) error
    }
    BigQueryReader struct {
        client bigQueryRepository
    }
    // Record is a structure used by the redis repository
    Record      struct{}
    RedisWriter struct{}

    RowsWriter interface {
        Write(ctx context.Context, row []Row) error
    }
    WorkDistributor struct {
        OnError func(err error)
        Workers []RowsWriter
        rows    chan Row
    }
)

func (r BigQueryReader) ReadRows(ctx context.Context, writer RowsWriter) error {
    return r.client.ReadRows(ctx, func(row Row) bool {
        if err := writer.Write(ctx, []Row{row}); err != nil {
            // Since there is only one goroutine which reads the data from bigquery,
            // translation to Record (potentially heavy operation) is done by workers. 
            // There are multiple workers, so they can make better use of multiple CPU cores. 
            log.Printf("Error writing a row: %s", err)
            return false
        }
        return true
    })
}

func (r RedisWriter) convertRowToRecord(row Row) []Record {
    // convert row to records
    return nil
}

// Write multiple rows. Method guarantees that either all rows are written or error occurs.
func (r RedisWriter) Write(ctx context.Context, rows []Row) error {
    // Use MULTI / EXEC commands to simulate transaction since we want to write all rows or none.
    // Send MULTI to redis
    for i := range rows {
        records := r.convertRowToRecord(rows[i])
        for j := range records {
            // Send SET operations using pipeline
            _ = j
        }
    }
    // Send EXEC to redis
    return nil
}

func (w WorkDistributor) Write(ctx context.Context, rows []Row) error {
    for i := range rows {
        w.rows <- rows[i]
    }
    return nil
}

func (w WorkDistributor) Run() {
    wg := sync.WaitGroup{}
    for i := 0; i < len(w.Workers); i++ {
        wg.Add(1)
        go func(idx int, worker RowsWriter) {
            defer wg.Done()
            for row := range w.rows {
                // Instead of writing single row there could be some batching mechanism to gather multiple rows together and pass them to worker.
                err := worker.Write(context.Background(), []Row{row})
                if err != nil {
                    w.OnError(err)
                }
            }
        }(i, w.Workers[i])
    }
    wg.Wait()
}

const numberOfWorkers = 50

func main() {
    wrks := make([]RowsWriter, numberOfWorkers)
    for i := 0; i < numberOfWorkers; i++ {
        wrks[i] = RedisWriter{}
    }
    writer := WorkDistributor{Workers: wrks, OnError: func(err error) {
        log.Printf("Error writing to redis: %s", err)
    }, rows: make(chan Row, numberOfWorkers)}
    bigQuery := BigQueryReader{}
    err := bigQuery.ReadRows(context.Background(), writer)
    if err != nil {
        log.Fatal(err)
    }
}

有 3 个主要元素:行读取器、行写入器和工作分配器。多亏了这一点,您可以轻松地为每个元素编写单元测试。

值得一提的是,高 CPU 使用率并不意味着有问题。您应该监控 CPU 是用于执行应用程序逻辑(这很好)还是用于 GC、大量内存分配等,这可能表明您的应用程序可以优化。

【讨论】:

  • 谢谢@Jaroslaw。我有个小主意。你能用一些虚拟代码 sn-p 来丰富你的答案吗?
  • 我添加了一些示例。如果您有任何问题,请随时提出。
  • 管道是我无法使用的,因为一些基础设施限制。我在想是否可以在 tbl.ReadRows( 内使用批处理而不是 redisData。 WDYT?
  • 您能否详细说明为什么不能使用管道? tbl.ReadRows 中的批处理有什么帮助?
  • 对于 Redis,我们在为 Redis 连接创建的包装器中避免了这种情况。所以我排除了这一点。用于批处理。我的意思是,一旦批次计数达到,我就会为这些记录调用 Redis save。
猜你喜欢
  • 1970-01-01
  • 2013-01-25
  • 1970-01-01
  • 2019-09-14
  • 2011-09-21
  • 1970-01-01
  • 1970-01-01
  • 2014-07-09
  • 2017-11-13
相关资源
最近更新 更多