【问题标题】:Spark join exponentially slowSpark join 速度呈指数级增长
【发布时间】:2014-10-07 14:47:46
【问题描述】:

我正在尝试加入两个 Spark RDD。我有一个与类别相关联的事务日志。我已将我的事务 RDD 格式化为以类别 ID 作为键。

transactions_cat.take(3)
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']), 
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']), 
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])]

categories.take(3)
[(u'2202', 0), (u'3203', 0), (u'1726', 0)]

事务日志大约 20 GB(3.5 亿行)。 类别列表小于 1KB。

当我跑步时

transactions_cat.join(categories).count()

Spark 开始变得非常缓慢。我有一个有 643 个任务的阶段。前 10 个任务大约需要 1 分钟。然后每个任务越来越慢(第 60 个任务大约需要 15 分钟)。我不确定出了什么问题。

请查看这些屏幕截图以获得更好的想法。

我正在运行 Spark 1.1.0,有 4 名工作人员使用 python shell,总内存为 50 GB。 仅计算事务 RDD 非常快(30 分钟)

【问题讨论】:

    标签: python scala apache-spark


    【解决方案1】:

    问题可能是 Spark 没有注意到您有一个简单的连接问题案例。当你加入的两个RDDs 之一太小时,最好不要成为RDD。然后,您可以推出自己的hash join 实现,这实际上比听起来要简单得多。基本上,您需要:

    • 使用collect() 将您的类别列表从RDD 中提取出来——由此产生的通信将很容易收回成本(或者,如果可能,首先不要将其设为RDD
    • 将其转换为一个哈希表,其中一个条目包含一个键的所有值(假设您的键不是唯一的)
    • 对于大 RDD 中的每一对,在哈希表中查找键并为列表中的每个值生成一对(如果未找到,则该特定对不会产生任何结果)

    我有一个implementation in Scala -- 随时询问有关翻译的问题,但应该很容易。

    另一个有趣的可能性是尝试使用Spark SQL。我很确定该项目的长期目标将包括自动为您执行此操作,但我不知道他们是否已经实现了。

    【讨论】:

    • 嗨,我面临着完全相同的问题,尽管是数据帧。七个 10x2 数据帧的简单连接,然后在结果上运行 count(),使 Spark 失去了控制,包含 1000 多个任务和 14 个阶段。有什么方法可以在 Spark 之前的手动加入中修复/优化这个问题?
    猜你喜欢
    • 2015-10-18
    • 1970-01-01
    • 2023-04-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-07
    • 1970-01-01
    相关资源
    最近更新 更多