【发布时间】:2018-08-22 23:29:00
【问题描述】:
我只是使用 querycassandra 处理器查询 cassandra 表,但我不明白如何将我的 Json 输出文件作为输入文件传递给 ExecutePyspark 处理器,然后我需要将我的 Spark 输出数据传递给 Hive。请帮助我,谢谢。
我的查询 Cassandra 属性:
【问题讨论】:
标签: apache-spark cassandra pyspark apache-nifi kylo
我只是使用 querycassandra 处理器查询 cassandra 表,但我不明白如何将我的 Json 输出文件作为输入文件传递给 ExecutePyspark 处理器,然后我需要将我的 Spark 输出数据传递给 Hive。请帮助我,谢谢。
我的查询 Cassandra 属性:
【问题讨论】:
标签: apache-spark cassandra pyspark apache-nifi kylo
考虑如下使用 4 个处理器的流程:
QueryCassandra -> UpdateAttribute -> PutFile -> ExecutePySpark
第 1 步:QueryCassandra 处理器:在 Cassandra 上执行 CQL 并将结果输出到流文件中。
第 2 步:UpdateAttribute 处理器:为属性 filename 分配一个值,该值包含磁盘上将包含查询结果的临时文件的名称。使用NiFi expression language 生成文件名,以便每次运行都不同。创建属性result_directory 并为磁盘上NiFi 具有写入权限的文件夹分配一个值。
filename
值:cassandra_result_${now():toNumber()}
属性:result_directory
/tmp
第 3 步:PutFile 处理器:使用在第 2 步中填充的值 ${result_directory} 配置 Directory 属性。
第 4 步:ExecutePySpark 处理器:通过PySpark App Args 处理器属性将文件名及其位置作为参数传递给 PySpark 应用程序。然后,应用程序可以使用代码从磁盘上的文件中读取数据,对其进行处理并写入 Hive。
PySpark App Args
${result_directory}/${filename}
此外,您可以在第 2 步 (UpdateAttribute) 中配置更多属性,然后在第 4 步 (ExecutePySpark) 中将这些属性作为参数传递,并由 PySpark 应用程序在写入 Hive 时考虑(例如,Hive 数据库和表名) .
【讨论】: