【问题标题】:JSON column as Key in kafka producerJSON 列作为 kafka 生产者中的 Key
【发布时间】:2020-04-08 14:18:33
【问题描述】:

正如我们所知,我们可以向 kafka 生产者发送一个密钥,该密钥在内部进行哈希处理,以查找主题数据中的哪个分区。 我有一个 producer,我在其中发送 JSON 格式的数据。

kafka-console-producer --broker-list 127.0.0.1:9092 --topic USERPROFILE << EOF 
{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", "countrycode":"IN", "rating":4.9 }
{"user_id" : 101, "firstname":"eli","lastname":"eli", "countrycode":"GB", "rating":3.0 }
EOF

现在我想在发送数据时使用“countrycode”作为我的密钥。 在普通分隔数据中,我们可以指定 2 个参数:

--property "parse.key=true" 
--property "key.separator=:

但是发送 JSON sata 的时候怎么做呢。

我正在使用 confluent 的 Kafka 的 python API,如果我必须根据函数分类来编写任何东西来实现这一点,如果你能用 python 说出来,我将不胜感激。

【问题讨论】:

    标签: python apache-kafka kafka-producer-api confluent-platform


    【解决方案1】:

    JSON 只是一个字符串。控制台生产者不解析 JSON,只有 Avro 控制台生产者会。

    我会避免使用 key.separator=:,因为 JSON 包含 :。您可以使用| 字符代替,然后您只需输入

    countrycode|{"your":"data"}
    

    在 Python 中,the produce function takes a key, yes。您可以像这样解析数据,以便为键提取值。

    key = 'countrycode'
    records = [{"user_id" : 100, "firstname":"Punit","lastname":"Gupta", key:"IN", "rating":4.9 },
               {"user_id" : 101, "firstname":"eli","lastname":"eli", key:"GB", "rating":3.0 }
    ]
    
    import json
    for r in records:
        producer.produce('topic', key=r[key], value=json.dumps(r))
        # first record will send a record containing ('IN', {  ... 'countrycode':'IN'})
    

    【讨论】:

    • 你能给我一个来自我上面的生产者的例子吗?
    • 您需要将数据从值复制到键中。在控制台生成器中生成 JSON 时,如果没有一些额外的 bash 函数,您将无法解析它
    • 我不确定我是否理解这个问题...无论如何,您都在使用 CPU。您的生产者不应与代理运行在同一台机器上,因此 CPU 在实际场景中会有所不同。
    • 那是 Kafka Connect 下的 Java ......我仍然不确定我是否遵循您要问的问题。但是,如果您想使用它,那么您可以生成一个没有键的主题,然后在将数据写入 Connect Sink 之前使用 Connect 将字段提取到键。
    • @abb 我没有说这是不可能的。我说应该避免--key.separator=':'。版本无关紧要。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-01-15
    • 2022-01-15
    • 1970-01-01
    • 2023-03-22
    • 2019-05-09
    • 2018-05-05
    • 1970-01-01
    相关资源
    最近更新 更多