【问题标题】:How to gracefully handle errors in apache kafka producer如何优雅地处理 apache kafka producer 中的错误
【发布时间】:2021-03-11 04:36:10
【问题描述】:

我有一个电子商务应用程序,每次用户将某些东西添加到购物车时,我都会向 kafka 服务器发送一条消息。我可以发送消息并从客户端使用它,但是,我对错误处理感到好奇。有时,我的 Go 服务器由于网络错误或其他一些原因而失败。添加到购物车功能将是应用程序的重要组成部分,因此我不希望 kafka 生产者无法使用该功能或​​变得依赖它。我尝试通过为 kafka Producer 创建一个单独的函数来将它们分开,我认为 kafka.Produce() 函数是非阻塞的,因此即使失败,用户仍然应该能够将项目添加到购物车。这是一个示例代码(我为 kafka 部分放置了完整代码,但为了便于阅读,我修剪了添加到购物车的实现)。如果出现问题或超过几秒钟的超时时间,有没有办法退出 kafka 功能?因此,添加到购物车功能不会挂起或导致服务器失败。我对 Go 中的通道和并发不是很有经验,所以我无法确定这是否会成为当前设计的问题。

加入购物车

func addToCart(c *context.Context, rw web.ResponseWriter, req *web.Request) {
    cartID := req.PathParams["id"]
    var items []map[string]interface{}
    if err := json.NewDecoder(req.Body).Decode(&items); err != nil {
        errors.Write(rw, 400, "Unable to parse request body JSON or invalid data format.")
        return
    }

    //MAKE SOME OPERATIONS AND SAVE IT TO DATABASE
    cart, jsonErr := saveToDB(c, cartID, items)
    if jsonErr != nil {
        jsonErr.Write(rw)
        return
    }

    webLib.Write204(rw)

    deliveryChan := make(chan kafka.Event)

    kafkaMessage("cart_topic", "sample-cart-event-message", deliveryChan, rw, rq)
    return
}

卡夫卡

func kafkaMessage(topic string, message []byte, deliveryChan chan kafka.Event, rw web.ResponseWriter, req *web.Request) {
    err := c.KafkaProducer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          message,
    }, deliveryChan)

    if err != nil {
        c.Log("error:%s", err)
        return
    }

    e, ok := <-deliveryChan
    if !ok{
        c.Log("Channel is closed for kafka producer")
        return
    }

    m, ok := e.(*kafka.Message)
    if !ok{
        c.Log("There has been an error obtaining the kafka message")
        return
    }

    if m.TopicPartition.Error != nil {
        fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
    } else {
        c.Log("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }
}

【问题讨论】:

    标签: go apache-kafka confluent-platform channel


    【解决方案1】:

    所以发送到 kafka 是异步的,但实际上你是通过等待“成功”消息将其转换为同步函数。

    几个快速选项。

    1:您可以完全忽略异步消息by passing a nil channel 的状态为deliveryChan。然后你真的得到了一个“一劳永逸”的异步模型。听起来这可能就是您正在寻找的。​​p>

    2:你可以在 goroutine 中运行kafkaMessage,只需更改为

    deliveryChan := make(chan kafka.Event)
    go kafkaMessage("cart_topic", "sample-cart-event-message", deliveryChan, rw, rq)
    return
    

    然后您可以在该功能中继续等待消息、日志记录等。如果需要,您甚至可以添加重试!请注意,在这种情况下,您可能会积压 goroutines 等待响应消息/重试等,因为您实际上是将操作作为 goroutines 排队。对于大多数应用程序来说,这不会是一个问题,因为您不会在处理过程中不断落后,但仍然需要密切关注监控!

    这里还有很多其他模式可以遵循,但这些模式的提升率相当低,并为您提供了一些选择。

    【讨论】:

    • 感谢您的回复@turtemonvh!对于第二个选项,我认为这就是我所做的,除了不使用 go 关键字。使用具有相同结构的 go 关键字后,当用户将商品添加到购物车时,应用程序会挂起。如果我从 .Produce() 的参数中删除 deliveryChan,那么消费者会收到消息,但在 kafka 部分中,程序在第 24 行(最后一个 if-else 语句)之前完成。所以我无法检查消息是否成功。对此有什么建议吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-03-19
    • 1970-01-01
    • 2018-11-10
    • 1970-01-01
    • 2021-05-26
    • 2017-11-10
    • 1970-01-01
    相关资源
    最近更新 更多