【问题标题】:NiFi: best flow to insert data from Kafka to Cassandra?NiFi:将数据从 Kafka 插入到 Cassandra 的最佳流程?
【发布时间】:2018-12-12 23:07:07
【问题描述】:

我花了 2 天时间进行研究,现在我需要你们的帮助。先感谢您。

我有以下流程: 1)ConsumeKafka(消息为JSON格式) 2) 评估JsonPath 3) 更新属性 4) AttributesToJson

以上所有流程都可以正常工作,但以下流程无法正常工作: 5)PutCassandraRecord(我需要有关如何配置此处理器的帮助。我知道我的 Cassandra 服务器、端口、键空间、表名,记录读取器是 JsonPathReader)。还有什么??? 6) 添加了控制器服务 - JsonPathReader(这里我需要有关如何配置此记录读取器的帮助)。 7) 我收到以下附件中的异常。我在哪里以及如何获取或配置模式注册表?

我检查了这个问题和答案:Apache Nifi/Cassandra - how to load CSV into Cassandra table

如果我的流程有误,请纠正我。谢谢。

问候, 好想

【问题讨论】:

  • 您的流文件是否有一个名为 valor.vaengine 的属性?
  • 嗨 Bryan,Valor 表示 Cassandra 的键空间名称。 'vaengine' 是表的名称。在我的 JSON 文件中,没有与此键空间和表名相关的属性。
  • 当您使用 ${something} 时,它是一个表达式语言语句,引用名为“某物”的流文件属性。流文件属性与流文件的内容不同,因此需要创建属性 valor.vaengine 就像 Shu 在他的回答中提到的那样
  • 感谢 Bryan 澄清了我的概念。

标签: apache-nifi


【解决方案1】:

multiple ways我们可以配置Record Reader/writer控制器服务

我将尝试解释以下两个架构访问策略

  • 使用“架构名称”属性
  • 使用“架构文本”属性

使用 SchemaText 属性:

在这个访问策略中,处理器会在 VariableRegistry/FlowfileAttributes 中寻找avro.schema 属性(或者)我们可以给schema in the property value

示例:

我已将架构文本属性值作为我的 avro 架构

使用“架构名称”属性:

在此策略处理器中检查Schema Name 属性值${valor.vaengine}(这是一个属性名称),因此我们需要将此属性的值与流文件关联

然后控制器服务使用 ${valor.vaengine} 值使用来自 AvroSchemaRegistry 的适当架构,该架构已被此控制器服务使用。

在您的情况下,您的 flowfile 没有 ${valor.vaengine} 属性,要将此属性添加到 flowfile 使用 UpdateAttribute 处理器添加新属性为

valor.vaengine

<schema_name_in_avroschemaregistry>

使用this 模板了解有关Record Reader/writer 控制器服务的配置/使用的更多详细信息


您正在为此控制器服务使用JsonPathReader 控制器服务

我们需要添加至少一个用户定义的属性来启用控制器服务,例如属性名称为id值为$.id

【讨论】:

  • 亲爱的舒非常感谢您。我将掌握并尝试您的解决方案并回复您。谢谢。
猜你喜欢
  • 1970-01-01
  • 2019-05-22
  • 1970-01-01
  • 1970-01-01
  • 2016-07-18
  • 2019-04-16
  • 2016-11-04
  • 2015-08-10
  • 2018-06-12
相关资源
最近更新 更多