【问题标题】:Push messages in avro format via golang to kafka通过 golang 将 avro 格式的消息推送到 kafka
【发布时间】:2020-05-15 21:14:45
【问题描述】:

我试图通过 confluent go 客户端向 kafka 推送一些消息,但问题是消息需要以 avro 格式推送。在 java springboot 应用程序中也可以轻松实现。

我有一种预感,好像这一切都可以通过 confluent go 客户端实现。虽然我有另一种方法可以通过 confluent rest 代理推送这些消息,但这意味着 3-4 倍的性能损失,我会拒绝这样做。

我尝试用 goAvro 转换 avro 中的消息。虽然我在生产时没有收到任何错误,但数据部分没有以 avro 格式存储。

avroCodec, err := goavro.NewCodec(schemaString)

if err != nil {
    log.Panic(err.Error())
}

appointmentByte,_ := json.Marshal(appointment)

native, _, _ := avroCodec.NativeFromTextual(appointmentByte)

binaryValue, _ := avroCodec.BinaryFromNative(nil,  native)

var recordValue []byte

schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(id))

recordValue = append(recordValue, byte(0))
recordValue = append(recordValue, schemaIDBytes...)
recordValue = append(recordValue, binaryValue...)

log.Print(recordValue)

key, _ := uuid.NewUUID()

fmt.Print(key.String())
p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{
        Topic: &topic, Partition: kafka.PartitionAny},
    Key: []byte(key.String()), Value: recordValue}, nil)

【问题讨论】:

  • 你能不能试着做一段最小的、单独的代码给我们看。
  • @nilsocket 上面的代码处理消息的编码部分
  • { "topic": "约会值", "key": "4efcfb26-42cd-11ea-a7f5-3af9d398b113", "value": "\u0000\u0000\u0000\u0000+", “分区”:0,“偏移”:4 }
  • 在kafka存储的原始版本的数据就是上面的方式
  • json.Marshal(appointment) 创建 json,而不是 Avro...您从哪里获取 ID?

标签: go apache-kafka avro confluent-schema-registry


【解决方案1】:

您可以在 Github 上搜索您的问题的解决方案。它目前不是项目的一部分,但正在开发中

https://github.com/confluentinc/confluent-kafka-go/issues/69

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-10
    • 2019-01-01
    • 2019-07-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多