【问题标题】:how to convert text file to parquet with java spark如何使用java spark将文本文件转换为镶木地板
【发布时间】:2019-02-03 10:07:11
【问题描述】:

我正在尝试将文本文件转换为镶木地板文件。我只能从其他文件格式或用 scala/python 编写的代码中找到“如何转换为镶木地板”。 这是我想出的

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

private static final StructField[] fields = new StructField[]{
            new StructField("timeCreate", DataTypes.StringType, false, Metadata.empty()),
            new StructField("cookieCreate", DataTypes.StringType, false,Metadata.empty())
};//simplified
private static final StructType schema = new StructType(fields);

public static void main(String[] args) throws IOException {
    SparkSession spark = SparkSession
            .builder().master("spark://levanhuong:7077")
            .appName("Convert text file to Parquet")
            .getOrCreate();
    spark.conf().set("spark.executor.memory", "1G");
    WriteParquet(spark, args);

}
public static void WriteParquet(SparkSession spark, String[] args){
    JavaRDD<String> data = spark.read().textFile(args[0]).toJavaRDD();
    JavaRDD<Row> output = data.map((Function<String, Row>) s -> {
        DataModel model = new DataModel(s);
        return RowFactory.create(model);
    });

    Dataset<Row> df = spark.createDataFrame(output.rdd(),schema);
    df.printSchema();
    df.show(2);
    df.write().parquet(args[1]);
}

args[0] 是输入文件的路径,args[1] 是输出文件的路径。这是简化的数据模型。 DateTime 字段在 set() 函数中的格式正确

public class DataModel implements Serializable {
DateTime timeCreate;
DateTime cookieCreate;

public DataModel(String data){
    String model[] = data.split("\t");
    setTimeCreate(model[0]);
    setCookieCreate(model[1]);
}

这是错误。错误日志指向df.show(2),但我认为错误是由map() 引起的。我不知道为什么,因为我没有在代码中看到任何强制转换

    >java.lang.ClassCastException: cannot assign instance of

 java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1 
    of type org.apache.spark.api.java.function.Function in instance 

    of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1

我认为这足以重现错误,如果我需要提供更多信息,请告诉我。

【问题讨论】:

  • 你可以使用spark.read.csv作为你的数据,只需设置分隔符“\t”来读取tsv格式。
  • @Hitobat 谢谢,这让我更进一步。我可以显示()但是当我写它时抛出这个错误“NoClassDefFoundError:org/apache/parquet/hadoop/metadata/CompressionCodecName”
  • idk 为什么在终端提交而不是在 IntelliJ 中运行解决了这个问题。

标签: java apache-spark parquet


【解决方案1】:

可以使用一点其他方法,效果很好:

    JavaRDD<String> data = spark().read().textFile(args[0]).toJavaRDD();
    JavaRDD<DataModel> output = data.map(s -> {
        String[] parts = s.split("\t");
        return new DataModel(parts[0], parts[1]);
    });
    Dataset<Row> result = spark().createDataFrame(output, DataModel.class);

“DataModel”类看起来更简单,没有功能:

public class DataModel implements Serializable {
private final String timeCreate;
private final String cookieCreate;

public DataModel(String timeCreate, String cookieCreate) {
    this.timeCreate = timeCreate;
    this.cookieCreate = cookieCreate;
}

public String getTimeCreate() {
    return timeCreate;
}

public String getCookieCreate() {
    return cookieCreate;
}

}

【讨论】:

  • 我使用您的代码进行了测试运行,但仍然无法正常工作,而且 spark() 中的 () 语法错误。未达到 map() 中的代码,所以我认为这无关紧要。也许它与图书馆有关?我更新了导入的类,请问有什么不同吗?
  • 是的,在您的情况下,“spark”后面的括号不需要。如果调用操作,将到达“map”内的代码,例如“df.show(2);”作为我代码中的最后一条语句。
  • 是的,我知道 map() 将被其他操作调用,但是当我尝试在 map() 中添加断点或 print() 时,它们都没有到达(或者这是正常的?) .而且由于代码是相同的,我能想到的最后一件事是库
  • 从“地图”打印将在执行程序日志中可见,而不是在驱动程序节点上。检查映射器是否有效,只需打印“result.count()”。如果值不为零,则映射器工作正常。
猜你喜欢
  • 2016-04-16
  • 2014-11-25
  • 1970-01-01
  • 2018-11-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-01-18
  • 2020-08-13
相关资源
最近更新 更多