【发布时间】:2021-03-11 04:36:10
【问题描述】:
我有一个电子商务应用程序,每次用户将某些东西添加到购物车时,我都会向 kafka 服务器发送一条消息。我可以发送消息并从客户端使用它,但是,我对错误处理感到好奇。有时,我的 Go 服务器由于网络错误或其他一些原因而失败。添加到购物车功能将是应用程序的重要组成部分,因此我不希望 kafka 生产者无法使用该功能或变得依赖它。我尝试通过为 kafka Producer 创建一个单独的函数来将它们分开,我认为 kafka.Produce() 函数是非阻塞的,因此即使失败,用户仍然应该能够将项目添加到购物车。这是一个示例代码(我为 kafka 部分放置了完整代码,但为了便于阅读,我修剪了添加到购物车的实现)。如果出现问题或超过几秒钟的超时时间,有没有办法退出 kafka 功能?因此,添加到购物车功能不会挂起或导致服务器失败。我对 Go 中的通道和并发不是很有经验,所以我无法确定这是否会成为当前设计的问题。
加入购物车
func addToCart(c *context.Context, rw web.ResponseWriter, req *web.Request) {
cartID := req.PathParams["id"]
var items []map[string]interface{}
if err := json.NewDecoder(req.Body).Decode(&items); err != nil {
errors.Write(rw, 400, "Unable to parse request body JSON or invalid data format.")
return
}
//MAKE SOME OPERATIONS AND SAVE IT TO DATABASE
cart, jsonErr := saveToDB(c, cartID, items)
if jsonErr != nil {
jsonErr.Write(rw)
return
}
webLib.Write204(rw)
deliveryChan := make(chan kafka.Event)
kafkaMessage("cart_topic", "sample-cart-event-message", deliveryChan, rw, rq)
return
}
卡夫卡
func kafkaMessage(topic string, message []byte, deliveryChan chan kafka.Event, rw web.ResponseWriter, req *web.Request) {
err := c.KafkaProducer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: message,
}, deliveryChan)
if err != nil {
c.Log("error:%s", err)
return
}
e, ok := <-deliveryChan
if !ok{
c.Log("Channel is closed for kafka producer")
return
}
m, ok := e.(*kafka.Message)
if !ok{
c.Log("There has been an error obtaining the kafka message")
return
}
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
c.Log("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
}
【问题讨论】:
标签: go apache-kafka confluent-platform channel