Title: Chaining a cache() command in pyspark?
Posted: 2020-01-15 15:58:12
Question:

I am just getting started with pyspark and am looking into how to optimize my code with caching. Does it make sense to chain cache() calls? Here is what my code looks like:

token_count_dict = dict(sorted_tokens_rdd.collect())
tokens = list(token_count_dict.keys())

popular_tokens = uid_txt_rdd.flatMapValues(tok.tokenize)\
    .filter(lambda x: x[1] in tokens)\
    .distinct()\
    .map(lambda x: ((partition[x[0]] if x[0] in partition else 7, x[1]), 1))\
    .reduceByKey(lambda x, y: x+y)\
    .map(lambda x: (x[0], a_function(x[1], token_count_dict[x[0][1]])))\
    .sortBy(lambda x: (x[0][0], -x[1], x[0][1]))\
    .map(lambda x: (x[0][0], (x[0][1], x[1])))\
    .groupByKey()\
    .map(lambda x: [x[0], list(x[1])])
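For reference, the per-record logic of the chain above can be sketched in plain Python on toy data. Note that `tok.tokenize`, `partition`, `token_count_dict`, and `a_function` are not defined in the question, so the stand-ins below are purely illustrative:

```python
# Plain-Python sketch of what the chained RDD pipeline computes.
# All names and data here are hypothetical stand-ins.
from collections import defaultdict

def tokenize(text):                # stand-in for tok.tokenize
    return text.lower().split()

partition = {"u1": 0, "u2": 1}     # stand-in uid -> group map
token_count_dict = {"spark": 4, "cache": 2}
tokens = list(token_count_dict)

def a_function(group_count, total_count):   # stand-in scoring function
    return group_count / total_count

uid_txt = [("u1", "Spark cache"), ("u2", "spark spark"), ("u3", "cache")]

# flatMapValues(tokenize) + filter + distinct
pairs = {(uid, t) for uid, txt in uid_txt
                  for t in tokenize(txt) if t in tokens}

# map to ((group, token), 1) + reduceByKey
counts = defaultdict(int)
for uid, t in pairs:
    counts[(partition.get(uid, 7), t)] += 1

# map to a relative-popularity score, then sortBy + groupByKey
scored = {k: a_function(v, token_count_dict[k[1]]) for k, v in counts.items()}
grouped = defaultdict(list)
for (grp, t), score in sorted(scored.items(),
                              key=lambda kv: (kv[0][0], -kv[1], kv[0][1])):
    grouped[grp].append((t, score))

popular_tokens = [[g, toks] for g, toks in grouped.items()]
```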

print(popular_tokens.toDebugString().decode("utf-8"))

The output is:

(2) PythonRDD[149] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[148] at mapPartitions at PythonRDD.scala:122 []
 |  ShuffledRDD[147] at partitionBy at <unknown>:0 []
 +-(2) PairwiseRDD[146] at groupByKey at <ipython-input-24-d694a6d94459>:5 []
    |  PythonRDD[145] at groupByKey at <ipython-input-24-d694a6d94459>:5 []
    |  MapPartitionsRDD[144] at mapPartitions at PythonRDD.scala:122 []
    |  ShuffledRDD[143] at partitionBy at <unknown>:0 []
    +-(2) PairwiseRDD[142] at sortBy at <ipython-input-24-d694a6d94459>:5 []
       |  PythonRDD[141] at sortBy at <ipython-input-24-d694a6d94459>:5 []
       |  MapPartitionsRDD[138] at mapPartitions at PythonRDD.scala:122 []
       |  ShuffledRDD[137] at partitionBy at <unknown>:0 []
       +-(2) PairwiseRDD[136] at reduceByKey at <ipython-input-24-d694a6d94459>:5 []
          |  PythonRDD[135] at reduceByKey at <ipython-input-24-d694a6d94459>:5 []
          |  MapPartitionsRDD[134] at mapPartitions at PythonRDD.scala:122 []
          |  ShuffledRDD[133] at partitionBy at <unknown>:0 []
          +-(2) PairwiseRDD[132] at distinct at <ipython-input-24-d694a6d94459>:5 []
             |  PythonRDD[131] at distinct at <ipython-input-24-d694a6d94459>:5 []
             |  ./hw2-files-10mb.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
             |      CachedPartitions: 2; MemorySize: 2.6 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
             |  ./hw2-files-10mb.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []

Based on the lineage above, I see several branches that could (?) benefit from caching. So, is the following better Spark optimization practice?

From the research I have done, the consensus seems to be to call cache() at the points where the lineage branches. However, when I time runs of the two implementations with %%timeit, there is no difference.

popular_tokens = uid_txt_rdd.flatMapValues(tok.tokenize)\
    .cache()\
    .filter(lambda x: x[1] in tokens)\
    .distinct()\
    .cache()\
    .map(lambda x: ((partition[x[0]] if x[0] in partition else 7, x[1]), 1))\
    .cache()\
    .reduceByKey(lambda x, y: x+y)\
    .map(lambda x: (x[0], get_rel_popularity(x[1], token_count_dict[x[0][1]])))\
    .cache()\
    .sortBy(lambda x: (x[0][0], -x[1], x[0][1]))\
    .cache()\
    .map(lambda x: (x[0][0], (x[0][1], x[1])))\
    .cache()\
    .groupByKey()\
    .map(lambda x: [x[0], list(x[1])])

The output still seems to have many branches:

(2) PythonRDD[130] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[129] at mapPartitions at PythonRDD.scala:122 []
 |  ShuffledRDD[128] at partitionBy at <unknown>:0 []
 +-(2) PairwiseRDD[127] at groupByKey at <ipython-input-23-5914874b5d65>:5 []
    |  PythonRDD[126] at groupByKey at <ipython-input-23-5914874b5d65>:5 []
    |  PythonRDD[125] at RDD at PythonRDD.scala:48 []
    |  PythonRDD[124] at RDD at PythonRDD.scala:48 []
    |  MapPartitionsRDD[123] at mapPartitions at PythonRDD.scala:122 []
    |  ShuffledRDD[122] at partitionBy at <unknown>:0 []
    +-(2) PairwiseRDD[121] at sortBy at <ipython-input-23-5914874b5d65>:5 []
       |  PythonRDD[120] at sortBy at <ipython-input-23-5914874b5d65>:5 []
       |  PythonRDD[117] at RDD at PythonRDD.scala:48 []
       |      CachedPartitions: 2; MemorySize: 7.4 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
       |  MapPartitionsRDD[116] at mapPartitions at PythonRDD.scala:122 []
       |  ShuffledRDD[115] at partitionBy at <unknown>:0 []
       +-(2) PairwiseRDD[114] at reduceByKey at <ipython-input-23-5914874b5d65>:5 []
          |  PythonRDD[113] at reduceByKey at <ipython-input-23-5914874b5d65>:5 []
          |  PythonRDD[112] at RDD at PythonRDD.scala:48 []
          |      CachedPartitions: 2; MemorySize: 193.2 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
          |  PythonRDD[111] at RDD at PythonRDD.scala:48 []
          |      CachedPartitions: 2; MemorySize: 188.7 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
          |  MapPartitionsRDD[110] at mapPartitions at PythonRDD.scala:122 []
          |  ShuffledRDD[109] at partitionBy at <unknown>:0 []
          +-(2) PairwiseRDD[108] at distinct at <ipython-input-23-5914874b5d65>:5 []
             |  PythonRDD[107] at distinct at <ipython-input-23-5914874b5d65>:5 []
             |  PythonRDD[106] at RDD at PythonRDD.scala:48 []
             |      CachedPartitions: 2; MemorySize: 652.0 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
             |  ./hw2-files-10mb.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
             |      CachedPartitions: 2; MemorySize: 2.6 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
             |  ./hw2-files-10mb.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []

Thanks for helping a newbie out!

Tags: apache-spark, caching, pyspark, rdd


Answer 1:

Caching is a trade-off between saving computation and consuming storage. You cannot cache everything, because caching consumes memory and disk: memory is limited, and caching to disk incurs IO when you read the data back. I would recommend caching a dataset that is expensive to build and used multiple times.

If it is only used once, I would not cache it even if it is expensive to build, because it has to be built once anyway before it can be used. That is why you are not seeing any performance benefit: you never reuse the cached data.
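The reuse point can be illustrated outside Spark with a toy counter (purely illustrative, nothing here is Spark API): caching a result that feeds a single action saves nothing, while caching a result that feeds two actions halves the work.

```python
# Count how often the "expensive" step actually runs.
calls = {"n": 0}

def expensive(x):
    calls["n"] += 1
    return x * x

data = [1, 2, 3]

# Used once: eager ("cached") or lazy, the work happens exactly once.
once = [expensive(x) for x in data]
assert calls["n"] == 3

# Used twice without caching: the work repeats per consumer,
# which is what happens when an uncached RDD feeds two actions.
calls["n"] = 0
total = sum(expensive(x) for x in data)
count = len([expensive(x) for x in data])
assert calls["n"] == 6

# Used twice with caching: materialize once, then reuse.
calls["n"] = 0
cached = [expensive(x) for x in data]   # analogous to cache() + a first action
total, count = sum(cached), len(cached)
assert calls["n"] == 3
```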

In your example, I would cache the final result, after all the filtering, sorting, mapping, and grouping has happened, assuming the whole thing is reused.
