【问题标题】:Spark: groupBy taking lot of timeSpark: groupBy 花费大量时间
【发布时间】:2015-03-14 15:47:16
【问题描述】:

在我的应用程序中获取性能数据时,groupby 占用了大量时间。

我的 RDD 的结构如下:

JavaPairRDD<CustomTuple, Map<String, Double>>

自定义元组: 此对象包含有关 RDD 中当前行的信息,例如星期、月份、城市等。

public class CustomTuple implements Serializable{

private Map hierarchyMap = null;
private Map granularMap  = null;
private String timePeriod = null;
private String sourceKey  = null;
}

地图

此地图包含有关该行的统计数据,例如投资多少、GRP 多少等。

<"Inv", 20>

<"GRP", 30>

我在这个 RDD 上的 DAG 下执行

  1. 在此 RDD 上应用过滤器并确定相关行的范围:过滤器
  2. 在此 RDD 上应用过滤器并确定相关行的范围:过滤器
  3. 加入 RDD:加入
  4. 应用地图阶段来计算投资:地图
  5. 应用 GroupBy 阶段根据所需视图对数据进行分组:GroupBy
  6. 应用地图阶段以根据上述步骤中实现的分组聚合数据(例如查看跨时间段的数据),并根据希望收集的结果集创建新对象:地图
  7. 收集结果:收集

因此,如果用户想要查看跨时间段的投资,则返回下面的列表(这是在上面的第 4 步中实现的):

<timeperiod1, value> 

当我检查操作所花费的时间时,GroupBy 花费了执行整个 DAG 所花费时间的 90%。

IMO,我们可以用一个单一的 reduce 替换 GroupBy 和后续的 Map 操作。 但reduce 将适用于JavaPairRDD> 类型的对象。 所以我的 reduce 就像 T reduce(T,T,T) 其中 T 是 CustomTuple, Map。

或者也许在上面 DAG 中的第 3 步之后,我运行另一个 map 函数,该函数返回一个 RDD 类型,用于需要聚合的指标,然后运行 ​​reduce。

另外,我不确定聚合函数是如何工作的,在这种情况下它能否帮助我。

其次,我的应用程序将收到有关不同键的请求。在我当前的 RDD 设计中,每个请求都需要我在这个键上重新分区或重新组合我的 RDD。这意味着对于每个请求,分组/重新分区将花费我 95% 的时间来计算作业。

<"market1", 20>
<"market2", 30>

这非常令人沮丧,因为当前没有 Spark 的应用程序性能比使用 Spark 的性能好 10 倍。

感谢任何见解。

[EDIT]我们还注意到 JOIN 花费了很多时间。也许这就是 groupby 需要时间的原因。[编辑]

TIA!

【问题讨论】:

    标签: aggregate apache-spark reduce


    【解决方案1】:

    Spark 的文档鼓励您避免操作 groupBy 操作,而是建议 combineByKey 或其派生操作(reduceByKey 或 aggregateByKey)。您必须使用此操作才能在 shuffle 之前和之后进行聚合(如果我们使用 Hadoop 术语,则在 Map 和 Reduce 阶段),因此您的执行时间会提高(我不知道它是否会是 10 次更好,但必须更好)

    如果我理解您的处理,我认为您可以使用单个 combineByKey 操作 以下代码的解释是针对 scala 代码进行的,但您可以毫不费力地将其转换为 Java 代码

    combineByKey 有三个参数: combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C ) ⇒ C): RDD[(K, C)]

    • createCombiner:在此操作中,您创建一个新类以组合您的数据,这样您就可以将您的 CustomTuple 数据聚合到一个新的类 CustomTupleCombiner 中(我不知道您是否想要只计算一个总和,或者您可能想对这些数据应用一些处理,但可以在此操作中选择任一选项)

    • mergeValue:在此操作中,您必须描述一个 CustomTuple 如何与另一个 CustumTupleCombiner 相加(我再次假设一个简单的汇总操作)。例如,如果您想通过键对数据求和,您将在 CustumTupleCombiner 类中有一个 Map ,因此操作应该类似于: CustumTupleCombiner.sum(CustomTuple) 使 CustumTupleCombiner.Map(CustomTuple.key)-> CustomTuple.Map( CustomTuple.key) + CustumTupleCombiner.value

    • mergeCombiners:在此操作中,您必须定义如何合并两个Combiner 类,在我的示例中为CustumTupleCombiner。所以这将类似于 CustumTupleCombiner1.merge(CustumTupleCombiner2) 类似于 CustumTupleCombiner1.Map.keys.foreach( k -> CustumTupleCombiner1.Map(k)+CustumTupleCombiner2.Map(k)) 或类似的东西

    patated 代码没有得到证明(这甚至无法编译,因为我是用 vim 制作的)但我认为这可能适用于您的场景。

    希望对你有用

    【讨论】:

    • 感谢 jlopezmat。您能否使用我的用例中的示例进行详细说明。它会更好地澄清。例如,我希望使用 CustomTuple 地图中的某个字段来聚合我的值,例如,市场。那确实很有帮助。我想了解它将如何替换 groupby,其中我将在同一场景中将 market 作为 key 返回,然后在 map 阶段进行聚合。 TIA!
    【解决方案2】:

    洗牌由 [K,V] 对的键的任何更改或repartition() 调用触发。分区是根据 K(key)值计算的。默认情况下,分区是使用您的键的哈希值计算的,由hashCode() 方法实现。在您的情况下,您的 Key 包含两个 Map 实例变量。 hashCode() 方法的默认实现还必须计算这些映射的 hashCode(),从而导致对所有 it 元素进行迭代,从而再次计算这些元素的 hashCode()

    解决办法是:

    1. 请勿在您的密钥中包含Map 实例。这似乎很不寻常。
    2. 实现并覆盖您自己的 hashCode(),以避免通过 Map 实例变量。
    3. 您可以完全避免使用Map 对象。如果它是在多个元素之间共享的东西,您可能需要考虑在 spark 中使用广播变量。在洗牌期间序列化地图的开销也可能是一个很大的影响因素。
    4. 通过调整两个连续分组依据之间的哈希值来避免任何洗牌。
    5. 通过选择在连续使用期间将分区保持在本地的亲和力的分区器来保持本地节点的混洗。

    hashCode() 上的好读物,包括对 Josh Bloch 的引用的引用,可以在 wiki 中找到。

    【讨论】:

      猜你喜欢
      • 2018-01-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-11
      • 2016-06-21
      • 2016-01-31
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多