【发布时间】:2016-09-26 07:40:58
【问题描述】:
关于 Spark 1.6.1、pyspark 的问题
我有流式数据进来
{"event":4,"Userid":12345,"time":123456789,"device_model":"iPhone OS", "some_other_property": "value", "row_key": 555}
我有一个写入 HBase 的函数,称为 writeToHBase(rdd),期望 rdd 具有以下结构的元组:
(rowkey, [rowkey, column-family, key, value])
从输入格式可以看出,我必须获取原始数据集并遍历所有键,通过发送函数调用发送每个键/值对。
通过阅读 spark 流编程指南,“使用 foreachRDD 的设计模式”部分http://spark.apache.org/docs/latest/streaming-programming-guide.html#tab_python_13
似乎建议在执行数据集外部的操作时使用 foreachRDD。就我而言,我想通过网络将数据写入 HBase,因此我在流数据上使用 foreachRDD 并调用将处理发送数据的函数:
stream.foreachRDD(lambda k: process(k))
我现在对 spark 函数的理解非常有限,所以我无法找到一种方法来迭代我的原始数据集以使用我的 write 函数。如果它是 python 可迭代的,我可以这样做:
def process(rdd):
for key, value in my_rdd.iteritems():
writeToHBase(sc.parallelize(rowkey, [rowkey, 'column-family', key, value]))
通过在 rdd 本身中找到 rowkey 来获得它的位置
rdd.map(lambda x: x['rowkey'])
如何在 pyspark 中完成 process() 的任务?我看到一些使用 foreach 的示例,但我不能让它做我想做的事。
【问题讨论】:
标签: python apache-spark pyspark