【问题标题】:rdd.pipe throwing java.lang.IllegalStateException for grep -i shell command?rdd.pipe 为 grep -i shell 命令抛出 java.lang.IllegalStateException?
【发布时间】:2020-04-20 00:07:36
【问题描述】:

我正在运行在 RDD spark 操作中使用管道的代码:

按照我试过的sn-p:

//PIPE - run a external shell script in spark

val x = sc.parallelize(Array("A", "Ba", "C", "AD"))
val y = x.pipe("grep -i A")
println(x.collect())
println(y.collect())

但我得到了:

org.apache.spark.SparkException:作业因阶段失败而中止:阶段 61.0 中的任务 0 失败 1 次,最近一次失败:阶段 61.0 中丢失任务 0.0(TID 592,本地主机,执行程序驱动程序):java.lang .IllegalStateException: Subprocess exited with status 1. Command ran: grep -i A for running the above sn-p.

有没有办法通过pipe运行grep -i命令?

我尝试调用 .sh 脚本并且它正在工作,但我想将它作为 shell 命令运行。 Reference

【问题讨论】:

  • Subprocess exited with status 1 表示 Grep 命令以非零状态退出。 grep 的退出代码 1 仅表示没有选择/匹配任何行 - 对于 RDD 中没有 A 的所有元素都是如此
  • @tomgalpin: 但我在输入 RDD 中有一个元素 A。
  • 正确,但是 grep 命令是针对所有元素独立运行的。因此,例如,当它再次运行“Ba”时,grep“无法”匹配任何行,因此以失败的退出代码退出,从而给出上述错误。当您在 .sh 文件中执行此操作时,.sh 文件可能每次都退出 0。这更有意义吗?
  • @tomgalpin 谢谢,解决此问题的任何替代方法

标签: bash scala apache-spark rdd


【解决方案1】:

这是因为数据是分区的。即使您在.sh 文件中使用您提到的相同命令,您也会收到相同的错误。如果您将 RDD 重新分区到一个分区,它应该可以正常工作:

val y = x.repartition(1).pipe("grep -i A")

根据official documentation

pipe(command, [envVars])

通过 shell 命令对 RDD 的每个分区进行管道传输,例如Perl 或 bash 脚本。 RDD 元素被写入进程的标准输入和行 其标准输出的输出作为字符串的 RDD 返回。

当您使用grep 命令时,您不能彼此独立地处理每一行,因为如果它对一个元素失败,它就存在。

【讨论】:

  • And even if you use the same command within .sh file as you mention you'll get the same error. 除非您在脚本中处理错误。
  • x.repartition(1).pipe 就像一个魅力。感谢团队。
猜你喜欢
  • 2013-11-21
  • 2021-07-28
  • 1970-01-01
  • 1970-01-01
  • 2016-12-10
  • 1970-01-01
  • 1970-01-01
  • 2021-11-09
相关资源
最近更新 更多