【问题标题】:Using kafka-json-schema-console-producer to produce message with a key schema and a value schema使用 kafka-json-schema-console-producer 生成具有键模式和值模式的消息
【发布时间】:2021-11-25 05:30:41
【问题描述】:

我正在尝试使用 kafka-json-schema-console-producer 发布一条消息,其中包含 both 一个键(带有架构)和一个值(带有架构)。不幸的是,我找不到符合我要求的示例。

我可以按照文档发送简单的消息:

kafka-json-schema-console-producer \
  --broker-list localhost:9092 \
  --topic some-topic \
  --property value.schema='
{
  "definitions" : {
    "record:myrecord" : {
      "type" : "object",
      "required" : [ "name", "calories" ],
      "additionalProperties" : false,
      "properties" : {
        "name" : {"type" : "string"},
        "calories" : {"type" : "number"},
        "colour" : {"type" : "string"}
      }
    }
  },
  "$ref" : "#/definitions/record:myrecord"
}' < snacks.txt

两个问题:

  1. 如何添加密钥架构?是否像添加“key.schema”并使用与 value.schema 类似的语法一样简单?
  2. 发送带有键模式和值模式的 JSON 消息的实际命令是什么样的?

【问题讨论】:

    标签: json apache-kafka jsonschema confluent-platform confluent-schema-registry


    【解决方案1】:
    1. 是的,添加--property key.schema。还有key.schema.file 用于磁盘上的jsonschema 文件或key.schema.id 用于已在注册表中的ID 的选项。

    2. 参考源代码示例 - https://github.com/confluentinc/schema-registry/blob/master/json-schema-serializer/src/main/java/io/confluent/kafka/formatter/json/JsonSchemaMessageReader.java#L72

    提取(从 v6.2.0 开始)

     * bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic t1 \
     * --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader \
     * --property schema.registry.url=http://localhost:8081 \
     * --property parse.key=true \
     * --property key.schema='{"type":"string"}' \
     * --property value.schema='{"type":"object","properties":{"f1":{"type":"string"}}}'
     * "type":"string"}]}'
    

    kafka-json-schema-console-producer 是的简写

    kafka-console-producer --line-reader io.confluent.kafka.formatter.JsonSchemaMessageReader
    

    【讨论】:

      【解决方案2】:

      感谢@OneCricketeer 的帮助,我得以完成这项工作。下面是一个完整的示例,它将使用 JSON 模式发送键和值的消息。 (顺便说一句:我展示了一个使用 Docker 的解决方案,但修改它以不使用 Docker 应该很简单)

      请注意,有几件事让我感到困惑:

      • 您将需要“key.parse=true”属性(如果省略,则不会解析密钥,您会收到错误消息,或者在使用消息时密钥不存在)
      • 键定义看起来与值定义相同,但需要与值不同的定义/记录名称(对于值:“definition/record:myrecord”,对于键:“definition/record:mykey”
      docker exec -it schema-registry \
      /usr/bin/kafka-json-schema-console-producer \
      --broker-list http://kafka:9092 \
      --topic source-1 \
      --property key.separator="|" \
      --property value.schema='
      {
        "definitions" : {
          "record:myrecord" : {
            "type" : "object",
            "required" : [ "name", "calories" ],
            "properties" : {
              "name" : {"type" : "string"},
              "calories" : {"type" : "number"},
              "colour" : {"type" : "string"}
            }
          }
        },
        "$ref" : "#/definitions/record:myrecord"
      }' \
      --property parse.key=true \
      --property key.schema='
      {
        "definitions" : {
          "record:mykey" : {
            "type" : "object",
            "required" : [ "id" ],
            "additionalProperties" : false,
            "properties" : {
              "id" : {"type" : "integer"}
            }
          }
        },
        "$ref" : "#/definitions/record:mykey"
      

      还有数据:

      {"id":1} | {"timestamp":"foo", "data":"bar"}
      

      【讨论】:

      • 注意:如果您的密钥不打算有其他字段,则不应使用"type": "object",只需使用"type" : "integer"
      猜你喜欢
      • 2014-12-20
      • 2020-09-15
      • 2020-04-22
      • 2021-12-18
      • 1970-01-01
      • 2021-11-25
      • 1970-01-01
      • 2016-06-13
      • 2021-02-04
      相关资源
      最近更新 更多