【问题标题】:extracting specific value from pyspark streaming rdd从 pyspark 流式传输 rdd 中提取特定值
【发布时间】:2016-05-14 10:19:13
【问题描述】:

我有 json rdd,我使用 pprint 打印如下所示

[u'{']
[u'"hash" ', u' "0000000000000000059134ebb840559241e8e2799f3ebdff56723efecfd6567a",']
[u'"confirmations" ', u' 969,']
[u'"size" ', u' 52543,']
[u'"height" ', u' 395545,']
[u'"version" ', u' 4,']
[u'"merkleroot" ', u' "8cf3eea32f692e5ebc9c25bb912ab3aff43c02761609d52cdd48afc5a05918fb",']
[u'"tx" ', u' [']
[u'"b3df3d5fedadd07a46753af556c336c41e038a9aec7ddd9921ad249828fd6d66",']
[u'"4ada431255d104c1c76ef56bdef4186ea89793223133e535383ff39d5a322910",']

我想提取倒数第​​二个值[u'"b3df3d5fedadd07a46753af556c336c41e038a9aec7ddd9921ad249828fd6d66",']

索引不起作用时如何获取此值。代码如下

from pyspark.streaming import StreamingContext
import json

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "txcount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999)
dump_rdd = lines.map(lambda x: json.dumps(x))
load_rdd = dump_rdd.map(lambda x: json.loads(x))
tx = load_rdd.map(lambda x: x.split(":"))
tx.pprint()

【问题讨论】:

  • 干脆不要尝试。您不应该以这种方式传递多行输入。
  • @zero323 可以说,如果这是单行,那么您将如何提取它?
  • @zero323 把你的答案写出来让我接受

标签: json apache-spark pyspark spark-streaming rdd


【解决方案1】:

socketTextStream 不是为处理多行记录而设计的。虽然重新组合完整的记录并非不可能,但我怀疑这是否值得。如果您想使用 socketTextStream 为简单起见,只需编码(例如使用 Base64 编码)或在传递给 Spark 之前清理上游数据。

【讨论】:

  • @zeros323。我在之前的评论中问过“如果它是单行 json 怎么办?”
  • 非常感谢您的建议和帮助
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-02-16
  • 2016-03-24
  • 1970-01-01
  • 1970-01-01
  • 2017-11-02
相关资源
最近更新 更多