【问题标题】:How can I consume message from kafka in order?如何按顺序使用来自 kafka 的消息?
【发布时间】:2021-12-10 19:30:30
【问题描述】:

背景

生产者生产一些数据并按顺序发送到 Kafka,例如:

{uuid:123 状态:1}

{uuid:123 状态:3}

状态 1 表示开始

状态 3 表示成功

我使用 sarama.NewConsumerGroup(xx, xx, config).Consume(xx, xx, myhandler) 与代码一起消费:

func (h myhandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {

        key := fmt.Sprintf("%q-%d-%d", msg.Topic, msg.Partition, msg.Offset)
        _, err := rdb.RedisClient.Get(h.ctx, key).Result()
        if err == redis.Nil {
            msgQueue <- msg.Value
            sess.MarkMessage(msg, "")
            rdb.RedisClient.Set(h.ctx, key, none, 12*time.Hour)
        } else if err != nil {
            log.Errorln("RedisClient get key error : ", err)
            return err
        } else {
            continue
        }

    }
    return nil
}

msgQueue := make(chan interface{}, 1000)

然后我将 msgQueue 中的值解码为一个结构,并在 mysql 中插入一条记录。

问题

通常,最终数据状态为'3',但我发现有时它是'1'

我发现频道 msgQueue 中的消息顺序不固定。

那么如何保证数据的最终状态是3

如何解决

我提供的方法不够好,看看如何优化。

    conn := &gorm.DB{}
    data := &Log{}
    if data.Status != 1 {
        conn = conn.Clauses(clause.OnConflict{
            Columns:   []clause.Column{{Name: "uuid"}},
            DoUpdates: clause.AssignmentColumns([]string{"status"}),
        })
    }
    conn.Create(data)
    return conn.Error

而且mysql对于uuid有唯一的约束索引。

当数据顺序为{uuid: 123 status: 1}时, {uuid: 123 status: 3},没错。

当数据顺序为{uuid: 123 status: 3}时, {uuid: 123 status: 1},最终状态也是正确的,但是会返回错误Error 1062: Duplicate entry '123' for key 'unique_index_uuid'

不漂亮。那么我该如何优化或者有其他方法可以做到吗?

【问题讨论】:

    标签: mysql go apache-kafka go-gorm sarama


    【解决方案1】:

    这取决于主题分区。 Kafka 不提供主题内的排序保证,仅在分区内提供。

    换句话说,如果你发送消息 A,然后消息 B 到分区 0,那么顺序将是:首先 A,然后 B。但如果它们最终在不同的分区上在将 A 写入其分区之前,可能会发生 B 已写入其分区的情况。

    这是来自 Confluent 网站的引述:

    Kafka 仅提供分区内记录的总排序,而不是主题中不同分区之间的总排序。对于大多数应用程序来说,按分区排序与按键分区数据的能力相结合就足够了。但是,如果您需要对记录进行总排序,这可以通过只有一个分区的主题来实现,尽管这意味着每个消费者组只有一个消费者进程。

    Link

    【讨论】:

    • 感谢您告诉我为什么订单不固定。但我真正的问题是如何确保数据的最终状态为 3 ?我提供的方法不够好,看看如何优化。
    • 我在我的问题下添加了。请帮忙!
    • 所以基本上你的问题与Kafka无关,它与你的业务逻辑有关。您需要考虑如何处理跳过消息状态的情况(例如,如果您收到状态为 3 的消息,而状态 1 从未发送过怎么办)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-16
    • 2018-03-19
    • 1970-01-01
    • 2017-06-19
    • 1970-01-01
    相关资源
    最近更新 更多