【发布时间】:2021-12-10 19:30:30
【问题描述】:
背景
生产者生产一些数据并按顺序发送到 Kafka,例如:
{uuid:123 状态:1}
{uuid:123 状态:3}
状态 1 表示开始
状态 3 表示成功
我使用 sarama.NewConsumerGroup(xx, xx, config).Consume(xx, xx, myhandler) 与代码一起消费:
func (h myhandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
key := fmt.Sprintf("%q-%d-%d", msg.Topic, msg.Partition, msg.Offset)
_, err := rdb.RedisClient.Get(h.ctx, key).Result()
if err == redis.Nil {
msgQueue <- msg.Value
sess.MarkMessage(msg, "")
rdb.RedisClient.Set(h.ctx, key, none, 12*time.Hour)
} else if err != nil {
log.Errorln("RedisClient get key error : ", err)
return err
} else {
continue
}
}
return nil
}
msgQueue := make(chan interface{}, 1000)
然后我将 msgQueue 中的值解码为一个结构,并在 mysql 中插入一条记录。
问题
通常,最终数据状态为'3',但我发现有时它是'1'
我发现频道 msgQueue 中的消息顺序不固定。
那么如何保证数据的最终状态是3?
如何解决
我提供的方法不够好,看看如何优化。
conn := &gorm.DB{}
data := &Log{}
if data.Status != 1 {
conn = conn.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "uuid"}},
DoUpdates: clause.AssignmentColumns([]string{"status"}),
})
}
conn.Create(data)
return conn.Error
而且mysql对于uuid有唯一的约束索引。
当数据顺序为{uuid: 123 status: 1}时, {uuid: 123 status: 3},没错。
当数据顺序为{uuid: 123 status: 3}时, {uuid: 123 status: 1},最终状态也是正确的,但是会返回错误Error 1062: Duplicate entry '123' for key 'unique_index_uuid'。
不漂亮。那么我该如何优化或者有其他方法可以做到吗?
【问题讨论】:
标签: mysql go apache-kafka go-gorm sarama