【问题标题】:Kafka Connect: Convert message from bytes to JsonKafka Connect:将消息从字节转换为 Json
【发布时间】:2020-12-15 03:29:12
【问题描述】:

我正在尝试使用 Google PubSub 源连接器从我的谷歌云获取数据到 kafka。我确实得到了数据,但消息以字节的形式出现。我提到了here,如上所述,我使用了 JSON 转换器来更改它。

这是我的连接器代码部分:

name=CPSSourceConnector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=10
kafka.topic=test-topic
kafka.topic.replication.factor=1
kafka.key.attribute=message
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
cps.subscription=test-sub
cps.project=sensor-alpha

这就是我在我的 kafka 中得到的:

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"bytes",
            "optional":false,
            "field":"message"
         },
         {
            "type":"string",
            "optional":false,
            "field":"subFolder"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceRegistryLocation"
         },
         {
            "type":"string",
            "optional":false,
            "field":"projectId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceNumId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceRegistryId"
         }
      ],
      "optional":false
   },
   "payload":{
      "message":"eyJzZW5zb3JfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMtMzAyIiwgInRfY2Vsc2l1cyI6IDEwLCAicmVnaXN0cnlfaWQiOiAiYmFsZW5hLXJlZ2lzdHJ5IiwgInByZXNzdXJlIjogMTAsICJ0aW1lc3RhbXAiOiAxNTk4NDM2NTk3LjQxNTEwNDYsICJkZXZpY2VfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMiLCAic3RyaW5nX2JhdHRlcnkiOiAiYmF0dGVyeV9ub3JtYWwiLCAic3RyaW5nX2luZmxhdGUiOiAidGlyZV9vdmVyX2luZmxhdGVkIn0=",
      "subFolder":"",
      "deviceId":"deviceid",
      "deviceRegistryLocation":"region_value",
      "projectId":"projectid",
      "deviceNumId":"device_num_value",
      "deviceRegistryId":"registryid"
   }
}

即使在提供连接器之后,我也会以字节形式收到详细信息。我应该做些什么来将其转换为 json 格式?

【问题讨论】:

    标签: java apache-kafka apache-kafka-connect google-cloud-pubsub


    【解决方案1】:

    Cloud Pub/Sub Kafka 连接器不会检查或转换它接收到的消息中的数据;它只是将数据字段作为字节传递,这是 PubsubMessage 中字段的类型。目前没有办法让连接器本身读取消息的内容并将其转换为 JSON。

    【讨论】:

    猜你喜欢
    • 2020-09-09
    • 2018-03-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-23
    • 1970-01-01
    • 2021-08-09
    相关资源
    最近更新 更多