【发布时间】:2021-08-25 18:48:15
【问题描述】:
使用 apache Spark,我们需要处理一堆文件,并跟踪哪些文件中有特定的关键字。
我正在尝试创建一个包含两列的数据框:
- 文件中的一行
- 包含该行的文件
这是我目前所拥有的:
String[] sourceLogPaths = Files.walk(Paths.get(getLogSourceDirectory())).filter(Files::isRegularFile).map(path -> path.toString()).collect(Collectors.toList()).toArray((new String[0]));
SparkSession spark = SparkSession.builder().appName("LogSearcher").master("local").getOrCreate();
// sourceLogPaths is an array of different file names
JavaRDD<String> textFile = spark.read().textFile(sourceLogPaths).javaRDD();
JavaRDD<Row> rowRDD = textFile.map(RowFactory::create);
// How to add a field that shows the associated filename for each row?
List<StructField> fields = Arrays.asList(DataTypes.createStructField("line", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
SQLContext sqlContext = spark.sqlContext();
Dataset<Row> df = sqlContext.createDataFrame(rowRDD, schema);
df.show();
打印出来的:
+--------------------+
| line|
+--------------------+
|1331901000.000000...|
|1331901000.000000...|
|1331901000.000000...|
...
谁能帮助我了解如何将原始文件的名称添加为第二列?
搜索建议导致了建议like this,但我不确定在这种情况下如何翻译。
提前致谢,我是 Spark 的新手,如有任何建议,我们将不胜感激。
【问题讨论】:
-
不是 java spark 专家,但我想试一试。你能做类似
spark.read().textFile(sourceLogPaths).withColumn("filename", input_file_name()).javaRDD();的事情吗? -
我很感激!
标签: java dataframe apache-spark pyspark apache-spark-sql