【问题标题】:using foreachRDD and foreach to iterate over an rdd in pyspark使用 foreachRDD 和 foreach 迭代 pyspark 中的 rdd
【发布时间】:2016-09-26 07:40:58
【问题描述】:

关于 Spark 1.6.1、pyspark 的问题

我有流式数据进来

{"event":4,"Userid":12345,"time":123456789,"device_model":"iPhone OS", "some_other_property": "value", "row_key": 555}

我有一个写入 HBase 的函数,称为 writeToHBase(rdd),期望 rdd 具有以下结构的元组:

(rowkey, [rowkey, column-family, key, value])

从输入格式可以看出,我必须获取原始数据集并遍历所有键,通过发送函数调用发送每个键/值对。

通过阅读 spark 流编程指南,“使用 foreachRDD 的设计模式”部分http://spark.apache.org/docs/latest/streaming-programming-guide.html#tab_python_13

似乎建议在执行数据集外部的操作时使用 foreachRDD。就我而言,我想通过网络将数据写入 HBase,因此我在流数据上使用 foreachRDD 并调用将处理发送数据的函数:

stream.foreachRDD(lambda k: process(k))

我现在对 spark 函数的理解非常有限,所以我无法找到一种方法来迭代我的原始数据集以使用我的 write 函数。如果它是 python 可迭代的,我可以这样做:

def process(rdd):
    for key, value in my_rdd.iteritems():
        writeToHBase(sc.parallelize(rowkey, [rowkey, 'column-family', key, value]))

通过在 rdd 本身中找到 rowkey 来获得它的位置

rdd.map(lambda x: x['rowkey'])

如何在 pyspark 中完成 process() 的任务?我看到一些使用 foreach 的示例,但我不能让它做我想做的事。

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    为什么你要迭代 rdd 而你的 writeToHBase 函数需要一个 rdd 作为参数。只需在您的流程函数中调用writeToHBase(rdd) 即可。

    如果您需要从 rdd 中获取每条记录,您可以调用

    def processRecord(record):
            print(record)   
    rdd.foreach(processRecord)
    

    在 processRecord 函数中,您将获得要处理的单个记录。

    【讨论】:

      猜你喜欢
      • 2015-12-22
      • 2017-01-09
      • 1970-01-01
      • 1970-01-01
      • 2016-04-28
      • 2014-11-20
      • 2014-11-05
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多