【发布时间】:2020-05-15 21:14:45
【问题描述】:
我试图通过 confluent go 客户端向 kafka 推送一些消息,但问题是消息需要以 avro 格式推送。在 java springboot 应用程序中也可以轻松实现。
我有一种预感,好像这一切都可以通过 confluent go 客户端实现。虽然我有另一种方法可以通过 confluent rest 代理推送这些消息,但这意味着 3-4 倍的性能损失,我会拒绝这样做。
我尝试用 goAvro 转换 avro 中的消息。虽然我在生产时没有收到任何错误,但数据部分没有以 avro 格式存储。
avroCodec, err := goavro.NewCodec(schemaString)
if err != nil {
log.Panic(err.Error())
}
appointmentByte,_ := json.Marshal(appointment)
native, _, _ := avroCodec.NativeFromTextual(appointmentByte)
binaryValue, _ := avroCodec.BinaryFromNative(nil, native)
var recordValue []byte
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(id))
recordValue = append(recordValue, byte(0))
recordValue = append(recordValue, schemaIDBytes...)
recordValue = append(recordValue, binaryValue...)
log.Print(recordValue)
key, _ := uuid.NewUUID()
fmt.Print(key.String())
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte(key.String()), Value: recordValue}, nil)
【问题讨论】:
-
你能不能试着做一段最小的、单独的代码给我们看。
-
@nilsocket 上面的代码处理消息的编码部分
-
{ "topic": "约会值", "key": "4efcfb26-42cd-11ea-a7f5-3af9d398b113", "value": "\u0000\u0000\u0000\u0000+", “分区”:0,“偏移”:4 }
-
在kafka存储的原始版本的数据就是上面的方式
-
json.Marshal(appointment)创建 json,而不是 Avro...您从哪里获取 ID?
标签: go apache-kafka avro confluent-schema-registry