【问题标题】:Set message key of Kafka topic from Apache nifi从 Apache nifi 设置 Kafka 主题的消息密钥
【发布时间】:2021-06-24 10:53:26
【问题描述】:

我在 Apache Nifi 中建立了一个简单的管道,它从 twitter 中提取一些推文并将它们转储到 kafka 主题中。推文被转储到主题中而没有错误,但我无法弄清楚为什么主题消息的键未设置。 我使用“EvaluateJSONPath”处理器从通过“GetTwitter”处理器提取的 JSON 中设置流文件的 属性。 如果我暂停并看到进入“publishKafkaRecord”处理器的队列,我可以看到设置为 kafka 消息键的属性在那里,所以这似乎有效:

在我的“publishKafkaRecord”处理器中,我相应地设置了 Message 键字段:

在运行流文件并查看主题消息的内容后,每条消息的 key 值都设置为 null。我在配置处理器时是否遗漏了一些重要的事情?

更新:还尝试在消息键属性中引用任何其他流文件属性,但每条消息的键仍设置为 null。

【问题讨论】:

标签: apache-kafka apache-nifi


【解决方案1】:

如果您阅读documentation for the PublishKafkaRecord 处理器,您会发现 Record 处理器没有从 FlowFile 属性中获取键值,而是从 Record 中的一个字段

输入记录中应用作 Kafka 消息键的字段的名称。 支持表达式语言:true(将使用流文件属性和变量注册表进行评估)

这有点令人困惑,因为它确实提到了支持 EL - 但它仍然希望您的 EL 的结果是记录中的 字段名称,它将从中获得价值。

例如,您可以将Message Key Field 值设置为kafka.key,它将在每个记录中查找名为kafka.key字段,并将其值用作键。它不是在寻找属性。

现在,如果您将Message Key Field 设置为${kafka.key},它将查找名为kafka.key 的属性,并在记录中使用此属性的值作为字段的名称它应该使用。

例如,如果您有一个名为kafka.key 的属性,其值为mykey,并且您将Message Key Field 设置为${kafka.key},那么它将在记录中查找字段调用mykey 作为消息的键。

因此,在您的配置中,您没有将键设置为135792...etc.,而是在每个记录中查找名为135792...etc.字段,并尝试将该字段的值用作关键...这是null,因为您的记录中没有名为135792...etc. 的字段。

因此,您可以在PublishKafkaRecord 之前使用UpdateRecord - 将名为kafka.key 的字段设置为属性kafka.key 的值。然后将您的 PublishKafkaRecord 更改为 kafka.key

但是,在我看来,每个 FlowFile 有 1 条记录,因此可能可以对您的流程进行一些优化,因为 Records 旨在与包含许多 Records 的 FlowFile 一起使用。

【讨论】:

  • 感谢您的精彩解释,我现在非常清楚。确实每个 FlowFile 有 1 条记录,所以我决定使用 PublishKafka 而不是 PublishKafkaRecord。我能够毫无问题地设置消息密钥。再次感谢,因为我被困了好几天。
  • 谢谢,这个答案很有帮助。我可以轻松地为我的记录设置消息键。一个快速的问题:Nifi 是否也可以根据消息键属性 + 默认分区器为我的主题自行分区。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-03-14
  • 2018-07-07
  • 2020-02-16
  • 2021-01-14
相关资源
最近更新 更多