【发布时间】:2019-02-03 10:07:11
【问题描述】:
我正在尝试将文本文件转换为镶木地板文件。我只能从其他文件格式或用 scala/python 编写的代码中找到“如何转换为镶木地板”。 这是我想出的
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
private static final StructField[] fields = new StructField[]{
new StructField("timeCreate", DataTypes.StringType, false, Metadata.empty()),
new StructField("cookieCreate", DataTypes.StringType, false,Metadata.empty())
};//simplified
private static final StructType schema = new StructType(fields);
public static void main(String[] args) throws IOException {
SparkSession spark = SparkSession
.builder().master("spark://levanhuong:7077")
.appName("Convert text file to Parquet")
.getOrCreate();
spark.conf().set("spark.executor.memory", "1G");
WriteParquet(spark, args);
}
public static void WriteParquet(SparkSession spark, String[] args){
JavaRDD<String> data = spark.read().textFile(args[0]).toJavaRDD();
JavaRDD<Row> output = data.map((Function<String, Row>) s -> {
DataModel model = new DataModel(s);
return RowFactory.create(model);
});
Dataset<Row> df = spark.createDataFrame(output.rdd(),schema);
df.printSchema();
df.show(2);
df.write().parquet(args[1]);
}
args[0] 是输入文件的路径,args[1] 是输出文件的路径。这是简化的数据模型。 DateTime 字段在 set() 函数中的格式正确
public class DataModel implements Serializable {
DateTime timeCreate;
DateTime cookieCreate;
public DataModel(String data){
String model[] = data.split("\t");
setTimeCreate(model[0]);
setCookieCreate(model[1]);
}
这是错误。错误日志指向df.show(2),但我认为错误是由map() 引起的。我不知道为什么,因为我没有在代码中看到任何强制转换
>java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1
of type org.apache.spark.api.java.function.Function in instance
of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1
我认为这足以重现错误,如果我需要提供更多信息,请告诉我。
【问题讨论】:
-
你可以使用
spark.read.csv作为你的数据,只需设置分隔符“\t”来读取tsv格式。 -
@Hitobat 谢谢,这让我更进一步。我可以显示()但是当我写它时抛出这个错误“NoClassDefFoundError:org/apache/parquet/hadoop/metadata/CompressionCodecName”
-
idk 为什么在终端提交而不是在 IntelliJ 中运行解决了这个问题。
标签: java apache-spark parquet