【问题标题】:Spark data manipulation with wholeTextFiles使用 wholeTextFiles 进行 Spark 数据操作
【发布时间】:2023-04-03 23:11:01
【问题描述】:

我有 20k 个约 2MB 的压缩文件可以在 spark 中操作。我最初的想法是使用wholeTextFiles() 以便获得文件名-> 内容元组。这很有用,因为我需要维护这种配对(因为处理是基于每个文件完成的,每个文件代表一分钟收集的数据)。但是,每当我需要映射/过滤/等数据并维护此文件名 - > 关联时,代码就会变得丑陋(而且可能效率不高?),即

Data.map(lambda (x,y) : (x, y.changeSomehow))

数据本身,也就是每个文件的内容,作为一个单独的 RDD 读起来会很好,因为它包含 10k 行数据;但是,一个人不能拥有一个 rdd 的 rdds(据我所知)。

有什么方法可以缓解这个过程吗?任何基本上允许我将每个文件的内容用作 rdd 的解决方法,因此允许我在不丑陋的跟踪文件名的情况下执行rdd.map(lambda x: change(x))(以及使用列表推导而不是转换)?

当然,目标也是保持分布式方法,并且不以任何方式抑制它。

处理的最后一步是通过 reduce 收集所有内容。

更多背景知识:尝试每分钟识别(接近)船舶碰撞,然后绘制它们的路径

【问题讨论】:

  • 如果你需要速度,我建议使用 scala。 Python 慢了大约 10 倍,一方面是因为 python 本身就更慢,另一方面是因为它必须将数据从 jvm 传送到 python
  • @Reactormonk 我想这样做。但是,我必须使用 python 脚本来解码文件中的几乎每一行。更准确地说,我说的是 AIS 消息,我只在 python 中找到了代码(考虑到我有时间编写自己的代码太复杂了)。如果您知道解决方法,请告诉
  • 看起来 2.5x 更准确。 databricks.com/blog/2015/04/24/… / emptypipes.org/2015/01/17/python-vs-scala-vs-spark 所以只有半个数量级而不是一个完整的数量级。使用 pypy 就更少了。
  • @Reactormonk 啊,我完全忘记了 Scala 中的 Java 非常容易,因为有 JVM。谢谢!

标签: apache-spark rdd


【解决方案1】:

如果你有正常的map函数(o1->o2),你可以使用mapValues函数。你还有 flatMap (o1 -> Collection()) 函数:flatMapValues。

它将保留密钥(在您的情况下 - 文件名)并仅更改值。

例如:

rdd = sc.wholeTextFiles (...)
# RDD of i.e. one pair, /test/file.txt -> Apache Spark
rddMapped = rdd.mapValues (lambda x: veryImportantDataOf(x))
# result: one pair: /test/file.txt -> Spark

使用 reduceByKey 可以减少结果

【讨论】:

  • 这已经是一个进步了,谢谢!所以我的一些行现在看起来像data.mapValues(lambda x: [list comprehension]) ,其中列表包含文件中的每一行。如果 x 是它自己的一个 rdd,而不是一个列表,那就太棒了,就效率和可读性而言。你同意吗,也许知道如何去做?
  • 密钥对RDD的值不能是RDD。它可以是任何可序列化的数据结构,即您可以创建案例类DocumentContent。 RDD 类似于数据转换的逻辑计划,它不仅仅是一个集合。驱动程序必须构造一个调用链来进行 RDD 动作,嵌入式 RDD 会导致很多问题,这就是为什么 RDD 的 RDD 是不允许的
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-04-18
  • 1970-01-01
  • 1970-01-01
  • 2018-05-13
  • 1970-01-01
  • 1970-01-01
  • 2018-02-17
相关资源
最近更新 更多