【问题标题】:Spark Streaming app streams files that have already been streamedSpark Streaming 应用程序流式传输已流式传输的文件
【发布时间】:2014-11-11 17:11:13
【问题描述】:

我们在 YARN ec2 集群中部署了一个 Spark 流应用程序,该集群具有 1 个名称节点和 2 个数据节点。我们提交的应用程序有 11 个执行程序,每个执行程序有 1 个内核和 588 MB RAM。 该应用程序从 S3 中不断写入的目录中流出;这是实现这一目标的代码行:

val ssc = new StreamingContext(sparkConf, Seconds(10))
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](Settings.S3RequestsHost , (f:Path)=> true, true )
//some maps and other logic here
ssc.start()
ssc.awaitTermination()

使用 fileStream 而不是 textFileStream 的目的是自定义 spark 在进程启动时处理现有文件的方式。我们只想处理进程启动后添加的新文件并忽略现有文件。我们配置了 10 秒的批处理持续时间。

当我们将少量文件添加到 s3 时,这个过程很顺利,比如 4 或 5 个。我们可以在流式 UI 中看到阶段是如何在执行程序中成功执行的,每个处理的文件都有一个。但有时当我们尝试添加大量文件时,我们会遇到奇怪的行为;应用程序开始流式传输已经流式传输的文件。

例如,我在 s3 中添加了 20 个文件。文件分 3 批处理。第一批处理 7 个文件,第二批处理 8 个,第三批处理 5 个。此时不再向 S3 添加文件,但 spark 开始使用相同的文件无休止地重复这些阶段! 有什么想法会导致这种情况吗?

我为此问题发布了一张 Jira 票证: https://issues.apache.org/jira/browse/SPARK-3553

【问题讨论】:

  • 你是如何添加文件的?请注意Spark Streaming Programming Guide 中的语句“必须通过原子地将文件移动或重命名到数据目录中来在 dataDirectory 中创建文件”。
  • 嗨@SpiroMichaylov。我正在使用 cloudberry 上传文件。原子的意思是一个一个?我不确定 cloudberry 是否使用批量上传。可能是这导致了问题...
  • 我对“原子”的解释是整个文件必须同时出现,这与上面的引用一致。它包括创建空文件并附加到它。您可能需要做的是让 cloudberry 将文件放在其他位置,然后定期运行脚本,将文件移动或重命名到您附加流应用程序的目录中。我不知道它是否会导致您的问题,但如果 cloudberry 不这样做,它导致您一些问题。
  • 是的@SpiroMichaylov,这行得通! =) 请将此作为答案发布,以便我将其标记为已解决。 (我没有定期运行脚本,而是简单地上传了带有 .tmp 扩展名的文件,并在上传后使用 s3 put-copy api 将它们重命名为 .csv。然后在流应用程序中,我只过滤了 *.csv 文件使用fileStream 函数,如我的问题所示)
  • 我发布了一个摘要作为答案。很高兴它有帮助!

标签: amazon-ec2 amazon-s3 apache-spark hadoop-yarn spark-streaming


【解决方案1】:

请注意Spark Streaming Programming Guide 中的语句“必须通过原子地将文件移动或重命名到数据目录中来在 dataDirectory 中创建文件”。整个文件必须同时出现,而不是创建空文件并附加到它。

一种方法是让 cloudberry 将文件放在其他位置,然后定期运行脚本,将文件移动或重命名到您已将流应用程序附加到的目录中。

【讨论】:

    猜你喜欢
    • 2018-12-15
    • 2020-07-17
    • 2018-05-27
    • 2018-01-29
    • 2016-08-25
    • 2016-09-21
    • 1970-01-01
    • 1970-01-01
    • 2017-09-08
    相关资源
    最近更新 更多