【问题标题】:How do I transfer my cassandra data to pyspark using QueryCassandra and ExecutePySpark Nifi Processors?如何使用 QueryCassandra 和 ExecutePySpark Nifi 处理器将我的 cassandra 数据传输到 pyspark?
【发布时间】:2018-08-22 23:29:00
【问题描述】:

我只是使用 querycassandra 处理器查询 cassandra 表,但我不明白如何将我的 Json 输出文件作为输入文件传递给 ExecutePyspark 处理器,然后我需要将我的 Spark 输出数据传递给 Hive。请帮助我,谢谢。

我的查询 Cassandra 属性:

Pyspark 属性:

【问题讨论】:

    标签: apache-spark cassandra pyspark apache-nifi kylo


    【解决方案1】:

    考虑如下使用 4 个处理器的流程:

    QueryCassandra -> UpdateAttribute -> PutFile -> ExecutePySpark

    第 1 步QueryCassandra 处理器:在 Cassandra 上执行 CQL 并将结果输出到流文件中。

    第 2 步UpdateAttribute 处理器:为属性 filename 分配一个值,该值包含磁盘上将包含查询结果的临时文件的名称。使用NiFi expression language 生成文件名,以便每次运行都不同。创建属性result_directory 并为磁盘上NiFi 具有写入权限的文件夹分配一个值。

    • 属性:filename
    • 值:cassandra_result_${now():toNumber()}

    • 属性:result_directory

    • 值:/tmp

    第 3 步PutFile 处理器:使用在第 2 步中填充的值 ${result_directory} 配置 Directory 属性。

    第 4 步ExecutePySpark 处理器:通过PySpark App Args 处理器属性将文件名及其位置作为参数传递给 PySpark 应用程序。然后,应用程序可以使用代码从磁盘上的文件中读取数据,对其进行处理并写入 Hive。

    • 属性:PySpark App Args
    • 值:${result_directory}/${filename}

    此外,您可以在第 2 步 (UpdateAttribute) 中配置更多属性,然后在第 4 步 (ExecutePySpark) 中将这些属性作为参数传递,并由 PySpark 应用程序在写入 Hive 时考虑(例如,Hive 数据库和表名) .

    【讨论】:

    • 非常感谢 Jagrut,它正在工作并且能够通过我的 spark 应用程序将数据写入 hive,但我的 spark 应用程序只需要在那里执行转换,并且需要单独的处理器将数据写入 Hive。 spark中是否有任何机制来生成流文件并将其传递给下一个处理器?
    • @KarthikMannava 您可以尝试使用 ExecuteStream 处理器,这样您就可以在 Python 脚本中从标准输入读取并写入标准输出。我认为 Spark 中没有任何内置支持直接写入 NiFi FlowFile。
    • @GregHart 如何使用执行流命令处理器在我的 Spark 应用程序中调用流文件内容?当我直接尝试使用 python 读取标准输入时,流文件卡在 queryCassandra 和 ExecuteStreamCommand 处理器之间的队列中。
    • @KarthikMannava 您将从标准输入读取并写入标准输出。如果这不起作用,您应该创建一个新问题来显示您的 Python 代码。
    猜你喜欢
    • 2019-05-22
    • 1970-01-01
    • 1970-01-01
    • 2019-12-14
    • 1970-01-01
    • 2021-06-27
    • 1970-01-01
    • 1970-01-01
    • 2015-11-03
    相关资源
    最近更新 更多