【发布时间】:2021-10-13 03:46:36
【问题描述】:
我有一个用例,我想从 Google BigTable 读取数据并保存到 redis 数据库,团队将进一步将其用于内部处理。
下面提到的代码相同
func (sS *someStruct) GetAndSetDataForCurrentDay() error {
tm := time.Now()
currDay := getPrefix(tm) //Some algo to get the prefix for BigTable Keys
client, err := sS.GetClient("ClientName") // Big Table Initilisation
tbl := client.Open("TableName")
var redisRd []class_ad.RedisData //This might be culprit :(
err = tbl.ReadRows(
context.Background(),
bt.PrefixRange(currDay),
func(row bt.Row) bool {
data, _ := sS.readAllRowData(row)
rd, _ := sS.parseAllRows(data)
redisRd = append(redisRd, rd...) // redisRd is global slice which I feel might be causing bottleneck
return true
},
)
redisClient, err := redisv2.GetConnectionx("redisConnName") //Redis Coonection Established Outside
saveToRedis(redisClient, redisRd, time.Now())
defer redisClient.Close()
return err
}
func (sS *someStruct) parseAllRows(data []ColumnData) ([]class_ad.RedisData, error) {
for _, v := range data {
if v.ColumnName == "SomeColumn1" {
someValue = string(v.Value)
continue
}
if v.ColumnName == "SomeColumn2" {
someOtherValue, err = v
continue
}
.
.
.
.
.
.
.
.
}
return []class_ad.RedisData{"key1#Parallal", "Key2#Parallal"}, err
}
func (sS *someStruct) readAllRowData(row bt.Row) ([]ColumnData, error) {
rowData := make([]ColumnData, 0)
for columnFamilyName, columnFamilyData := range row {
for _, column := range columnFamilyData {
singleColumnData := ColumnData{
Key: column.Row,
ColumnFamilyName: columnFamilyName,
ColumnName: column.Column,
Value: column.Value,
}
rowData = append(rowData, singleColumnData)
}
}
return rowData, nil
}
func saveToRedis(redisClient *redisv2.Connectionx, rowData []class_ad.RedisData, now time.Time) {
channel := make(chan bool) //Channel created
for _, data := range rowData {
go setKeyDataInRedis(redisClient, data, now, channel)
}
success, fail := 0, 0
for range rowData {
if <-channel {
success++
continue
}
fail++
}
close(channel)
log.Printf("Success %v", success)
log.Printf("Failure %v", fail)
}
func setKeyDataInRedis(redisClient *redisv2.Connectionx, data class_ad.RedisData, now time.Time, channel chan bool) {
redisClient.ExpireAt(data.Key, endDate.Unix())
if redisClient.HMSet(data.Key, map[string]string{
"COLUMN_1": strconv.FormatFloat(data.Column1, 'f', -1, 64),
"COLUMN_2": strconv.FormatFloat(data.Conversion, 'f', -1, 64),
.
.
.
.
"CREATION_DATE": now.String(),
"MODIFICATION_DATE": now.String()}) != nil {
channel <- false
return
}
channel <- true
}
我们每天阅读的记录数约为 1000 万条。上面的代码遵循非常高的 CPU 使用率,导致服务被杀死。该行的任何解决方案。在 Redis DB 中批量保存数据。
【问题讨论】:
-
“非常高的 CPU 使用率”是高效运行繁重工作负载的自然结果。这不是一个问题。听起来像“导致服务被杀死”的任何东西都是这种情况下的问题。