【问题标题】:How to add filename as a column to a a Java RDD with Spark?如何使用 Spark 将文件名作为列添加到 Java RDD?
【发布时间】:2021-08-25 18:48:15
【问题描述】:

使用 apache Spark,我们需要处理一堆文件,并跟踪哪些文件中有特定的关键字。

我正在尝试创建一个包含两列的数据框:

  • 文件中的一行
  • 包含该行的文件

这是我目前所拥有的:

String[] sourceLogPaths = Files.walk(Paths.get(getLogSourceDirectory())).filter(Files::isRegularFile).map(path -> path.toString()).collect(Collectors.toList()).toArray((new String[0]));
SparkSession spark = SparkSession.builder().appName("LogSearcher").master("local").getOrCreate();

// sourceLogPaths is an array of different file names
JavaRDD<String> textFile = spark.read().textFile(sourceLogPaths).javaRDD();
JavaRDD<Row> rowRDD = textFile.map(RowFactory::create);
// How to add a field that shows the associated filename for each row?
List<StructField> fields = Arrays.asList(DataTypes.createStructField("line", DataTypes.StringType, true)); 
StructType schema = DataTypes.createStructType(fields);
SQLContext sqlContext = spark.sqlContext();
Dataset<Row> df = sqlContext.createDataFrame(rowRDD, schema);

df.show();

打印出来的:

+--------------------+
|                line|
+--------------------+
|1331901000.000000...|
|1331901000.000000...|
|1331901000.000000...|
...

谁能帮助我了解如何将原始文件的名称添加为第二列?

搜索建议导致了建议like this,但我不确定在这种情况下如何翻译。

提前致谢,我是 Spark 的新手,如有任何建议,我们将不胜感激。

【问题讨论】:

  • 不是 java spark 专家,但我想试一试。你能做类似spark.read().textFile(sourceLogPaths).withColumn("filename", input_file_name()).javaRDD(); 的事情吗?
  • 我很感激!

标签: java dataframe apache-spark pyspark apache-spark-sql


【解决方案1】:

我不是 Java 人,但在 python 中使用 spark 你可以提供整个文件夹或文件模式,并在下面使用类似这样的东西.. 如果本地文件系统使用 file: 在前面。 filename 会将文件名添加到数据中。

df = spark.read.text('/datafolder/foldername/*')
df = df.withColumn("filename", input_file_name())

【讨论】:

  • 谢谢,我认为这有帮助
【解决方案2】:

感谢@Rafa,我认为这就是答案:

String[] sourceLogPaths = Files.walk(Paths.get(getLogSourceDirectory())).filter(Files::isRegularFile).map(path -> path.toString()).collect(Collectors.toList()).toArray((new String[0]));
SparkSession spark = SparkSession.builder().appName("LogSearcher").master("local").getOrCreate();

// sourceLogPaths is an array of different file names
JavaRDD<String> textFile = spark.read().textFile(sourceLogPaths).javaRDD();
JavaRDD<Row> rowRDD = textFile.map(RowFactory::create);
// How to add a field that shows the associated filename for each row?
List<StructField> fields = Arrays.asList(DataTypes.createStructField("line", DataTypes.StringType, true)); 
StructType schema = DataTypes.createStructType(fields);
SQLContext sqlContext = spark.sqlContext();

// Below line has the additional column added
Dataset<Row> df = sqlContext.createDataFrame(rowRDD, schema).withColumn("file_name", input_file_name());

df.show();

打印

+--------------------+--------------------+
|                line|           file_name|
+--------------------+--------------------+
|1331901000.000000...|file:///Users/acu...|
|1331901000.000000...|file:///Users/acu...|
|1331901000.000000...|file:///Users/acu...|
|1331901000.010000...|file:///Users/acu...|

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-09
    • 2017-09-13
    • 2017-06-26
    • 1970-01-01
    • 2015-09-23
    • 1970-01-01
    相关资源
    最近更新 更多