【问题标题】:StreamingContext with filter in Scala/SparkScala/Spark 中带有过滤器的 StreamingContext
【发布时间】:2016-09-26 23:05:15
【问题描述】:

我使用 Scala 的 StreamingContext 成功地计算了单词:

val scc = new StreamingContext(sc,Seconds(1))
val dstream = scc.textFileStream("""file:///pathToDirectoryWindows""");
//dstream is DStream[String] 
val words = dstream.flatMap(line=>line.split(" "));

但我尝试对过滤做同样的事情,即只考虑扩展名为 .txt 的文件。看来textFileStream不允许过滤,所以我尝试了fileStream:

val fstream=scc.fileStream("""file:///pathToFolderWin""",x=>x.getName().contains(".txt"), true); 

但是这次我不能拆分,因为结果不是 DStream[String],而是 inputDStream[(Nothing, Nothing)] 。 如何处理字符串但过滤文件?非常感谢,列维

【问题讨论】:

    标签: scala apache-spark spark-streaming


    【解决方案1】:

    使用StreamingContext.fileStream 时,您必须明确指定 Hadoop 键类型、Hadoop 值类型和传入格式。例如,如果密钥类型是Long,您将收到Text,输入格式为TextInputFormat,您可以这样写:

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/path/to/file")
                   .map { case (key, text) => (key.toString, text.toString.split(" "))}
    

    这将产生一个DStream[(String, Array[String)],其中Array[String] 是拆分后的行。

    或者如果你只想要你要写的值:

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/path/to/file")
                   .map { case (_, text) => text.toString.split(" "))}
    

    你会得到一个DStream[Array[String]]

    编辑

    要对文件的扩展名应用过滤器,你可以Apache Commons IO - FilenameUtils.getExtension

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
                      "/path/to/file", (file: Path) => 
                         FilenameUtils.getExtension(file.toString).equalsIgnoreCase("txt"))
                   .map { case (_, text) => text.toString.split(" "))}
    

    【讨论】:

    • 非常感谢,它可以工作,但是如何按文件名过滤文件?那是我的动机,我可能没有说得足够清楚。同样由于某种原因“/path/to/file”还不够,它需要像 new Sring("/path/to/file") 或其他类型的东西。那么,问题是如何添加文件名过滤器?再次感谢 Levi תודה
    • @Levi 您提供的过滤器有什么问题?
    • 我试着听从你的建议并写道: val fstream = scc.fileStream[LongWritable, Text, TextInputFormat](new String("""file:///C:\Users\lkitross\workspaceLuna \MyFirstScalaProject\"""));但我不知道如何在文件夹中的文件名上添加过滤器。也就是说,我如何仅将文件“.txt”而不是“.tst”作为输入流。列维
    • 我是 Scala 的新手,所以琐碎的事情对我来说仍然很困难。 (文件:路径)导致错误:未找到类型:路径。我尝试添加导入,但没有帮助,或者我没有找到正确的导入:org.apache.commons.io.FilenameUtils, org.apache.commons.io.FileUtils;org.apache.commons.io.FileSystemUtils, import java.io.文件;和其他一些。拜托,最后一击……李维斯
    • @YuvalItzchakov val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/path/to/file") .map { case (key, text) => (key.toString, text.toString.split(" "))} 此代码无法编译
    【解决方案2】:

    另一种解决方案:

    import org.apache.hadoop.fs.Path
    val fstream=scc.fileStream("""file:///pathToFolderWin""", (path: Path) => path.getName().endsWith(".txt"), true) 
    

    【讨论】:

      猜你喜欢
      • 2015-06-27
      • 2016-03-02
      • 2020-06-07
      • 1970-01-01
      • 1970-01-01
      • 2017-06-26
      • 1970-01-01
      • 2020-09-17
      • 1970-01-01
      相关资源
      最近更新 更多