【Question Title】: Parsing array of JSON objects into individual documents
【Posted】: 2020-07-31 14:53:53
【Question】:

I am collecting logs generated by an agent. It produces one large JSON output that I need to break into smaller JSON documents and write to Kafka using sarama. Because of Kafka's maximum message size limit, I am having trouble splitting it into several individual JSON documents. Any suggestions would be appreciated. The log messages have no fixed fields or data types, apart from a date/time field indicating when the log event occurred.

Example #1

[{"date":1596206786.847531,"rand_value":11885153394315023285},{"date":1596206787.847446,"rand_value":6208802038498064748},{"date":1596206788.847526,"rand_value":932964293334035461},{"date":1596206789.847568,"rand_value":13217490172547025909}]

Example #2

[{"date":1596206786.847743,"cpu_p":0,"user_p":0,"system_p":0,"cpu0.p_cpu":0,"cpu0.p_user":0,"cpu0.p_system":0,"cpu1.p_cpu":0,"cpu1.p_user":0,"cpu1.p_system":0,"cpu2.p_cpu":0,"cpu2.p_user":0,"cpu2.p_system":0,"cpu3.p_cpu":0,"cpu3.p_user":0,"cpu3.p_system":0,"cpu4.p_cpu":0,"cpu4.p_user":0,"cpu4.p_system":0,"cpu5.p_cpu":0,"cpu5.p_user":0,"cpu5.p_system":0,"cpu6.p_cpu":0,"cpu6.p_user":0,"cpu6.p_system":0,"cpu7.p_cpu":0,"cpu7.p_user":0,"cpu7.p_system":0},{"date":1596206787.847689,"cpu_p":1.25,"user_p":0.75,"system_p":0.5,"cpu0.p_cpu":2,"cpu0.p_user":1,"cpu0.p_system":1,"cpu1.p_cpu":1,"cpu1.p_user":0,"cpu1.p_system":1,"cpu2.p_cpu":2,"cpu2.p_user":1,"cpu2.p_system":1,"cpu3.p_cpu":3,"cpu3.p_user":2,"cpu3.p_system":1,"cpu4.p_cpu":1,"cpu4.p_user":0,"cpu4.p_system":1,"cpu5.p_cpu":1,"cpu5.p_user":1,"cpu5.p_system":0,"cpu6.p_cpu":2,"cpu6.p_user":2,"cpu6.p_system":0,"cpu7.p_cpu":0,"cpu7.p_user":0,"cpu7.p_system":0},{"date":1596206788.847754,"cpu_p":0.75,"user_p":0.5,"system_p":0.25,"cpu0.p_cpu":0,"cpu0.p_user":0,"cpu0.p_system":0,"cpu1.p_cpu":1,"cpu1.p_user":0,"cpu1.p_system":1,"cpu2.p_cpu":2,"cpu2.p_user":1,"cpu2.p_system":1,"cpu3.p_cpu":0,"cpu3.p_user":0,"cpu3.p_system":0,"cpu4.p_cpu":0,"cpu4.p_user":0,"cpu4.p_system":0,"cpu5.p_cpu":1,"cpu5.p_user":1,"cpu5.p_system":0,"cpu6.p_cpu":1,"cpu6.p_user":0,"cpu6.p_system":1,"cpu7.p_cpu":1,"cpu7.p_user":0,"cpu7.p_system":1},{"date":1596206789.847805,"cpu_p":0.8750000000000001,"user_p":0.5,"system_p":0.375,"cpu0.p_cpu":1,"cpu0.p_user":0,"cpu0.p_system":1,"cpu1.p_cpu":1,"cpu1.p_user":1,"cpu1.p_system":0,"cpu2.p_cpu":2,"cpu2.p_user":1,"cpu2.p_system":1,"cpu3.p_cpu":0,"cpu3.p_user":0,"cpu3.p_system":0,"cpu4.p_cpu":1,"cpu4.p_user":1,"cpu4.p_system":0,"cpu5.p_cpu":0,"cpu5.p_user":0,"cpu5.p_system":0,"cpu6.p_cpu":2,"cpu6.p_user":2,"cpu6.p_system":0,"cpu7.p_cpu":0,"cpu7.p_user":0,"cpu7.p_system":0}]

My sample program:

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "os"
)


func main() {

    ibytes, err := ioutil.ReadFile("hello.json")
    if err != nil {
        fmt.Println(err)
        os.Exit(-1)
    }

    var msgs []map[string]interface{}

    err = json.Unmarshal(ibytes, &msgs)
    if err != nil {
        fmt.Println("Serialization Error", err)
        os.Exit(-1)
    }

    for _, msg := range msgs {
        // Printf, not Println: Println does not interpret the %s verb
        // and would print the literal string "%s" before each map.
        fmt.Printf("%s\n", msg)
    }
}

I can iterate over the individual messages, but I cannot write them to Kafka in a friendly format.
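The "unfriendly" output comes from printing the decoded maps directly: fmt renders Go's map syntax (`map[date:... rand_value:...]`), not JSON. Re-encoding each element with `json.Marshal` turns it back into a standalone JSON document that can be handed to a producer. A minimal, self-contained sketch (the helper name `splitJSONArray` is mine, not from the question):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// splitJSONArray decodes a JSON array and re-encodes every element
// as its own standalone JSON document string.
func splitJSONArray(data []byte) ([]string, error) {
	var msgs []map[string]interface{}
	if err := json.Unmarshal(data, &msgs); err != nil {
		return nil, err
	}
	docs := make([]string, 0, len(msgs))
	for _, m := range msgs {
		// Marshal restores a valid JSON object (keys sorted, since
		// the source is a Go map).
		b, err := json.Marshal(m)
		if err != nil {
			return nil, err
		}
		docs = append(docs, string(b))
	}
	return docs, nil
}

func main() {
	input := []byte(`[{"date":1.1,"v":1},{"date":2.2,"v":2}]`)
	docs, err := splitJSONArray(input)
	if err != nil {
		panic(err)
	}
	for _, d := range docs {
		fmt.Println(d) // each line is one complete JSON document
	}
}
```

One caveat of this round trip: decoding into `interface{}` turns every JSON number into a float64, so very large integers (such as the 19-digit `rand_value` fields in Example #1) lose precision.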

【Comments】:

  • What have you tried so far? Is your question about how to split the large message?
  • @BurakSerdar I have updated the question with my sample program
  • @BurakSerdar Yes, my main problem is splitting it in a Kafka-friendly way that preserves the JSON structure
  • What is "Kafka-friendly"?
  • @Volker When I print the individual messages I see no braces or anything JSON-like. I am using sarama.StringEncoder when constructing the ProducerMessage, and that fails

Tags: json go sarama


【Solution 1】:

I managed to find the solution myself using the code below:

    var PlaceHolder []interface{}
    err = json.Unmarshal(dbytes, &PlaceHolder)
    if err != nil {
        return fmt.Errorf("error during JSON unmarshalling (%s)", err)
    }

    for _, doc := range PlaceHolder {
        // Assign to err here: the original discarded it with `_` and
        // then tested a stale err value.
        event, err := json.Marshal(doc)
        if err != nil {
            log.Printf("Skipping: error during JSON marshaling (%s)", err)
            continue
        }
        KafkaMessage := &sarama.ProducerMessage{
            Topic: this.Topic,
            Value: sarama.StringEncoder(event),
        }
        msgs = append(msgs, KafkaMessage)
    }
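This approach round-trips every element through `interface{}`, which decodes all JSON numbers as float64, so the 19- and 20-digit `rand_value` integers in Example #1 are silently rounded, and key order is not preserved. A variant (a sketch, not part of the accepted answer) that splits the array with `json.RawMessage` keeps each element's bytes verbatim:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// splitRaw splits a JSON array into its elements without decoding them.
// json.RawMessage preserves each element byte-for-byte, so large
// integers are not rounded through float64 and key order is kept.
func splitRaw(data []byte) ([]json.RawMessage, error) {
	var docs []json.RawMessage
	if err := json.Unmarshal(data, &docs); err != nil {
		return nil, err
	}
	return docs, nil
}

func main() {
	data := []byte(`[{"date":1596206786.847531,"rand_value":11885153394315023285},` +
		`{"date":1596206787.847446,"rand_value":6208802038498064748}]`)
	docs, err := splitRaw(data)
	if err != nil {
		panic(err)
	}
	for _, doc := range docs {
		// Each doc is already a complete JSON object; it can be
		// wrapped directly, e.g. with sarama.ByteEncoder(doc), when
		// building the ProducerMessage.
		fmt.Println(string(doc))
	}
}
```

Since a `json.RawMessage` is just a `[]byte`, no second Marshal pass is needed before handing it to the producer.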

【Discussion】:
